Source code for btStreamingNode

'''A Node which accepts audio streams and forwards them to the speech to text service of your choice and publishes the transcript on the transcript topic'''

# System imports
from threading import Thread, Timer
import time
import os
import warnings

# 3rd Party imports
from btNode import Node
from twisted.internet import ssl
from btPostRequest import BTPostRequest

# local imports
from nexus.protocols.audioStreamFactory import AudioStreamFactory
from nexus.nexusExceptions import NexusNotConnectedException
# end file header
__author__      = 'Adrian Lubitz'
__copyright__   = 'Copyright (c)2017, Blackout Technologies'


[docs]class StreamingNode(Node): def __init__(self, language=None, personalityId=None, sessionId=None, **kwargs): super(StreamingNode, self).__init__(**kwargs) self.sessionId = sessionId self.language = language if not self.language: if 'LANGUAGE' in os.environ: self.language = os.environ['LANGUAGE'] else: self.language = self.package['language'] self.personalityId = personalityId if not self.personalityId: if 'PERSONALITYID' in os.environ: self.personalityId = os.environ['PERSONALITYID'] else: self.personalityId = self.package['personalityId']
[docs] def connect(self, **kwargs): """ connect the Axon and start the reactor for twisted. This will always block because of the reactor pattern """ if 'blocking' in kwargs: warnings.warn("StreamingNodes can only be connected blocking. The call to connect will always blcok because of the reactor pattern.", Warning) #Apperently DeprecationWarnings are ignored for some reason del kwargs['blocking'] from twisted.internet import reactor super(StreamingNode, self).connect(blocking=False, **kwargs) reactor.run()
def _setUp(self): """ Setting up """ self.transport = None params = { 'integrationId': self.config['id'], 'personalityId': self.personalityId } if self.sessionId: self.logger.log(self.NEXUSINFO, "[{}]: reusing sessionId: {}".format(self.nodeName, self.sessionId)) else: try: BTPostRequest('sessionAccessRequest', params, accessToken=self.config['token'], url=self.config['host'], callback=self.setSessionId).send(True) #This is called as a blocking call - if there is never a response coming this might be a problem... except Exception as e: try: self.publishError('Unable to get the sessionId: {}'.format(e)) except NexusNotConnectedException: # print('Unable to get the sessionId: {}'.format(e)) # if not connected just prints pass time.sleep(2) # sleep self._setUp() # and retry super(StreamingNode, self)._setUp()
[docs] def setSessionId(self, response): """ callback for the sessionAccessRequest """ # print('response: {}'.format(response)) if response['success']: self.sessionId = response['sessionToken'] self.logger.log(self.NEXUSINFO, '[{}]set sessionId to {}'.format(self.nodeName, self.sessionId)) else: pass # TODO: what should I do here? - retry time.sleep(2) self._setUp()
[docs] def disconnectFromService(self): """ clean up the connection to the Service """ self.ready = False # ready when the handshake is done if self.transport: self.transport.loseConnection() self.transport = None # self.transport.connectionLost(reason=None) - this is a callback not a function to call - apperently some say it should be called self.logger.log(self.NEXUSINFO, '[{}] Killing the Streaming'.format(self.nodeName)) if self.disconnecting: # Disconnect was initialized by myself from twisted.internet import reactor reactor.stop()
def _onDisconnected(self): """ Kill the twisted connection and if the diosconnect was initialized by myself stop the whole thing, because there will be no more reconnect. """ self.disconnectFromService() super(StreamingNode, self)._onDisconnected() def _onConnected(self): """ This will be executed after a the Node is succesfully connected to the btNexus Here you need to subscribe and set everything else up. :returns: None """ # start the streaming in a thread # start a sending client here # self.subscribe(group=self.personalityId, topic='speechToText', callback=self.initStream_response) self.ready = False # ready when the handshake is done self.subscribe(group='{}.{}'.format(self.personalityId, self.sessionId), topic='speechToText', callback=self.streamTo) self.handshake() super(StreamingNode, self)._onConnected()
[docs] def streamTo(self, host, port): """ This reacts to the message from the Service telling where to connect to with twisted. """ if not self.ready: self.ready = True if not self.transport: self.host = host self.port = int(port) self.logger.log(self.NEXUSINFO, "[{}]: Want to connect to {}:{}".format(self.nodeName, host, port)) self.factory = AudioStreamFactory(self) from twisted.internet import reactor reactor.callFromThread(reactor.connectSSL, self.host, self.port, self.factory, ssl.ClientContextFactory()) self.logger.log(self.NEXUSINFO, "[{}]: Starting the AudioStreamer on {}:{}".format(self.nodeName, self.host, self.port)) else: self.logger.log(self.NEXUSINFO,'[{}] Im already connected - just ignoring this.'.format(self.nodeName)) else: self.logger.log(self.NEXUSINFO,'[{}] Im already ready - just ignoring this.'.format(self.nodeName))
def _startStreaming(self, transport): self.transport = transport Thread(target=self.onStreamReady).start()
[docs] def onStreamReady(self): ''' Start your stream with self.stream(stream) - this will stream data from the given stream to the SpeechService. ''' pass
[docs] def getSessionId(self): ''' return the sessionId this needs to be implemented for a service node ''' return self.sessionId
[docs] def stream(self, stream): """ This takes a stream and streams it to the Service """ # TODO: maybe put this in a Thread...Then it is not blocking self.datStream = stream byte = self.datStream.read(64) if not byte: self.publishDebug("Stoping to send, because no byte before loop") while byte: try: from twisted.internet import reactor reactor.callFromThread(self.transport.write, byte) except AttributeError: # if disconnected self.transport will be None and will not have a member write - thus the sending should stop break self.publishDebug("Stoping to send, because no transport") byte = self.datStream.read(64) if not byte: self.publishDebug("Stoping to send, because no byte in loop") self.publishDebug("End the streaming")
[docs] def disconnectService(self): """ This disconnects the service - to reconnect the service call reconnectService(). This will trigger onStreamReady again. """ self.disconnectFromService() # this disconnects the connection to the service # if not self.datStream.seekable(): # read from buffer if I cannot seek in reconnectService Thread(target=self.eatStream).start()
[docs] def eatStream(self): """ This just eats all the bytes from the stream if it is not seekable. """ while(not self.ready): try: self.datStream.read(64) # that maybe not there if someone calls disconnectService before stream except: # if not possible do nothing pass
[docs] def reconnectService(self): """ Reconnect to the service after a call to disconnectService - This will trigger onStreamReady. """ # if self.datStream.seekable(): # self.datStream.seek(0, 2) # set the stream position to the end. SEEK_END:2 try: if not self.transport: self.handshake() else: self.publishDebug("Already connected - ignoring this call to reconnectStream") except AttributeError as e: self.publishError("Cant reconnect stream before it was connected once! - {}".format(e))
# from twisted.internet import reactor # try: # if not self.transport: # reactor.callFromThread(reactor.connectSSL, self.host, self.port, self.factory, ssl.ClientContextFactory()) # self.logger.log(self.NEXUSINFO, "[{}]: Starting the AudioStreamer on {}:{}".format(self.nodeName, self.host, self.port)) # else: # self.publishDebug("Already connected - ignoring this call to reconnectStream") # except AttributeError as e: # self.publishError("Cant reconnect stream before it was connected once! - {}".format(e))
[docs] def handshake(self): """ Do the Handshake with the Service and retry until it works. """ if not self.ready: Timer(3, self.handshake).start() self.logger.log(self.NEXUSINFO, "[{}]: Handshake not performed yet - retry".format(self.nodeName)) try: self.publish(group=self.personalityId, topic='speechToText', funcName='initStream', params=[self.sessionId, self.language]) except Exception as e: self.logger.error(str(e))
if __name__ == '__main__': asn = StreamingNode(packagePath='./tests/packageIntegration.json', rcPath='../streaming-axon/speechIntegration/.btnexusrc') asn.connect()