diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py new file mode 100644 index 0000000..99e5f8c --- /dev/null +++ b/eventlet/green/zmq.py @@ -0,0 +1,67 @@ +__zmq__ = __import__('zmq') +from eventlet.hubs import trampoline +__patched__ = ['Context', 'Socket'] +globals().update(dict([(var, getattr(__zmq__, var)) + for var in __zmq__.__all__ + if not (var.startswith('__') + or + var in __patched__) + ])) + +class Context(__zmq__.Context): + + def socket(self, socket_type): + return Socket(self, socket_type) + +class Socket(__zmq__.Socket): + + + def _send_message(self, data, flags=0, copy=True): +# flags |= __zmq__.NOBLOCK + print 'send' + while True: + try: + return super(Socket, self)._send_message(data, flags) + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, read=True) + + def _send_copy(self, data, flags=0, copy=True): +# flags |= __zmq__.NOBLOCK + while True: + try: + return super(Socket, self)._send_copy(data, flags) + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, write=True) + + def _recv_message(self, flags=0): + + flags |= __zmq__.NOBLOCK + while True: + try: + m = super(Socket, self)._recv_message(flags) + if m: + return m + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, read=True) + + def _recv_copy(self, flags=0): + flags |= __zmq__.NOBLOCK + while True: + try: + m = super(Socket, self)._recv_copy(flags) + if m: + return m + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, read=True) + + + + \ No newline at end of file diff --git a/eventlet/hubs/zeromq.py b/eventlet/hubs/zeromq.py new file mode 100644 index 0000000..4a52508 --- /dev/null +++ b/eventlet/hubs/zeromq.py @@ -0,0 +1,83 @@ +from eventlet import patcher +from eventlet.green import zmq +from eventlet.hubs import poll +from eventlet.hubs.hub import BaseHub, noop +from eventlet.hubs.poll import READ, WRITE +from eventlet.support import clear_sys_exc_info +import sys + +time = patcher.original('time') +select = patcher.original('select') +sleep = time.sleep + +EXC_MASK = zmq.POLLERR +READ_MASK = zmq.POLLIN +WRITE_MASK = zmq.POLLOUT + +class Hub(poll.Hub): + + + + def __init__(self, clock=time.time): + BaseHub.__init__(self, clock) + self.poll = zmq.Poller() + + def register(self, fileno, new=False): + mask = 0 + if self.listeners[READ].get(fileno): + mask |= READ_MASK + if self.listeners[WRITE].get(fileno): + mask |= WRITE_MASK + if mask: + self.poll.register(fileno, mask) + else: + self.poll.unregister(fileno) + + + def wait(self, seconds=None): + readers = self.listeners[READ] + writers = self.listeners[WRITE] + + if not readers and not writers: + if seconds: + sleep(seconds) + return + try: + presult = self.do_poll(seconds) + except zmq.ZMQError, e: + # In the poll hub this part exists to special case some exceptions + # from socket. There may be some error numbers that wider use of + # this hub will throw up as needing special treatment so leaving + # this block and this comment as a remineder + raise + SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS + + if self.debug_blocking: + self.block_detect_pre() + + for fileno, event in presult: + try: + if event & READ_MASK: + readers.get(fileno, noop).cb(fileno) + if event & WRITE_MASK: + writers.get(fileno, noop).cb(fileno) + if event & EXC_MASK: + # zmq.POLLERR is returned for any error condition in the + # underlying fd (as passed through to poll/epoll) + readers.get(fileno, noop).cb(fileno) + writers.get(fileno, noop).cb(fileno) + except SYSTEM_EXCEPTIONS: + raise + except: + self.squelch_exception(fileno, sys.exc_info()) + clear_sys_exc_info() + + if self.debug_blocking: + self.block_detect_post() + + +# def do_poll(self, seconds): +# print 'poll: ', seconds +# if seconds < 0: +# seconds = 500 +# return self.poll.poll(seconds) \ No newline at end of file diff --git a/examples/zmq_chat.py b/examples/zmq_chat.py new file mode 100644 index 0000000..cd07f80 --- /dev/null +++ b/examples/zmq_chat.py @@ -0,0 +1,64 @@ +import eventlet, sys +from eventlet.green import socket, zmq +from eventlet.hubs import use_hub +use_hub('zeromq') + +ADDR = 'ipc:///tmp/chat' + +ctx = zmq.Context() + +def publish(writer): + + print "connected" + socket = ctx.socket(zmq.SUB) + + socket.setsockopt(zmq.SUBSCRIBE, "") + socket.connect(ADDR) + eventlet.sleep(0.1) + + while True: + msg = socket.recv_pyobj() + str_msg = "%s: %s" % msg + writer.write(str_msg) + writer.flush() + + +PORT=3001 + +def read_chat_forever(reader, pub_socket): + + line = reader.readline() + who = 'someone' + while line: + print "Chat:", line.strip() + if line.startswith('name:'): + who = line.split(':')[-1].strip() + + try: + pub_socket.send_pyobj((who, line)) + except socket.error, e: + # ignore broken pipes, they just mean the participant + # closed its connection already + if e[0] != 32: + raise + line = reader.readline() + print "Participant left chat." + +try: + print "ChatServer starting up on port %s" % PORT + server = eventlet.listen(('0.0.0.0', PORT)) + pub_socket = ctx.socket(zmq.PUB) + pub_socket.bind(ADDR) + eventlet.spawn_n(publish, + sys.stdout) + while True: + new_connection, address = server.accept() + + print "Participant joined chat." + eventlet.spawn_n(publish, + new_connection.makefile('w')) + eventlet.spawn_n(read_chat_forever, + new_connection.makefile('r'), + pub_socket) +except (KeyboardInterrupt, SystemExit): + print "ChatServer exiting." \ No newline at end of file diff --git a/tests/zeromq_test.py b/tests/zeromq_test.py new file mode 100644 index 0000000..439cfaa --- /dev/null +++ b/tests/zeromq_test.py @@ -0,0 +1,49 @@ +from eventlet import spawn, sleep, getcurrent +from eventlet.hubs import use_hub, get_hub +from eventlet.green import zmq +from nose.tools import * +from tests import mock, LimitedTestCase +from eventlet.hubs.hub import READ, WRITE + +class _TestZMQ(LimitedTestCase): + + def setUp(self): + use_hub('zeromq') + + super(_TestZMQ, self).setUp() +# self.timer.cancel() + + def tearDown(self): + super(_TestZMQ, self).tearDown() + use_hub() + +class TestUpstreamDownStream(_TestZMQ): + + def _get_socket_pair(self): + return (zmq.Context().socket(zmq.PAIR), + zmq.Context().socket(zmq.PAIR)) + + + def test_recv_non_blocking(self): + ipc = 'ipc:///tmp/tests' + req, rep = self._get_socket_pair() + req.connect(ipc) + rep.bind(ipc) + sleep(0.2) +# req.send('test') +# set_trace() + hub = get_hub() +# hub.add(READ, rep, getcurrent().switch) + msg = {} + def rx(): + msg['res'] = rep.recv() + spawn(rx) + + req.send('test') + + sleep(0.2) + + + self.assertEqual(msg['res'], 'test') + +