updated clientwamp with fix and variable topic

This commit is contained in:
root 2015-10-22 12:24:01 +02:00
parent 91f8bae544
commit 7061b239be

View File

@ -5,41 +5,33 @@ import multiprocessing
from autobahn.twisted.util import sleep from autobahn.twisted.util import sleep
msg_queue=None msg_queue=None
class Publisher(ApplicationSession): class Publisher(ApplicationSession):
@inlineCallbacks
def onJoin(self, details): def onJoin(self, details):
print("Publisher session ready") print("Publisher session ready")
self.publish(u'board.connection', 'counter')
'''
global msg_queue
while True:
if not msg_queue.empty():
msg=msg_queue.get()
self.publish(u'board.connection', msg)
yield sleep(1)
'''
class Subscriber(ApplicationSession): class Subscriber(ApplicationSession):
@inlineCallbacks @inlineCallbacks
def onJoin(self, details): def onJoin(self, details):
print("Subscriber session ready") print("Subscriber session ready")
self.topic_reader = self.config.extra['topic']
print self.topic_reader
def oncounter(count): def manage_msg(msg):
print("event received: {0}", count) print("event received: {0}", msg)
try: try:
yield self.subscribe(oncounter, u'board.connection') yield self.subscribe(manage_msg, self.topic_reader)
print("subscribed to topic") print("subscribed to topic")
except Exception as e: except Exception as e:
print("could not subscribe to topic: {0}".format(e)) print("could not subscribe to topic: {0}".format(e))
self.publish(u'board.connection', 'counter')
global msg_queue global msg_queue
while True: while True:
if not msg_queue.empty(): if not msg_queue.empty():
msg=msg_queue.get() msg=msg_queue.get()
self.publish(u'board.connection', msg) self.publish(msg['topic'], msg['message'])
yield sleep(0.01) yield sleep(0.01)
@ -59,12 +51,13 @@ class PublisherClient:
class SubscriberClient: class SubscriberClient:
def __init__(self,ip,port,realm): def __init__(self,ip,port,realm,topic):
self.ip=unicode(ip) self.ip=unicode(ip)
self.port=unicode(port) self.port=unicode(port)
self.realm=unicode(realm) self.realm=unicode(realm)
self.topic=unicode(topic)
self._url = "ws://"+self.ip+":"+self.port+"/ws" self._url = "ws://"+self.ip+":"+self.port+"/ws"
self.runner = ApplicationRunner(url=unicode(self._url), realm=self.realm, self.runner = ApplicationRunner(url=unicode(self._url), realm=self.realm, extra={'topic':self.topic}
#debug=True, debug_wamp=True, debug_app=True #debug=True, debug_wamp=True, debug_app=True
) )
@ -74,8 +67,8 @@ class SubscriberClient:
class ClientWamp: class ClientWamp:
def __init__(self,ip,port,realm): def __init__(self,ip,port,realm,topic='board.connection'):
server = SubscriberClient(ip,port,realm) server = SubscriberClient(ip,port,realm,topic)
sendMessage = PublisherClient(ip,port,realm) sendMessage = PublisherClient(ip,port,realm)
server.start() server.start()
sendMessage.start() sendMessage.start()
@ -86,5 +79,6 @@ class ClientWamp:
multi = multiprocessing.Process(target=reactor.run, args=()) multi = multiprocessing.Process(target=reactor.run, args=())
multi.start() multi.start()
def send(self,msg): def send(self,topic,msg):
msg_queue.put(msg) full_msg={'topic':unicode(topic),'message':unicode(msg)}
msg_queue.put(full_msg)