From 253f2d3f1f33de7750d718e87d01ae132ee9376b Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Thu, 13 Aug 2009 17:25:28 -0700 Subject: [PATCH] Refactored the hubs again, now we support multiple readers/writers on a single socket, not sure if that's useful or not but it's sure better than silently unscheduling the original reader/writer, which was the original behavior. --- eventlet/api.py | 10 +-- eventlet/greenio.py | 2 +- eventlet/hubs/hub.py | 62 ++++++++++++------- eventlet/hubs/libevent.py | 74 ++++++++++------------ eventlet/hubs/poll.py | 86 +++++++++++++------------- eventlet/hubs/selects.py | 22 ++++--- tests/greenio_test.py | 125 ++++++++++++++++++++++++++++++++------ 7 files changed, 245 insertions(+), 136 deletions(-) diff --git a/eventlet/api.py b/eventlet/api.py index 1df88f5..95c1ae8 100644 --- a/eventlet/api.py +++ b/eventlet/api.py @@ -150,13 +150,13 @@ def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError t = hub.schedule_call_global(timeout, current.throw, timeout_exc) try: if read: - hub.add_reader(fileno, cb) + listener = hub.add("read", fileno, cb) if write: - hub.add_writer(fileno, cb) + listener = hub.add("write", fileno, cb) try: return hub.switch() finally: - hub.remove_descriptor(fileno) + hub.remove(listener) finally: if t is not None: t.cancel() @@ -206,9 +206,9 @@ def select(read_list, write_list, error_list, timeout=None): try: for k, v in ds.iteritems(): if v.get('read'): - hub.add_reader(k, on_read) + hub.add('read', k, on_read) if v.get('write'): - hub.add_writer(k, on_read) + hub.add_writer('write', k, on_write) descriptors.append(k) try: return hub.switch() diff --git a/eventlet/greenio.py b/eventlet/greenio.py index ac752bd..8937dd0 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -100,7 +100,7 @@ def socket_accept(descriptor): if e[0] == errno.EWOULDBLOCK: return None raise - + def socket_send(descriptor, data, flags=0): try: diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index c455970..a3d2cc6 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -27,6 +27,17 @@ from eventlet.timer import Timer, LocalTimer _g_debug = True +class FdListener(object): + def __init__(self, evtype, fileno, cb): + self.evtype = evtype + self.fileno = fileno + self.cb = cb + def __call__(self, *args, **kw): + return self.cb(*args, **kw) + def __repr__(self): + return "FdListener(%r, %r, %r)" % (self.evtype, self.fileno, self.cb) + __str__ = __repr__ + class BaseHub(object): """ Base hub class for easing the implementation of subclasses that are specific to a particular underlying event architecture. """ @@ -34,8 +45,7 @@ class BaseHub(object): SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit) def __init__(self, clock=time.time): - self.readers = {} - self.writers = {} + self.listeners = {'read':{}, 'write':{}} self.closed_fds = [] self.clock = clock @@ -53,27 +63,29 @@ class BaseHub(object): 'exit': [], } - def add_reader(self, fileno, read_cb): - """ Signals an intent to read from a particular file descriptor. + def add(self, evtype, fileno, cb): + """ Signals an intent to or write a particular file descriptor. + + The *evtype* argument is either the string 'read' or the string 'write'. The *fileno* argument is the file number of the file of interest. - The *read_cb* argument is the callback which will be called when the file - is ready for reading. + The *cb* argument is the callback which will be called when the file + is ready for reading/writing. """ - self.readers[fileno] = read_cb - - def add_writer(self, fileno, write_cb): - """ Signals an intent to write to a particular file descriptor. - - The *fileno* argument is the file number of the file of interest. - - The *write_cb* argument is the callback which will be called when the file - is ready for writing. - """ - - self.writers[fileno] = write_cb + listener = FdListener(evtype, fileno, cb) + self.listeners[evtype].setdefault(fileno, []).append(listener) + return listener + def remove(self, listener): + listener_list = self.listeners[listener.evtype].pop(listener.fileno, []) + try: + listener_list.remove(listener) + except ValueError: + pass + if listener_list: + self.listeners[listener.evtype][listener.fileno] = listener_list + def closed(self, fileno): """ Clean up any references so that we don't try and do I/O on a closed fd. @@ -81,8 +93,9 @@ class BaseHub(object): self.closed_fds.append(fileno) def remove_descriptor(self, fileno): - self.readers.pop(fileno, None) - self.writers.pop(fileno, None) + """ Completely remove all listeners for this fileno.""" + self.listeners['read'].pop(fileno, None) + self.listeners['write'].pop(fileno, None) def stop(self): self.abort() @@ -279,11 +292,16 @@ class BaseHub(object): # for debugging: def get_readers(self): - return self.readers + return self.listeners['read'] def get_writers(self): - return self.writers + return self.listeners['write'] def get_timers_count(hub): return max(len(x) for x in [hub.timers, hub.next_timers]) + + def describe_listeners(self): + import pprint + return pprint.pformat(self.listeners) + diff --git a/eventlet/hubs/libevent.py b/eventlet/hubs/libevent.py index 560b7c9..bd328dc 100644 --- a/eventlet/hubs/libevent.py +++ b/eventlet/hubs/libevent.py @@ -24,6 +24,7 @@ import traceback import event from eventlet import api +from eventlet.hubs.hub import BaseHub, FdListener class event_wrapper(object): @@ -50,17 +51,14 @@ class event_wrapper(object): self.impl = None -class Hub(object): +class Hub(BaseHub): SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit) def __init__(self, clock=time.time): + super(Hub,self).__init__(clock) event.init() - self.clock = clock - self.readers = {} - self.writers = {} - self.greenlet = api.Greenlet(self.run) self.signal_exc_info = None self.signal(2, lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt)) self.events_to_add = [] @@ -123,29 +121,22 @@ class Hub(object): def abort(self): self.schedule_call_global(0, self.greenlet.throw, api.GreenletExit) - @property - def running(self): + def _getrunning(self): return bool(self.greenlet) + + def _setrunning(self, value): + pass # exists for compatibility with BaseHub + running = property(_getrunning, _setrunning) - def add_reader(self, fileno, read_cb): - """ Signals an intent to read from a particular file descriptor. - - The *fileno* argument is the file number of the file of interest. - - The *read_cb* argument is the callback which will be called when the file - is ready for reading. - """ - self.readers[fileno] = event.read(fileno, read_cb, fileno) + def add(self, evtype, fileno, cb): + if evtype == 'read': + evt = event.read(fileno, cb, fileno) + elif evtype == 'write': + evt = event.write(fileno, cb, fileno) - def add_writer(self, fileno, write_cb): - """ Signals an intent to write to a particular file descriptor. - - The *fileno* argument is the file number of the file of interest. - - The *write_cb* argument is the callback which will be called when the file - is ready for writing. - """ - self.readers[fileno] = event.write(fileno, write_cb, fileno) + listener = FdListener(evtype, fileno, evt) + self.listeners[evtype].setdefault(fileno, []).append(listener) + return listener def signal(self, signalnum, handler): def wrapper(): @@ -155,21 +146,22 @@ class Hub(object): self.signal_exc_info = sys.exc_info() event.abort() return event_wrapper(event.signal(signalnum, wrapper)) - + + def remove(self, listener): + super(Hub, self).remove(listener) + listener.cb.delete() + def remove_descriptor(self, fileno): - reader = self.readers.pop(fileno, None) - if reader is not None: - try: - reader.delete() - except: - traceback.print_exc() - writer = self.writers.pop(fileno, None) - if writer is not None: - try: - writer.delete() - except: - traceback.print_exc() - + for lcontainer in self.listeners.itervalues(): + l_list = lcontainer.pop(fileno, None) + for listener in l_list: + try: + listener.cb.delete() + except SYSTEM_EXCEPTIONS: + raise + except: + traceback.print_exc() + def schedule_call_local(self, seconds, cb, *args, **kwargs): current = api.getcurrent() if current is self.greenlet: @@ -188,10 +180,10 @@ class Hub(object): return wrapper def get_readers(self): - return self.readers + return self.listeners['read'] def get_writers(self): - return self.writers + return self.listeners['write'] def _version_info(self): baseversion = event.__version__ diff --git a/eventlet/hubs/poll.py b/eventlet/hubs/poll.py index 1c6c279..df333e3 100644 --- a/eventlet/hubs/poll.py +++ b/eventlet/hubs/poll.py @@ -37,45 +37,32 @@ class Hub(hub.BaseHub): super(Hub, self).__init__(clock) self.poll = select.poll() - def add_reader(self, fileno, read_cb): - """ Signals an intent to read from a particular file descriptor. + def add(self, evtype, fileno, cb): + oldlisteners = self.listeners[evtype].get(fileno) + + listener = super(Hub, self).add(evtype, fileno, cb) + if not oldlisteners: + # Means we've added a new listener + self.register(fileno) + return listener + + def remove(self, listener): + super(Hub, self).remove(listener) + self.register(listener.fileno) - The *fileno* argument is the file number of the file of interest. - - The *read_cb* argument is the callback which will be called when the file - is ready for reading. - """ - oldreader = self.readers.get(fileno) - super(Hub, self).add_reader(fileno, read_cb) - - if not oldreader: - # Only need to re-register this fileno if the mask changes - mask = self.get_fn_mask(read_cb, self.writers.get(fileno)) - self.poll.register(fileno, mask) - - def add_writer(self, fileno, write_cb): - """ Signals an intent to write to a particular file descriptor. - - The *fileno* argument is the file number of the file of interest. - - The *write_cb* argument is the callback which will be called when the file - is ready for writing. - """ - oldwriter = self.writers.get(fileno) - super(Hub, self).add_writer(fileno, write_cb) - - if not oldwriter: - # Only need to re-register this fileno if the mask changes - mask = self.get_fn_mask(self.readers.get(fileno), write_cb) - self.poll.register(fileno, mask) - - def get_fn_mask(self, read, write): + def register(self, fileno): mask = 0 - if read is not None: + if self.listeners['read'].get(fileno): mask |= READ_MASK - if write is not None: + if self.listeners['write'].get(fileno): mask |= WRITE_MASK - return mask + if mask: + self.poll.register(fileno, mask) + else: + try: + self.poll.unregister(fileno) + except KeyError: + pass def remove_descriptor(self, fileno): super(Hub, self).remove_descriptor(fileno) @@ -85,8 +72,8 @@ class Hub(hub.BaseHub): pass def wait(self, seconds=None): - readers = self.readers - writers = self.writers + readers = self.listeners['read'] + writers = self.listeners['write'] if not readers and not writers: if seconds: @@ -102,13 +89,26 @@ class Hub(hub.BaseHub): for fileno, event in presult: for dct, mask in ((readers, READ_MASK), (writers, WRITE_MASK)): - cb = dct.get(fileno) - func = None - if cb is not None and event & mask: - func = cb - if func: + if not mask & event: + continue + listeners = dct.get(fileno) + if listeners: try: - func(fileno) + listeners[0](fileno) + except SYSTEM_EXCEPTIONS: + raise + except: + self.squelch_exception(fileno, sys.exc_info()) + for fileno, event in presult: + if not EXC_MASK & event: + continue + if event & select.POLLNVAL: + self.remove_descriptor(fileno) + continue + for listeners in (readers.get(fileno), writers.get(fileno)): + if listeners: + try: + listeners[0](fileno) except SYSTEM_EXCEPTIONS: raise except: diff --git a/eventlet/hubs/selects.py b/eventlet/hubs/selects.py index 8e77740..57e328b 100644 --- a/eventlet/hubs/selects.py +++ b/eventlet/hubs/selects.py @@ -38,14 +38,15 @@ class Hub(hub.BaseHub): self.remove_descriptor(fd) def wait(self, seconds=None): - readers = self.readers - writers = self.writers + readers = self.listeners['read'] + writers = self.listeners['write'] if not readers and not writers: if seconds: time.sleep(seconds) return try: - r, w, ig = select.select(readers.keys(), writers.keys(), [], seconds) + print "waiting", readers, writers + r, w, er = select.select(readers.keys(), writers.keys(), readers.keys() + writers.keys(), seconds) self.closed_fds = [] except select.error, e: if e.args[0] == errno.EINTR: @@ -56,12 +57,19 @@ class Hub(hub.BaseHub): return else: raise - for observed, events in ((readers, r), (writers, w)): + + for fileno in er: + for r in readers.get(fileno): + r(fileno) + for w in writers.get(fileno): + w(fileno) + + for listeners, events in ((readers, r), (writers, w)): for fileno in events: try: - cb = observed.pop(fileno, None) - if cb is not None: - cb(fileno) + l_list = listeners[fileno] + if l_list: + l_list[0](fileno) except self.SYSTEM_EXCEPTIONS: raise except: diff --git a/tests/greenio_test.py b/tests/greenio_test.py index bc12c5f..45efcc5 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -22,7 +22,14 @@ from eventlet import api, util import os import socket -# TODO try and reuse unit tests from within Python itself + +def bufsized(sock, size=1): + """ Resize both send and receive buffers on a socket. + Useful for testing trampoline. Returns the socket.""" + sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size) + return sock + class TestGreenIo(TestCase): def test_close_with_makefile(self): @@ -101,35 +108,119 @@ class TestGreenIo(TestCase): def test_full_duplex(self): from eventlet import coros - listener = api.tcp_listener(('127.0.0.1', 0)) + large_data = '*' * 10 + listener = bufsized(api.tcp_listener(('127.0.0.1', 0))) def send_large(sock): - # needs to send enough data that trampoline() is called - sock.sendall('*' * 100000) + sock.sendall(large_data) + + def read_large(sock): + result = sock.recv(len(large_data)) + expected = 'hello world' + while len(result) < len(large_data): + result += sock.recv(len(large_data)) + self.assertEquals(result, large_data) def server(): - (client, addr) = listener.accept() - # start reading, then, while reading, start writing. - # the reader should not hang forever - api.spawn(send_large, client) - api.sleep(0) # allow send_large to execute up to the trampoline - result = client.recv(1000) - assert result == 'hello world', result + (sock, addr) = listener.accept() + sock = bufsized(sock) + send_large_coro = coros.execute(send_large, sock) + api.sleep(0) + result = sock.recv(10) + expected = 'hello world' + while len(result) < len(expected): + result += sock.recv(10) + self.assertEquals(result, expected) + send_large_coro.wait() server_evt = coros.execute(server) - client = api.connect_tcp(('127.0.0.1', listener.getsockname()[1])) - api.spawn(client.makefile().read) + client = bufsized(api.connect_tcp(('127.0.0.1', + listener.getsockname()[1]))) + large_evt = coros.execute(read_large, client) api.sleep(0) - client.send('hello world') - client.close() + client.sendall('hello world') server_evt.wait() + client.close() + + def test_sendall(self): + from eventlet import proc + # test adapted from Brian Brunswick's email + timer = api.exc_after(1, api.TimeoutError) + + MANY_BYTES = 1000 + SECOND_SEND = 10 + def sender(listener): + (sock, addr) = listener.accept() + sock = bufsized(sock) + sock.sendall('x'*MANY_BYTES) + sock.sendall('y'*SECOND_SEND) + + sender_coro = proc.spawn(sender, api.tcp_listener(("", 9020))) + client = bufsized(api.connect_tcp(('localhost', 9020))) + total = 0 + while total < MANY_BYTES: + data = client.recv(min(MANY_BYTES - total, MANY_BYTES/10)) + if data == '': + print "ENDED", data + break + total += len(data) + + total2 = 0 + while total < SECOND_SEND: + data = client.recv(SECOND_SEND) + if data == '': + print "ENDED2", data + break + total2 += len(data) + + sender_coro.wait() + client.close() + timer.cancel() + + def test_multiple_readers(self): + # test that we can have multiple coroutines reading + # from the same fd. We make no guarantees about which one gets which + # bytes, but they should both get at least some + from eventlet import proc + def reader(sock, results): + while True: + data = sock.recv(1) + if data == '': + break + results.append(data) + + results1 = [] + results2 = [] + listener = api.tcp_listener(('127.0.0.1', 0)) + def server(): + (sock, addr) = listener.accept() + sock = bufsized(sock) + try: + c1 = proc.spawn(reader, sock, results1) + c2 = proc.spawn(reader, sock, results2) + c1.wait() + c2.wait() + finally: + api.kill(c1) + api.kill(c2) + + server_coro = proc.spawn(server) + client = bufsized(api.connect_tcp(('127.0.0.1', + listener.getsockname()[1]))) + client.sendall('*' * 10) + client.close() + server_coro.wait() + listener.close() + + self.assert_(len(results1) > 0) + self.assert_(len(results2) > 0) def test_server(sock, func, *args): """ Convenience function for writing cheap test servers. - It calls *func* on each incoming connection from *sock*, with the first argument - being a file for the incoming connector. + It calls *func* on each incoming connection from *sock*, with the first + argument being a file for the incoming connector. """ def inner_server(connaddr, *args): conn, addr = connaddr