Source code for btStreamingNode

'''A Node which accepts audio streams and forwards them to the speech to text service of your choice and publishes the transcript on the transcript topic'''

# System imports
from threading import Thread
import time
import os
# 3rd Party imports
from btNode import Node
from twisted.internet import ssl
from btPostRequest import BTPostRequest

# local imports
from nexus.protocols.audioStreamFactory import AudioStreamFactory
# end file header
__author__      = 'Adrian Lubitz'
__copyright__   = 'Copyright (c)2017, Blackout Technologies'


[docs]class StreamingNode(Node): # def __init__((self, token=None, axonURL=None, debug=None, logger=None): # super()
[docs] def connect(self): from twisted.internet import reactor super(StreamingNode, self).connect(blocking=False) reactor.run()
def _setUp(self): super(StreamingNode, self)._setUp() print("Calling StreamingNode _setUp") self.transport = None self.personalityId = os.environ["PERSONALITYID"] # self.msKey = os.environ["MSKEY"] self.integrationId = os.environ['INTEGRATIONID'] #"f0458d18-3108-11e9-b210-d663bd873d93" - This is the robot integrationId - this needs to be set correctly using env params = { 'integrationId': self.integrationId, 'personalityId': self.personalityId } try: print("URL: {}".format(self.axonURL)) BTPostRequest('sessionAccessRequest', params, accessToken=self.token, url=self.axonURL, callback=self.setSessionId).send(True) #This is called as a blocking call - if there is never a response coming this might be a problem... except Exception as e: try: self.publishError('Unable to get sessionId: {}'.format(e)) except: pass # if not connected it will only print here time.sleep(2) # sleep self._setUp() # and retry self.language = 'en-US' # TODO: This needs to be dynamic!!!
[docs] def setSessionId(self, response): # print('response: {}'.format(response)) if response['success']: self.sessionId = response['sessionToken'] print('set sessionId to {}'.format(self.sessionId)) else: pass # TODO: what should I do here? - retry
def _onDisconnected(self): # kill the connection here super(StreamingNode, self)._onDisconnected() if self.transport: self.transport.loseConnection() self.transport.connectionLost(reason=None) print('Killing the AudioStreamer') def _onConnected(self): """ This will be executed after a the Node is succesfully connected to the btNexus Here you need to subscribe and set everything else up. :returns: None """ super(StreamingNode, self)._onConnected() # start the streaming in a thread # start a sending client here self.subscribe(group=self.personalityId, topic='audio', callback=self.initStream_response) self.publish(group=self.personalityId, topic='audio', funcName='initStream', params=[self.sessionId, self.language])
[docs] def initStream_response(self, **kwargs): if not self.transport: serverAddress = kwargs['returnValue'] print('Want to connect to {}'.format(serverAddress)) self.host, self.port = serverAddress.split(':') self.port = int(self.port) factory = AudioStreamFactory(self) from twisted.internet import reactor reactor.callFromThread(reactor.connectSSL, self.host, self.port, factory, ssl.ClientContextFactory()) print('Starting the AudioStreamer on {}:{}'.format(self.host, self.port)) else: print('Im already connected - just ignoring this.')
def _startStreaming(self, transport): self.transport = transport Thread(target=self.onStreamReady).start()
[docs] def onStreamReady(self): ''' Stream as long as you want but use reactor.callFromThread and after finishing self.transport.loseConnection and self.transport = None ''' pass
[docs] def getSessionId(self): ''' return the sessionId this needs to be implemented for a service node ''' return self.sessionId
if __name__ == '__main__': asn = StreamingNode() asn.connect()