Source code for btStreamingNode

'''A Node which accepts audio streams and forwards them to the speech to text service of your choice and publishes the transcript on the transcript topic'''

# System imports
from threading import Thread, Timer
import time
import os
import warnings

# 3rd Party imports
from btNode import Node
from twisted.internet import ssl
from btPostRequest import BTPostRequest

# local imports
from nexus.protocols.audioStreamFactory import AudioStreamFactory
# end file header
__author__      = 'Adrian Lubitz'
__copyright__   = 'Copyright (c)2017, Blackout Technologies'


[docs]class StreamingNode(Node): def __init__(self, language=None, personalityId=None, integrationId=None, sessionId=None, **kwargs): super(StreamingNode, self).__init__(**kwargs) self.language = language if self.language == None: self.language = os.environ['LANGUAGE'] self.personalityId = personalityId if self.personalityId == None: self.personalityId = os.environ['PERSONALITYID'] self.integrationId = integrationId if self.integrationId == None: self.integrationId = os.environ['INTEGRATIONID'] self.sessionId = sessionId if self.sessionId == None: try: self.sessionId = os.environ['SESSIONID'] except: print('SessionId not set - will be set in the _setUp()')
[docs] def connect(self, **kwargs): """ connect the Axon and start the reactor for twisted. This will always block because of the reactor pattern """ if 'blocking' in kwargs: warnings.warn("StreamingNodes cant be connected non-blocking. The call to connect will always blcok because of the reactor pattern.", Warning) #Apperently DeprecationWarnings are ignored for some reason del kwargs['blocking'] from twisted.internet import reactor super(StreamingNode, self).connect(blocking=False, **kwargs) reactor.run()
def _setUp(self): """ Setting up """ self.transport = None params = { 'integrationId': self.integrationId, 'personalityId': self.personalityId } try: print("reusing sessionId: {}".format(self.sessionId)) except AttributeError: try: print("URL: {}".format(self.axonURL)) BTPostRequest('sessionAccessRequest', params, accessToken=self.token, url=self.axonURL, callback=self.setSessionId).send(True) #This is called as a blocking call - if there is never a response coming this might be a problem... except Exception as e: try: self.publishError('Unable to get the sessionId: {}'.format(e)) except: print('Unable to get the sessionId: {}'.format(e)) # if not connected just prints time.sleep(2) # sleep self._setUp() # and retry super(StreamingNode, self)._setUp()
[docs] def setSessionId(self, response): """ callback for the sessionAccessRequest """ # print('response: {}'.format(response)) if response['success']: self.sessionId = response['sessionToken'] print('[{}]set sessionId to {}'.format(self.nodeName, self.sessionId)) else: pass # TODO: what should I do here? - retry
def _onDisconnected(self): """ Kill the twisted connection and if the diosconnect was initialized by myself stop the whole thing, because there will be no more reconnect. """ # kill the connection here if self.transport: self.transport.loseConnection() # self.transport.connectionLost(reason=None) - this is a callback not a function to call - apperently some say it should be called print('Killing the Streaming') if self.disconnecting: # Disconnect was initialized by myself from twisted.internet import reactor reactor.stop() super(StreamingNode, self)._onDisconnected() def _onConnected(self): """ This will be executed after a the Node is succesfully connected to the btNexus Here you need to subscribe and set everything else up. :returns: None """ # start the streaming in a thread # start a sending client here # self.subscribe(group=self.personalityId, topic='speechToText', callback=self.initStream_response) self.ready = False # ready when the handshake is done self.subscribe(group='{}.{}'.format(self.personalityId, self.sessionId), topic='speechToText', callback=self.streamTo) self.handshake() super(StreamingNode, self)._onConnected()
[docs] def streamTo(self, host, port): """ This reacts to the message from the Service telling where to connect to with twisted. """ self.ready = True if not self.transport: self.host = host self.port = int(port) print('Want to connect to {}:{}'.format(host, port)) factory = AudioStreamFactory(self) from twisted.internet import reactor reactor.callFromThread(reactor.connectSSL, self.host, self.port, factory, ssl.ClientContextFactory()) print('Starting the AudioStreamer on {}:{}'.format(self.host, self.port)) else: print('Im already connected - just ignoring this.')
def _startStreaming(self, transport): self.transport = transport Thread(target=self.onStreamReady).start()
[docs] def onStreamReady(self): ''' Start your stream with self.stream(stream) - this will stream data from the given stream to the SpeechService. ''' pass
[docs] def getSessionId(self): ''' return the sessionId this needs to be implemented for a service node ''' return self.sessionId
[docs] def stream(self, stream): """ This takes a stream and streams it to the Service """ # TODO: maybe put this in a Thread...Then it is not blocking from twisted.internet import reactor byte = stream.read(64) while byte: reactor.callFromThread(self.transport.write, byte) byte = stream.read(64)
[docs] def handshake(self): """ Do the Handshake with the Service and retry until it works. """ if not self.ready: Timer(3, self.handshake).start() print('Handshake not performed yet - retry') try: self.publish(group=self.personalityId, topic='speechToText', funcName='initStream', params=[self.sessionId, self.language]) except Exception as e: print(e)
if __name__ == '__main__': asn = StreamingNode(packagePath='./tests/packageIntegration.json', rcPath='../streaming-axon/speechIntegration/.btnexusrc') asn.connect()