diff --git a/.hgignore b/.hgignore index b0ff21d..30dbc1c 100644 --- a/.hgignore +++ b/.hgignore @@ -7,6 +7,7 @@ dist build *.esproj .DS_Store +.idea doc/_build annotated cover @@ -17,6 +18,7 @@ lib* bin include .noseids +pip-log.txt syntax: re ^.ropeproject/.*$ diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py new file mode 100644 index 0000000..8f3c824 --- /dev/null +++ b/eventlet/green/zmq.py @@ -0,0 +1,81 @@ +__zmq__ = __import__('zmq') +from eventlet import sleep +from eventlet.hubs import trampoline, get_hub + +__patched__ = ['Context', 'Socket'] +globals().update(dict([(var, getattr(__zmq__, var)) + for var in __zmq__.__all__ + if not (var.startswith('__') + or + var in __patched__) + ])) + + +def get_hub_name_from_instance(hub): + return hub.__class__.__module__.rsplit('.',1)[-1] + +def Context(io_threads=1): + hub = get_hub() + hub_name = get_hub_name_from_instance(hub) + if hub_name != 'zeromq': + raise RuntimeError("Hub must be 'zeromq', got '%s'" % hub_name) + return hub.get_context(io_threads) + +class _Context(__zmq__.Context): + + def socket(self, socket_type): + return Socket(self, socket_type) + +class Socket(__zmq__.Socket): + + + def _send_message(self, data, flags=0, copy=True): + flags |= __zmq__.NOBLOCK + while True: + try: + super(Socket, self)._send_message(data, flags) + return + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, write=True) + + def _send_copy(self, data, flags=0, copy=True): + flags |= __zmq__.NOBLOCK + while True: + try: + super(Socket, self)._send_copy(data, flags) + return + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, write=True) + + def _recv_message(self, flags=0): + + flags |= __zmq__.NOBLOCK + while True: + try: + m = super(Socket, self)._recv_message(flags) + if m: + return m + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, read=True) + + def _recv_copy(self, flags=0): + flags |= __zmq__.NOBLOCK + while True: + try: + m = super(Socket, self)._recv_copy(flags) + if m: + return m + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, read=True) + + + + \ No newline at end of file diff --git a/eventlet/hubs/zeromq.py b/eventlet/hubs/zeromq.py new file mode 100644 index 0000000..2433bb9 --- /dev/null +++ b/eventlet/hubs/zeromq.py @@ -0,0 +1,99 @@ +from eventlet import patcher +from eventlet.green import zmq +from eventlet.hubs import poll, _threadlocal +from eventlet.hubs.hub import BaseHub, noop +from eventlet.hubs.poll import READ, WRITE +from eventlet.support import clear_sys_exc_info +import sys + +time = patcher.original('time') +select = patcher.original('select') +sleep = time.sleep + +EXC_MASK = zmq.POLLERR +READ_MASK = zmq.POLLIN +WRITE_MASK = zmq.POLLOUT + +class Hub(poll.Hub): + + + def __init__(self, clock=time.time): + BaseHub.__init__(self, clock) + self.poll = zmq.Poller() + + def get_context(self, io_threads=1): + """zmq's Context must be unique within a hub + + The zeromq API documentation states: + All zmq sockets passed to the zmq_poll() function must share the same + zmq context and must belong to the thread calling zmq_poll() + + As zmq_poll is what's eventually being called then we need to insure + that all sockets that are going to be passed to zmq_poll (via + hub.do_poll) are in the same context + """ + try: + return _threadlocal.context + except AttributeError: + _threadlocal.context = zmq._Context(io_threads) + return _threadlocal.context + + def register(self, fileno, new=False): + mask = 0 + if self.listeners[READ].get(fileno): + mask |= READ_MASK + if self.listeners[WRITE].get(fileno): + mask |= WRITE_MASK + if mask: + self.poll.register(fileno, mask) + else: + self.poll.unregister(fileno) + + + def wait(self, seconds=None): + readers = self.listeners[READ] + writers = self.listeners[WRITE] + + if not readers and not writers: + if seconds: + sleep(seconds) + return + try: + presult = self.do_poll(seconds) + except zmq.ZMQError, e: + # In the poll hub this part exists to special case some exceptions + # from socket. There may be some error numbers that wider use of + # this hub will throw up as needing special treatment so leaving + # this block and this comment as a remineder + raise + SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS + + if self.debug_blocking: + self.block_detect_pre() + + for fileno, event in presult: + try: + if event & READ_MASK: + readers.get(fileno, noop).cb(fileno) + if event & WRITE_MASK: + writers.get(fileno, noop).cb(fileno) + if event & EXC_MASK: + # zmq.POLLERR is returned for any error condition in the + # underlying fd (as passed through to poll/epoll) + readers.get(fileno, noop).cb(fileno) + writers.get(fileno, noop).cb(fileno) + except SYSTEM_EXCEPTIONS: + raise + except: + self.squelch_exception(fileno, sys.exc_info()) + clear_sys_exc_info() + + if self.debug_blocking: + self.block_detect_post() + + +# def do_poll(self, seconds): +# print 'poll: ', seconds +# if seconds < 0: +# seconds = 500 +# return self.poll.poll(seconds) \ No newline at end of file diff --git a/examples/chat_bridge.py b/examples/chat_bridge.py new file mode 100644 index 0000000..20aec04 --- /dev/null +++ b/examples/chat_bridge.py @@ -0,0 +1,20 @@ +import sys +from zmq import FORWARDER, PUB, SUB, SUBSCRIBE +from zmq.devices import Device + + +if __name__ == "__main__": + usage = 'usage: chat_bridge sub_address pub_address' + if len (sys.argv) != 3: + print usage + sys.exit(1) + + sub_addr = sys.argv[1] + pub_addr = sys.argv[2] + print "Recieving on %s" % sub_addr + print "Sending on %s" % pub_addr + device = Device(FORWARDER, SUB, PUB) + device.bind_in(sub_addr) + device.setsockopt_in(SUBSCRIBE, "") + device.bind_out(pub_addr) + device.start() diff --git a/examples/distributed_websocket_chat.py b/examples/distributed_websocket_chat.py new file mode 100644 index 0000000..7ad0483 --- /dev/null +++ b/examples/distributed_websocket_chat.py @@ -0,0 +1,127 @@ +"""This is a websocket chat example with many servers. A client can connect to +any of the servers and their messages will be received by all clients connected +to any of the servers. + +Run the examples like this: + +$ python examples/chat_bridge.py tcp://127.0.0.1:12345 tcp://127.0.0.1:12346 + +and the servers like this (changing the port for each one obviously): + +$ python examples/distributed_websocket_chat.py -p tcp://127.0.0.1:12345 -s tcp://127.0.0.1:12346 7000 + +So all messages are published to port 12345 and the device forwards all the +messages to 12346 where they are subscribed to +""" +import os, sys +import eventlet +from collections import defaultdict +from eventlet import spawn_n, sleep +from eventlet import wsgi +from eventlet import websocket +from eventlet.green import zmq +from eventlet.hubs import get_hub, use_hub +from uuid import uuid1 + +use_hub('zeromq') +ctx = zmq.Context() + +class IDName(object): + + def __init__(self): + self.id = uuid1() + self.name = None + + def __str__(self): + if self.name: + return self.name + else: + return str(self.id) + + def pack_message(self, msg): + return self, msg + + def unpack_message(self, msg): + sender, message = msg + sender_name = 'you said' if sender.id == self.id \ + else '%s says' % sender + return "%s: %s" % (sender_name, message) + + +participants = defaultdict(IDName) + +def subscribe_and_distribute(sub_socket): + global participants + while True: + msg = sub_socket.recv_pyobj() + for ws, name_id in participants.items(): + to_send = name_id.unpack_message(msg) + if to_send: + try: + ws.send(to_send) + except: + del participants[ws] + +@websocket.WebSocketWSGI +def handle(ws): + global pub_socket + name_id = participants[ws] + ws.send("Connected as %s, change name with 'name: new_name'" % name_id) + try: + while True: + m = ws.wait() + if m is None: + break + if m.startswith('name:'): + old_name = str(name_id) + new_name = m.split(':', 1)[1].strip() + name_id.name = new_name + m = 'Changed name from %s' % old_name + pub_socket.send_pyobj(name_id.pack_message(m)) + sleep() + finally: + del participants[ws] + +def dispatch(environ, start_response): + """Resolves to the web page or the websocket depending on the path.""" + global port + if environ['PATH_INFO'] == '/chat': + return handle(environ, start_response) + else: + start_response('200 OK', [('content-type', 'text/html')]) + return [open(os.path.join( + os.path.dirname(__file__), + 'websocket_chat.html')).read() % dict(port=port)] + +port = None + +if __name__ == "__main__": + usage = 'usage: websocket_chat -p pub address -s sub address port number' + if len (sys.argv) != 6: + print usage + sys.exit(1) + + pub_addr = sys.argv[2] + sub_addr = sys.argv[4] + try: + port = int(sys.argv[5]) + except ValueError: + print "Error port supplied couldn't be converted to int\n", usage + sys.exit(1) + + try: + pub_socket = ctx.socket(zmq.PUB) + pub_socket.connect(pub_addr) + print "Publishing to %s" % pub_addr + sub_socket = ctx.socket(zmq.SUB) + sub_socket.connect(sub_addr) + sub_socket.setsockopt(zmq.SUBSCRIBE, "") + print "Subscribing to %s" % sub_addr + except: + print "Couldn't create sockets\n", usage + sys.exit(1) + + spawn_n(subscribe_and_distribute, sub_socket) + listener = eventlet.listen(('127.0.0.1', port)) + print "\nVisit http://localhost:%s/ in your websocket-capable browser.\n" % port + wsgi.server(listener, dispatch) diff --git a/examples/websocket_chat.html b/examples/websocket_chat.html index 3eb7efc..9237532 100644 --- a/examples/websocket_chat.html +++ b/examples/websocket_chat.html @@ -5,7 +5,7 @@