diff --git a/iotronic/wamp/clientwamp.py b/iotronic/wamp/clientwamp.py index f0d6fb3..4c582f0 100644 --- a/iotronic/wamp/clientwamp.py +++ b/iotronic/wamp/clientwamp.py @@ -5,41 +5,33 @@ import multiprocessing from autobahn.twisted.util import sleep msg_queue=None + class Publisher(ApplicationSession): - @inlineCallbacks def onJoin(self, details): print("Publisher session ready") - self.publish(u'board.connection', 'counter') - ''' - global msg_queue - while True: - if not msg_queue.empty(): - msg=msg_queue.get() - self.publish(u'board.connection', msg) - yield sleep(1) - ''' class Subscriber(ApplicationSession): @inlineCallbacks def onJoin(self, details): print("Subscriber session ready") + self.topic_reader = self.config.extra['topic'] + print self.topic_reader - def oncounter(count): - print("event received: {0}", count) + def manage_msg(msg): + print("event received: {0}", msg) try: - yield self.subscribe(oncounter, u'board.connection') + yield self.subscribe(manage_msg, self.topic_reader) print("subscribed to topic") except Exception as e: print("could not subscribe to topic: {0}".format(e)) - self.publish(u'board.connection', 'counter') global msg_queue while True: if not msg_queue.empty(): msg=msg_queue.get() - self.publish(u'board.connection', msg) + self.publish(msg['topic'], msg['message']) yield sleep(0.01) @@ -59,12 +51,13 @@ class PublisherClient: class SubscriberClient: - def __init__(self,ip,port,realm): + def __init__(self,ip,port,realm,topic): self.ip=unicode(ip) self.port=unicode(port) self.realm=unicode(realm) + self.topic=unicode(topic) self._url = "ws://"+self.ip+":"+self.port+"/ws" - self.runner = ApplicationRunner(url=unicode(self._url), realm=self.realm, + self.runner = ApplicationRunner(url=unicode(self._url), realm=self.realm, extra={'topic':self.topic} #debug=True, debug_wamp=True, debug_app=True ) @@ -74,8 +67,8 @@ class SubscriberClient: class ClientWamp: - def __init__(self,ip,port,realm): - server = SubscriberClient(ip,port,realm) + def __init__(self,ip,port,realm,topic='board.connection'): + server = SubscriberClient(ip,port,realm,topic) sendMessage = PublisherClient(ip,port,realm) server.start() sendMessage.start() @@ -86,5 +79,6 @@ class ClientWamp: multi = multiprocessing.Process(target=reactor.run, args=()) multi.start() - def send(self,msg): - msg_queue.put(msg) \ No newline at end of file + def send(self,topic,msg): + full_msg={'topic':unicode(topic),'message':unicode(msg)} + msg_queue.put(full_msg) \ No newline at end of file