'''A Node which accepts audio streams and forwards them to the speech to text service of your choice and publishes the transcript on the transcript topic'''
# System imports
from threading import Thread
import time
import os
# 3rd Party imports
from btNode import Node
from twisted.internet import ssl
from btPostRequest import BTPostRequest
# local imports
from nexus.protocols.audioStreamFactory import AudioStreamFactory
# end file header
__author__ = 'Adrian Lubitz'
__copyright__ = 'Copyright (c)2017, Blackout Technologies'
[docs]class StreamingNode(Node):
def __init__(self, **kwargs):
super(StreamingNode, self).__init__(**kwargs)
# self.personalityId = personalityId
# self.integrationId = integrationId
# self.language = language
# self.sessionId = sessionId
# print('after STREAMINGINIT')
[docs] def connect(self, **kwargs):
from twisted.internet import reactor
super(StreamingNode, self).connect(blocking=False, **kwargs)
reactor.run()
def _setUp(self):
self.transport = None
if 'language' in self.initKwargs:
self.language = self.initKwargs['language']
else:
self.language = 'en-US'
if 'personalityId' in self.initKwargs:
self.personalityId = self.initKwargs['personalityId']
else:
self.personalityId = os.environ["PERSONALITYID"]
# self.msKey = os.environ["MSKEY"]
if 'integrationId' in self.initKwargs:
self.integrationId = self.initKwargs['integrationId']
else:
self.integrationId = os.environ['INTEGRATIONID'] #"f0458d18-3108-11e9-b210-d663bd873d93" - This is the robot integrationId - this needs to be set correctly using env
params = {
'integrationId': self.integrationId,
'personalityId': self.personalityId
}
if 'sessionId' in self.initKwargs:
self.sessionId = self.initKwargs['sessionId']
else:
try:
print("reusing sessionId: {}".format(self.sessionId))
except AttributeError:
try:
print("URL: {}".format(self.axonURL))
BTPostRequest('sessionAccessRequest', params, accessToken=self.token, url=self.axonURL, callback=self.setSessionId).send(True) #This is called as a blocking call - if there is never a response coming this might be a problem...
except Exception as e:
try:
self.publishError('Unable to get the sessionId: {}'.format(e))
except:
print('Unable to get the sessionId: {}'.format(e)) # if not connected just prints
time.sleep(2) # sleep
self._setUp() # and retry
super(StreamingNode, self)._setUp()
[docs] def setSessionId(self, response):
# print('response: {}'.format(response))
if response['success']:
self.sessionId = response['sessionToken']
print('[{}]set sessionId to {}'.format(self.nodeName, self.sessionId))
else:
pass # TODO: what should I do here? - retry
def _onDisconnected(self):
# kill the connection here
if self.transport:
self.transport.loseConnection()
# self.transport.connectionLost(reason=None) - this is a callback not a function to call ^^
print('Killing the Streaming')
if self.disconnecting: # Disconnect was initialized by myself
from twisted.internet import reactor
reactor.stop()
super(StreamingNode, self)._onDisconnected()
def _onConnected(self):
"""
This will be executed after a the Node is succesfully connected to the btNexus
Here you need to subscribe and set everything else up.
:returns: None
"""
# start the streaming in a thread
# start a sending client here
# self.subscribe(group=self.personalityId, topic='speechToText', callback=self.initStream_response)
# TODO: also subscribe to persId.sessId, speechToText, streamTo
self.subscribe(group='{}.{}'.format(self.personalityId, self.sessionId), topic='speechToText', callback=self.streamTo)
self.publish(group=self.personalityId, topic='speechToText', funcName='initStream', params=[self.sessionId, self.language])
super(StreamingNode, self)._onConnected()
[docs] def streamTo(self, host, port):
if not self.transport:
self.host = host
self.port = int(port)
print('Want to connect to {}:{}'.format(host, port))
factory = AudioStreamFactory(self)
from twisted.internet import reactor
reactor.callFromThread(reactor.connectSSL, self.host, self.port, factory, ssl.ClientContextFactory())
print('Starting the AudioStreamer on {}:{}'.format(self.host, self.port))
else:
print('Im already connected - just ignoring this.')
def _startStreaming(self, transport):
self.transport = transport
Thread(target=self.onStreamReady).start()
[docs] def onStreamReady(self):
'''
Stream as long as you want but use reactor.callFromThread and after finishing self.transport.loseConnection and self.transport = None
'''
pass
[docs] def getSessionId(self):
'''
return the sessionId this needs to be implemented for a service node
'''
return self.sessionId
if __name__ == '__main__':
asn = StreamingNode()
asn.connect()