Source code for btStreamingNode

"""
A Node which can stream binary data
"""
# System imports
from threading import Thread, Timer
import time
import os
import warnings

# 3rd Party imports
from btNode import Node

# local imports

# end file header
__author__      = 'Adrian Lubitz'
__copyright__   = 'Copyright (c)2017, Blackout Technologies'

[docs]class StreamingNode(Node): """ A Node which can stream binary data """ def __init__(self, **kwargs): super(StreamingNode, self).__init__(**kwargs) self.streams = {} # mapping between group/topic/funcName and the StreamingNodeHelper which sends the stream - can only be one self.subscribers = {} # mapping between group/topic/funcName and the StreamingNodeHelper which handles the stream - could also be multiple(does not make too much sense though) - first attempt should be only one!
[docs] def publishStream(self, group, topic, funcName, stream, **kwargs): """ starts a Stream with the payload(funcName and params) to a topic. :param group: Name of the group :type group: String :param topic: Name of the topic :type topic: String :param funcName: Name of the function. :type funcName: String :param stream: a stream Object that will be stream to the group/topic/funcName :type stream: stream """ # new publishing StreamHelperNode - maybe also check if the Helper is still alive if not '{}.{}.{}'.format(group, topic, funcName) in self.streams: self.streams['{}.{}.{}'.format(group, topic, funcName)] = StreamingHelperNode(group=group, topic=topic, funcName=funcName, stream=stream, packagePath=self.packagePath, connectHash=self.connectHash, debug=self.debug, logger=self.logger, **kwargs) self.streams['{}.{}.{}'.format(group, topic, funcName)].connect(blocking=False, binary=True, **kwargs) else: self.unpublishStream(group=group, topic=topic, funcName=funcName, **kwargs) self.publishStream(group=group, topic=topic, funcName=funcName, stream=stream, **kwargs)
[docs] def unpublishStream(self, group, topic, funcName, **kwargs): # kill correct StreamHelperNode if '{}.{}.{}'.format(group, topic, funcName) in self.streams: # disconnect self.streams['{}.{}.{}'.format(group, topic, funcName)].disconnect() del self.streams['{}.{}.{}'.format(group, topic, funcName)] else: pass #TODO: exception? print('You cannot unpublish from a topic you never published on')
[docs] def subscribeStream(self, group, topic, callback, funcName=None, **kwargs): """ Subscribe to a stream on group & topic with a callback :param group: Name of the group :type group: String :param topic: Name of the topic :type topic: String :param callback: function pointer to the callback which expects one parameter as the incoming chunks of the stream :type callback: function pointer :param funcName: Name of the function. If not set this is the name of the function in the implementation(needed if you want to link a function to a different name) :type funcName: String """ if not funcName: funcName = callback.__name__ # new subscring StreamHelperNode - maybe also check if the Helper is still alive if not '{}.{}.{}'.format(group, topic, funcName) in self.subscribers: self.subscribers['{}.{}.{}'.format(group, topic, funcName)] = StreamingHelperNode(group=group, topic=topic, funcName=funcName, callback=callback, packagePath=self.packagePath, connectHash=self.connectHash, debug=self.debug, logger=self.logger, **kwargs) self.subscribers['{}.{}.{}'.format(group, topic, funcName)].connect(blocking=False, binary=True, **kwargs) else: pass # kill the old and start the new one self.unsubscribeStream(group, topic, funcName, **kwargs) self.subscribeStream(group, topic, callback, funcName, **kwargs)
[docs] def unsubscribeStream(self, group, topic, funcName, **kwargs): # kill correct StreamHelperNode if '{}.{}.{}'.format(group, topic, funcName) in self.subscribers: # disconnect self.subscribers['{}.{}.{}'.format(group, topic, funcName)].disconnect() del self.subscribers['{}.{}.{}'.format(group, topic, funcName)] else: pass #TODO: exception? print('You cannot unsubscribe from a topic you never subscribed on')
[docs]class StreamingHelperNode(Node): """ A Helper Node, which sends or receives streams """ def __init__(self, group, topic, funcName, callback=None, stream=None, **kwargs): super(StreamingHelperNode, self).__init__(**kwargs) if (stream and callback) or (not stream and not callback): raise AttributeError("Either callback XOR stream to init StreamingHelperNode. Not both nor none.") elif stream: self.sending = True self.stream = stream elif callback: self.sending = False self.callback = callback self.group = group self.topic = topic self.funcName = funcName
[docs] def onStreamData(self, data): self.callback(data)
[docs] def onConnected(self): self.nexusConnector.join("{}.{}.{}".format(self.group, self.topic, self.funcName)) if self.sending: #TODO maybe start a Thread here to leave the onConnected method gracefully # StreamHelperNode sends chunks byte = self.stream.read(1024) while byte and self.isConnected: self.nexusConnector.sio.emit('btnexus-stream', byte, namespace="/{}".format(self.nexusConnector.hostId)) byte = self.stream.read(1024) #TODO: after I sent everything or after someone forced me to - stop sending and disconnect? Or end of stream callback? time.sleep(10) # TODO: here the underlying socket buffer may not be completely empty by now which means I would kill of some bytes if I dont sleep - maybe there is a better option like waiting for the buffer and then disconnect... self.disconnect() else: self.nexusConnector.sio.on('stream', self.onStreamData, namespace="/{}".format(self.nexusConnector.hostId)) #TODO: probably change btnexus-stream to stream later
if __name__ == "__main__": pass # See examples