From 8972567f0a4e81e96eff32a0bef9f4980cea25e1 Mon Sep 17 00:00:00 2001 From: Ben Ford Date: Mon, 20 Sep 2010 06:57:40 +0100 Subject: [PATCH 01/13] Added pycharm and pip-log.txt to hgignore --- .hgignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.hgignore b/.hgignore index b0ff21d..30dbc1c 100644 --- a/.hgignore +++ b/.hgignore @@ -7,6 +7,7 @@ dist build *.esproj .DS_Store +.idea doc/_build annotated cover @@ -17,6 +18,7 @@ lib* bin include .noseids +pip-log.txt syntax: re ^.ropeproject/.*$ From b75e83a35c2bd3458f14917cb532348e958ca0b5 Mon Sep 17 00:00:00 2001 From: Ben Ford Date: Mon, 20 Sep 2010 07:08:27 +0100 Subject: [PATCH 02/13] First stab at zeromq support. This consists of: A new hub: This closely mirrors the poll hub with some of the internal logic changed to reflect zmq's flags. A green module for zmq: This subclasses Context and Socket to ensure calls are non blocking. A (very sparse) beginings of a test module. An example: A melding of the pyzmq chat example and the eventlet telnet chat example. TODO zmq_poll chokes if the sockets passed to it come from different contexts. As context is the entry point to everything else then it would make sense to include a check in here that each thread has only one context instance. By context being the entry point I mean: ctx = zmq.Context() socket = ctx.socket(zmq.) This call to socket is repeated for each socket you want and ctx must be the same one for each thread. Tests. I'd like to get to the point f having all zmq socket pairs tested - and perhaps a nice benchmark suite too. --- eventlet/green/zmq.py | 67 +++++++++++++++++++++++++++++++++ eventlet/hubs/zeromq.py | 83 +++++++++++++++++++++++++++++++++++++++++ examples/zmq_chat.py | 64 +++++++++++++++++++++++++++++++ tests/zeromq_test.py | 49 ++++++++++++++++++++++++ 4 files changed, 263 insertions(+) create mode 100644 eventlet/green/zmq.py create mode 100644 eventlet/hubs/zeromq.py create mode 100644 examples/zmq_chat.py create mode 100644 tests/zeromq_test.py diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py new file mode 100644 index 0000000..99e5f8c --- /dev/null +++ b/eventlet/green/zmq.py @@ -0,0 +1,67 @@ +__zmq__ = __import__('zmq') +from eventlet.hubs import trampoline +__patched__ = ['Context', 'Socket'] +globals().update(dict([(var, getattr(__zmq__, var)) + for var in __zmq__.__all__ + if not (var.startswith('__') + or + var in __patched__) + ])) + +class Context(__zmq__.Context): + + def socket(self, socket_type): + return Socket(self, socket_type) + +class Socket(__zmq__.Socket): + + + def _send_message(self, data, flags=0, copy=True): +# flags |= __zmq__.NOBLOCK + print 'send' + while True: + try: + return super(Socket, self)._send_message(data, flags) + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, read=True) + + def _send_copy(self, data, flags=0, copy=True): +# flags |= __zmq__.NOBLOCK + while True: + try: + return super(Socket, self)._send_copy(data, flags) + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, write=True) + + def _recv_message(self, flags=0): + + flags |= __zmq__.NOBLOCK + while True: + try: + m = super(Socket, self)._recv_message(flags) + if m: + return m + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, read=True) + + def _recv_copy(self, flags=0): + flags |= __zmq__.NOBLOCK + while True: + try: + m = super(Socket, self)._recv_copy(flags) + if m: + return m + except __zmq__.ZMQError, e: + if e.errno != EAGAIN: + raise + trampoline(self, read=True) + + + + \ No newline at end of file diff --git a/eventlet/hubs/zeromq.py b/eventlet/hubs/zeromq.py new file mode 100644 index 0000000..4a52508 --- /dev/null +++ b/eventlet/hubs/zeromq.py @@ -0,0 +1,83 @@ +from eventlet import patcher +from eventlet.green import zmq +from eventlet.hubs import poll +from eventlet.hubs.hub import BaseHub, noop +from eventlet.hubs.poll import READ, WRITE +from eventlet.support import clear_sys_exc_info +import sys + +time = patcher.original('time') +select = patcher.original('select') +sleep = time.sleep + +EXC_MASK = zmq.POLLERR +READ_MASK = zmq.POLLIN +WRITE_MASK = zmq.POLLOUT + +class Hub(poll.Hub): + + + + def __init__(self, clock=time.time): + BaseHub.__init__(self, clock) + self.poll = zmq.Poller() + + def register(self, fileno, new=False): + mask = 0 + if self.listeners[READ].get(fileno): + mask |= READ_MASK + if self.listeners[WRITE].get(fileno): + mask |= WRITE_MASK + if mask: + self.poll.register(fileno, mask) + else: + self.poll.unregister(fileno) + + + def wait(self, seconds=None): + readers = self.listeners[READ] + writers = self.listeners[WRITE] + + if not readers and not writers: + if seconds: + sleep(seconds) + return + try: + presult = self.do_poll(seconds) + except zmq.ZMQError, e: + # In the poll hub this part exists to special case some exceptions + # from socket. There may be some error numbers that wider use of + # this hub will throw up as needing special treatment so leaving + # this block and this comment as a remineder + raise + SYSTEM_EXCEPTIONS = self.SYSTEM_EXCEPTIONS + + if self.debug_blocking: + self.block_detect_pre() + + for fileno, event in presult: + try: + if event & READ_MASK: + readers.get(fileno, noop).cb(fileno) + if event & WRITE_MASK: + writers.get(fileno, noop).cb(fileno) + if event & EXC_MASK: + # zmq.POLLERR is returned for any error condition in the + # underlying fd (as passed through to poll/epoll) + readers.get(fileno, noop).cb(fileno) + writers.get(fileno, noop).cb(fileno) + except SYSTEM_EXCEPTIONS: + raise + except: + self.squelch_exception(fileno, sys.exc_info()) + clear_sys_exc_info() + + if self.debug_blocking: + self.block_detect_post() + + +# def do_poll(self, seconds): +# print 'poll: ', seconds +# if seconds < 0: +# seconds = 500 +# return self.poll.poll(seconds) \ No newline at end of file diff --git a/examples/zmq_chat.py b/examples/zmq_chat.py new file mode 100644 index 0000000..cd07f80 --- /dev/null +++ b/examples/zmq_chat.py @@ -0,0 +1,64 @@ +import eventlet, sys +from eventlet.green import socket, zmq +from eventlet.hubs import use_hub +use_hub('zeromq') + +ADDR = 'ipc:///tmp/chat' + +ctx = zmq.Context() + +def publish(writer): + + print "connected" + socket = ctx.socket(zmq.SUB) + + socket.setsockopt(zmq.SUBSCRIBE, "") + socket.connect(ADDR) + eventlet.sleep(0.1) + + while True: + msg = socket.recv_pyobj() + str_msg = "%s: %s" % msg + writer.write(str_msg) + writer.flush() + + +PORT=3001 + +def read_chat_forever(reader, pub_socket): + + line = reader.readline() + who = 'someone' + while line: + print "Chat:", line.strip() + if line.startswith('name:'): + who = line.split(':')[-1].strip() + + try: + pub_socket.send_pyobj((who, line)) + except socket.error, e: + # ignore broken pipes, they just mean the participant + # closed its connection already + if e[0] != 32: + raise + line = reader.readline() + print "Participant left chat." + +try: + print "ChatServer starting up on port %s" % PORT + server = eventlet.listen(('0.0.0.0', PORT)) + pub_socket = ctx.socket(zmq.PUB) + pub_socket.bind(ADDR) + eventlet.spawn_n(publish, + sys.stdout) + while True: + new_connection, address = server.accept() + + print "Participant joined chat." + eventlet.spawn_n(publish, + new_connection.makefile('w')) + eventlet.spawn_n(read_chat_forever, + new_connection.makefile('r'), + pub_socket) +except (KeyboardInterrupt, SystemExit): + print "ChatServer exiting." \ No newline at end of file diff --git a/tests/zeromq_test.py b/tests/zeromq_test.py new file mode 100644 index 0000000..439cfaa --- /dev/null +++ b/tests/zeromq_test.py @@ -0,0 +1,49 @@ +from eventlet import spawn, sleep, getcurrent +from eventlet.hubs import use_hub, get_hub +from eventlet.green import zmq +from nose.tools import * +from tests import mock, LimitedTestCase +from eventlet.hubs.hub import READ, WRITE + +class _TestZMQ(LimitedTestCase): + + def setUp(self): + use_hub('zeromq') + + super(_TestZMQ, self).setUp() +# self.timer.cancel() + + def tearDown(self): + super(_TestZMQ, self).tearDown() + use_hub() + +class TestUpstreamDownStream(_TestZMQ): + + def _get_socket_pair(self): + return (zmq.Context().socket(zmq.PAIR), + zmq.Context().socket(zmq.PAIR)) + + + def test_recv_non_blocking(self): + ipc = 'ipc:///tmp/tests' + req, rep = self._get_socket_pair() + req.connect(ipc) + rep.bind(ipc) + sleep(0.2) +# req.send('test') +# set_trace() + hub = get_hub() +# hub.add(READ, rep, getcurrent().switch) + msg = {} + def rx(): + msg['res'] = rep.recv() + spawn(rx) + + req.send('test') + + sleep(0.2) + + + self.assertEqual(msg['res'], 'test') + + From 91920364429bb6138d895d18bd0798574c202668 Mon Sep 17 00:00:00 2001 From: Ben Ford Date: Wed, 22 Sep 2010 09:00:18 +0100 Subject: [PATCH 03/13] Added some logic to zmq so that the context will be unique per thread --- eventlet/green/zmq.py | 9 ++- eventlet/hubs/zeromq.py | 19 ++++- tests/zeromq_test.py | 158 ++++++++++++++++++++++++++++++++-------- 3 files changed, 149 insertions(+), 37 deletions(-) diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py index 99e5f8c..f4d6581 100644 --- a/eventlet/green/zmq.py +++ b/eventlet/green/zmq.py @@ -1,5 +1,7 @@ __zmq__ = __import__('zmq') +from eventlet import patcher from eventlet.hubs import trampoline + __patched__ = ['Context', 'Socket'] globals().update(dict([(var, getattr(__zmq__, var)) for var in __zmq__.__all__ @@ -17,18 +19,17 @@ class Socket(__zmq__.Socket): def _send_message(self, data, flags=0, copy=True): -# flags |= __zmq__.NOBLOCK - print 'send' + flags |= __zmq__.NOBLOCK while True: try: return super(Socket, self)._send_message(data, flags) except __zmq__.ZMQError, e: if e.errno != EAGAIN: raise - trampoline(self, read=True) + trampoline(self, write=True) def _send_copy(self, data, flags=0, copy=True): -# flags |= __zmq__.NOBLOCK + flags |= __zmq__.NOBLOCK while True: try: return super(Socket, self)._send_copy(data, flags) diff --git a/eventlet/hubs/zeromq.py b/eventlet/hubs/zeromq.py index 4a52508..9ce6503 100644 --- a/eventlet/hubs/zeromq.py +++ b/eventlet/hubs/zeromq.py @@ -1,6 +1,6 @@ from eventlet import patcher from eventlet.green import zmq -from eventlet.hubs import poll +from eventlet.hubs import poll, _threadlocal from eventlet.hubs.hub import BaseHub, noop from eventlet.hubs.poll import READ, WRITE from eventlet.support import clear_sys_exc_info @@ -22,6 +22,23 @@ class Hub(poll.Hub): BaseHub.__init__(self, clock) self.poll = zmq.Poller() + def get_context(self): + """zmq's Context must be unique within a hub + + The zeromq API documentation states: + All zmq sockets passed to the zmq_poll() function must share the same + zmq context and must belong to the thread calling zmq_poll() + + As zmq_poll is what's eventually being called then we need to insure + that all sockets that are going to be passed to zmq_poll (via + hub.do_poll) are in the same context + """ + try: + return _threadlocal.context + except AttributeError: + _threadlocal.context = zmq.Context() + return _threadlocal.context + def register(self, fileno, new=False): mask = 0 if self.listeners[READ].get(fileno): diff --git a/tests/zeromq_test.py b/tests/zeromq_test.py index 439cfaa..10308ab 100644 --- a/tests/zeromq_test.py +++ b/tests/zeromq_test.py @@ -1,49 +1,143 @@ -from eventlet import spawn, sleep, getcurrent -from eventlet.hubs import use_hub, get_hub +from eventlet import event, spawn, sleep, patcher +from eventlet.hubs import use_hub, get_hub, _threadlocal +from eventlet.hubs.hub import READ, WRITE from eventlet.green import zmq from nose.tools import * -from tests import mock, LimitedTestCase -from eventlet.hubs.hub import READ, WRITE +from tests import mock, LimitedTestCase, skip_unless +from unittest import TestCase -class _TestZMQ(LimitedTestCase): +from threading import Thread - def setUp(self): - use_hub('zeromq') +def using_zmq(_f): + return 'zeromq' in type(get_hub()).__module__ - super(_TestZMQ, self).setUp() -# self.timer.cancel() +def skip_unless_zmq(func): + """ Decorator that skips a test if we're using the pyevent hub.""" + return skip_unless(using_zmq)(func) - def tearDown(self): - super(_TestZMQ, self).tearDown() - use_hub() +class TestUpstreamDownStream(LimitedTestCase): -class TestUpstreamDownStream(_TestZMQ): + def create_bound_pair(self, type1, type2, interface='tcp://127.0.0.1'): + """Create a bound socket pair using a random port.""" + self.context = context = get_hub().get_context() + s1 = context.socket(type1) + port = s1.bind_to_random_port(interface) + s2 = context.socket(type2) + s2.connect('%s:%s' % (interface, port)) + return s1, s2 - def _get_socket_pair(self): - return (zmq.Context().socket(zmq.PAIR), - zmq.Context().socket(zmq.PAIR)) + def assertRaisesErrno(self, errno, func, *args): + try: + func(*args) + except zmq.ZMQError, e: + self.assertEqual(e.errno, errno, "wrong error raised, expected '%s' \ +got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno))) + else: + self.fail("Function did not raise any error") - - def test_recv_non_blocking(self): + @skip_unless_zmq + def test_recv_spawned_before_send_is_non_blocking(self): ipc = 'ipc:///tmp/tests' - req, rep = self._get_socket_pair() - req.connect(ipc) - rep.bind(ipc) - sleep(0.2) -# req.send('test') -# set_trace() - hub = get_hub() -# hub.add(READ, rep, getcurrent().switch) - msg = {} + req, rep = self.create_bound_pair(zmq.PAIR, zmq.PAIR, interface='inproc://') +# req.connect(ipc) +# rep.bind(ipc) + sleep() + msg = dict(res=None) + done = event.Event() def rx(): msg['res'] = rep.recv() + done.send('done') spawn(rx) - req.send('test') - - sleep(0.2) - - + done.wait() self.assertEqual(msg['res'], 'test') + @skip_unless_zmq + def test_send_1k_req_rep(self): + self.reset_timeout(2) + req, rep = self.create_bound_pair(zmq.REQ, zmq.REP, interface='inproc://') + sleep() + done = event.Event() + def tx(): + tx_i = 0 + req.send(str(tx_i)) + while req.recv() != 'done': + tx_i += 1 + req.send(str(tx_i)) + def rx(): + while True: + rx_i = rep.recv() + if rx_i == "1000": + rep.send('done') + done.send(0) + break + rep.send('i') + spawn(tx) + spawn(rx) + final_i = done.wait() + self.assertEqual(final_i, 0) + + @skip_unless_zmq + def test_send_1k_up_down(self): + self.reset_timeout(2) + down, up = self.create_bound_pair(zmq.DOWNSTREAM, zmq.UPSTREAM, interface='inproc://') + sleep() + done = event.Event() + def tx(): + tx_i = 0 + while True: + tx_i += 1 + down.send(str(tx_i)) + def rx(): + while True: + rx_i = up.recv() + if rx_i == "1000": + done.send(0) + break + spawn(tx) + spawn(rx) + final_i = done.wait() + self.assertEqual(final_i, 0) + + + + + + +class TestThreadedContextAccess(TestCase): + """zmq's Context must be unique within a hub + + The zeromq API documentation states: + All zmq sockets passed to the zmq_poll() function must share the same zmq + context and must belong to the thread calling zmq_poll() + + As zmq_poll is what's eventually being called then we need to insure that + all sockets that are going to be passed to zmq_poll (via hub.do_poll) are + in the same context + """ + + @skip_unless_zmq + def test_threadlocal_context(self): + hub = get_hub() + context = hub.get_context() + self.assertEqual(context, _threadlocal.context) + next_context = hub.get_context() + self.assertTrue(context is next_context) + + @skip_unless_zmq + def test_different_context_in_different_thread(self): + context = get_hub().get_context() + test_result = [] + def assert_different(ctx): + assert not hasattr(_threadlocal, 'hub') + this_thread_context = get_hub().get_context() + test_result.append(ctx is this_thread_context) + Thread(target=assert_different, args=(context,)).start() + while not len(test_result): + pass + self.assertFalse(test_result[0]) + + + + From 77673057cc368610beb508c6c31ded602c03d05d Mon Sep 17 00:00:00 2001 From: Ben Ford Date: Thu, 23 Sep 2010 07:15:55 +0100 Subject: [PATCH 04/13] Changed send methods to call to sleep in the case of successful send. Without this the client programmer will have to remember to call sleep in a loop. Not sure if this i the best solution or not. --- eventlet/green/zmq.py | 10 +++++++--- tests/zeromq_test.py | 10 ++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py index f4d6581..545ef50 100644 --- a/eventlet/green/zmq.py +++ b/eventlet/green/zmq.py @@ -1,5 +1,5 @@ __zmq__ = __import__('zmq') -from eventlet import patcher +from eventlet import sleep from eventlet.hubs import trampoline __patched__ = ['Context', 'Socket'] @@ -22,7 +22,9 @@ class Socket(__zmq__.Socket): flags |= __zmq__.NOBLOCK while True: try: - return super(Socket, self)._send_message(data, flags) + super(Socket, self)._send_message(data, flags) + sleep() + return except __zmq__.ZMQError, e: if e.errno != EAGAIN: raise @@ -32,7 +34,9 @@ class Socket(__zmq__.Socket): flags |= __zmq__.NOBLOCK while True: try: - return super(Socket, self)._send_copy(data, flags) + super(Socket, self)._send_copy(data, flags) + sleep() + return except __zmq__.ZMQError, e: if e.errno != EAGAIN: raise diff --git a/tests/zeromq_test.py b/tests/zeromq_test.py index 10308ab..34e2c84 100644 --- a/tests/zeromq_test.py +++ b/tests/zeromq_test.py @@ -38,7 +38,7 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno))) @skip_unless_zmq def test_recv_spawned_before_send_is_non_blocking(self): ipc = 'ipc:///tmp/tests' - req, rep = self.create_bound_pair(zmq.PAIR, zmq.PAIR, interface='inproc://') + req, rep = self.create_bound_pair(zmq.PAIR, zmq.PAIR) # req.connect(ipc) # rep.bind(ipc) sleep() @@ -54,8 +54,7 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno))) @skip_unless_zmq def test_send_1k_req_rep(self): - self.reset_timeout(2) - req, rep = self.create_bound_pair(zmq.REQ, zmq.REP, interface='inproc://') + req, rep = self.create_bound_pair(zmq.REQ, zmq.REP) sleep() done = event.Event() def tx(): @@ -79,13 +78,12 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno))) @skip_unless_zmq def test_send_1k_up_down(self): - self.reset_timeout(2) - down, up = self.create_bound_pair(zmq.DOWNSTREAM, zmq.UPSTREAM, interface='inproc://') + down, up = self.create_bound_pair(zmq.DOWNSTREAM, zmq.UPSTREAM) sleep() done = event.Event() def tx(): tx_i = 0 - while True: + while tx_i <= 1000: tx_i += 1 down.send(str(tx_i)) def rx(): From 0d4f4485003fdbcd28c0f968ab68b7f31e08daf4 Mon Sep 17 00:00:00 2001 From: Ben Ford Date: Fri, 24 Sep 2010 07:56:52 +0100 Subject: [PATCH 05/13] Took sleep statements out of send/recv functions. Renamed tests to zmq_tests --- eventlet/green/zmq.py | 2 -- tests/{zeromq_test.py => zmq_test.py} | 16 ++++++++++++++-- 2 files changed, 14 insertions(+), 4 deletions(-) rename tests/{zeromq_test.py => zmq_test.py} (89%) diff --git a/eventlet/green/zmq.py b/eventlet/green/zmq.py index 545ef50..2da0a6b 100644 --- a/eventlet/green/zmq.py +++ b/eventlet/green/zmq.py @@ -23,7 +23,6 @@ class Socket(__zmq__.Socket): while True: try: super(Socket, self)._send_message(data, flags) - sleep() return except __zmq__.ZMQError, e: if e.errno != EAGAIN: @@ -35,7 +34,6 @@ class Socket(__zmq__.Socket): while True: try: super(Socket, self)._send_copy(data, flags) - sleep() return except __zmq__.ZMQError, e: if e.errno != EAGAIN: diff --git a/tests/zeromq_test.py b/tests/zmq_test.py similarity index 89% rename from tests/zeromq_test.py rename to tests/zmq_test.py index 34e2c84..e7fbbe3 100644 --- a/tests/zeromq_test.py +++ b/tests/zmq_test.py @@ -37,7 +37,6 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno))) @skip_unless_zmq def test_recv_spawned_before_send_is_non_blocking(self): - ipc = 'ipc:///tmp/tests' req, rep = self.create_bound_pair(zmq.PAIR, zmq.PAIR) # req.connect(ipc) # rep.bind(ipc) @@ -52,6 +51,14 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno))) done.wait() self.assertEqual(msg['res'], 'test') + @skip_unless_zmq + def test_close_socket_raises_enotsup(self): + req, rep = self.create_bound_pair(zmq.PAIR, zmq.PAIR) + rep.close() + req.close() + self.assertRaisesErrno(zmq.ENOTSUP, rep.recv) + self.assertRaisesErrno(zmq.ENOTSUP, req.send, 'test') + @skip_unless_zmq def test_send_1k_req_rep(self): req, rep = self.create_bound_pair(zmq.REQ, zmq.REP) @@ -68,6 +75,7 @@ got '%s'" % (zmq.ZMQError(errno), zmq.ZMQError(e.errno))) rx_i = rep.recv() if rx_i == "1000": rep.send('done') + sleep() done.send(0) break rep.send('i') @@ -128,7 +136,11 @@ class TestThreadedContextAccess(TestCase): test_result = [] def assert_different(ctx): assert not hasattr(_threadlocal, 'hub') - this_thread_context = get_hub().get_context() + hub = get_hub() + try: + this_thread_context = hub.get_context() + except: + test_result.append('fail') test_result.append(ctx is this_thread_context) Thread(target=assert_different, args=(context,)).start() while not len(test_result): From 64c065b3d196eb912f8b47e0e40d73baa4696a78 Mon Sep 17 00:00:00 2001 From: Ben Ford Date: Wed, 29 Sep 2010 07:16:32 +0100 Subject: [PATCH 06/13] Changed existing websocket examples to take a port --- examples/websocket_chat.html | 2 +- examples/websocket_chat.py | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/examples/websocket_chat.html b/examples/websocket_chat.html index 3eb7efc..9237532 100644 --- a/examples/websocket_chat.html +++ b/examples/websocket_chat.html @@ -5,7 +5,7 @@