Refactored the hubs again, now we support multiple readers/writers on a single socket, not sure if that's useful or not but it's sure better than silently unscheduling the original reader/writer, which was the original behavior.

This commit is contained in:
Ryan Williams
2009-08-13 17:25:28 -07:00
parent 0245630a87
commit 253f2d3f1f
7 changed files with 245 additions and 136 deletions

View File

@@ -150,13 +150,13 @@ def trampoline(fd, read=None, write=None, timeout=None, timeout_exc=TimeoutError
t = hub.schedule_call_global(timeout, current.throw, timeout_exc)
try:
if read:
hub.add_reader(fileno, cb)
listener = hub.add("read", fileno, cb)
if write:
hub.add_writer(fileno, cb)
listener = hub.add("write", fileno, cb)
try:
return hub.switch()
finally:
hub.remove_descriptor(fileno)
hub.remove(listener)
finally:
if t is not None:
t.cancel()
@@ -206,9 +206,9 @@ def select(read_list, write_list, error_list, timeout=None):
try:
for k, v in ds.iteritems():
if v.get('read'):
hub.add_reader(k, on_read)
hub.add('read', k, on_read)
if v.get('write'):
hub.add_writer(k, on_read)
hub.add_writer('write', k, on_write)
descriptors.append(k)
try:
return hub.switch()

View File

@@ -100,7 +100,7 @@ def socket_accept(descriptor):
if e[0] == errno.EWOULDBLOCK:
return None
raise
def socket_send(descriptor, data, flags=0):
try:

View File

@@ -27,6 +27,17 @@ from eventlet.timer import Timer, LocalTimer
_g_debug = True
class FdListener(object):
def __init__(self, evtype, fileno, cb):
self.evtype = evtype
self.fileno = fileno
self.cb = cb
def __call__(self, *args, **kw):
return self.cb(*args, **kw)
def __repr__(self):
return "FdListener(%r, %r, %r)" % (self.evtype, self.fileno, self.cb)
__str__ = __repr__
class BaseHub(object):
""" Base hub class for easing the implementation of subclasses that are
specific to a particular underlying event architecture. """
@@ -34,8 +45,7 @@ class BaseHub(object):
SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
def __init__(self, clock=time.time):
self.readers = {}
self.writers = {}
self.listeners = {'read':{}, 'write':{}}
self.closed_fds = []
self.clock = clock
@@ -53,27 +63,29 @@ class BaseHub(object):
'exit': [],
}
def add_reader(self, fileno, read_cb):
""" Signals an intent to read from a particular file descriptor.
def add(self, evtype, fileno, cb):
""" Signals an intent to or write a particular file descriptor.
The *evtype* argument is either the string 'read' or the string 'write'.
The *fileno* argument is the file number of the file of interest.
The *read_cb* argument is the callback which will be called when the file
is ready for reading.
The *cb* argument is the callback which will be called when the file
is ready for reading/writing.
"""
self.readers[fileno] = read_cb
def add_writer(self, fileno, write_cb):
""" Signals an intent to write to a particular file descriptor.
The *fileno* argument is the file number of the file of interest.
The *write_cb* argument is the callback which will be called when the file
is ready for writing.
"""
self.writers[fileno] = write_cb
listener = FdListener(evtype, fileno, cb)
self.listeners[evtype].setdefault(fileno, []).append(listener)
return listener
def remove(self, listener):
listener_list = self.listeners[listener.evtype].pop(listener.fileno, [])
try:
listener_list.remove(listener)
except ValueError:
pass
if listener_list:
self.listeners[listener.evtype][listener.fileno] = listener_list
def closed(self, fileno):
""" Clean up any references so that we don't try and
do I/O on a closed fd.
@@ -81,8 +93,9 @@ class BaseHub(object):
self.closed_fds.append(fileno)
def remove_descriptor(self, fileno):
self.readers.pop(fileno, None)
self.writers.pop(fileno, None)
""" Completely remove all listeners for this fileno."""
self.listeners['read'].pop(fileno, None)
self.listeners['write'].pop(fileno, None)
def stop(self):
self.abort()
@@ -279,11 +292,16 @@ class BaseHub(object):
# for debugging:
def get_readers(self):
return self.readers
return self.listeners['read']
def get_writers(self):
return self.writers
return self.listeners['write']
def get_timers_count(hub):
return max(len(x) for x in [hub.timers, hub.next_timers])
def describe_listeners(self):
import pprint
return pprint.pformat(self.listeners)

View File

@@ -24,6 +24,7 @@ import traceback
import event
from eventlet import api
from eventlet.hubs.hub import BaseHub, FdListener
class event_wrapper(object):
@@ -50,17 +51,14 @@ class event_wrapper(object):
self.impl = None
class Hub(object):
class Hub(BaseHub):
SYSTEM_EXCEPTIONS = (KeyboardInterrupt, SystemExit)
def __init__(self, clock=time.time):
super(Hub,self).__init__(clock)
event.init()
self.clock = clock
self.readers = {}
self.writers = {}
self.greenlet = api.Greenlet(self.run)
self.signal_exc_info = None
self.signal(2, lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt))
self.events_to_add = []
@@ -123,29 +121,22 @@ class Hub(object):
def abort(self):
self.schedule_call_global(0, self.greenlet.throw, api.GreenletExit)
@property
def running(self):
def _getrunning(self):
return bool(self.greenlet)
def _setrunning(self, value):
pass # exists for compatibility with BaseHub
running = property(_getrunning, _setrunning)
def add_reader(self, fileno, read_cb):
""" Signals an intent to read from a particular file descriptor.
The *fileno* argument is the file number of the file of interest.
The *read_cb* argument is the callback which will be called when the file
is ready for reading.
"""
self.readers[fileno] = event.read(fileno, read_cb, fileno)
def add(self, evtype, fileno, cb):
if evtype == 'read':
evt = event.read(fileno, cb, fileno)
elif evtype == 'write':
evt = event.write(fileno, cb, fileno)
def add_writer(self, fileno, write_cb):
""" Signals an intent to write to a particular file descriptor.
The *fileno* argument is the file number of the file of interest.
The *write_cb* argument is the callback which will be called when the file
is ready for writing.
"""
self.readers[fileno] = event.write(fileno, write_cb, fileno)
listener = FdListener(evtype, fileno, evt)
self.listeners[evtype].setdefault(fileno, []).append(listener)
return listener
def signal(self, signalnum, handler):
def wrapper():
@@ -155,21 +146,22 @@ class Hub(object):
self.signal_exc_info = sys.exc_info()
event.abort()
return event_wrapper(event.signal(signalnum, wrapper))
def remove(self, listener):
super(Hub, self).remove(listener)
listener.cb.delete()
def remove_descriptor(self, fileno):
reader = self.readers.pop(fileno, None)
if reader is not None:
try:
reader.delete()
except:
traceback.print_exc()
writer = self.writers.pop(fileno, None)
if writer is not None:
try:
writer.delete()
except:
traceback.print_exc()
for lcontainer in self.listeners.itervalues():
l_list = lcontainer.pop(fileno, None)
for listener in l_list:
try:
listener.cb.delete()
except SYSTEM_EXCEPTIONS:
raise
except:
traceback.print_exc()
def schedule_call_local(self, seconds, cb, *args, **kwargs):
current = api.getcurrent()
if current is self.greenlet:
@@ -188,10 +180,10 @@ class Hub(object):
return wrapper
def get_readers(self):
return self.readers
return self.listeners['read']
def get_writers(self):
return self.writers
return self.listeners['write']
def _version_info(self):
baseversion = event.__version__

View File

@@ -37,45 +37,32 @@ class Hub(hub.BaseHub):
super(Hub, self).__init__(clock)
self.poll = select.poll()
def add_reader(self, fileno, read_cb):
""" Signals an intent to read from a particular file descriptor.
def add(self, evtype, fileno, cb):
oldlisteners = self.listeners[evtype].get(fileno)
listener = super(Hub, self).add(evtype, fileno, cb)
if not oldlisteners:
# Means we've added a new listener
self.register(fileno)
return listener
def remove(self, listener):
super(Hub, self).remove(listener)
self.register(listener.fileno)
The *fileno* argument is the file number of the file of interest.
The *read_cb* argument is the callback which will be called when the file
is ready for reading.
"""
oldreader = self.readers.get(fileno)
super(Hub, self).add_reader(fileno, read_cb)
if not oldreader:
# Only need to re-register this fileno if the mask changes
mask = self.get_fn_mask(read_cb, self.writers.get(fileno))
self.poll.register(fileno, mask)
def add_writer(self, fileno, write_cb):
""" Signals an intent to write to a particular file descriptor.
The *fileno* argument is the file number of the file of interest.
The *write_cb* argument is the callback which will be called when the file
is ready for writing.
"""
oldwriter = self.writers.get(fileno)
super(Hub, self).add_writer(fileno, write_cb)
if not oldwriter:
# Only need to re-register this fileno if the mask changes
mask = self.get_fn_mask(self.readers.get(fileno), write_cb)
self.poll.register(fileno, mask)
def get_fn_mask(self, read, write):
def register(self, fileno):
mask = 0
if read is not None:
if self.listeners['read'].get(fileno):
mask |= READ_MASK
if write is not None:
if self.listeners['write'].get(fileno):
mask |= WRITE_MASK
return mask
if mask:
self.poll.register(fileno, mask)
else:
try:
self.poll.unregister(fileno)
except KeyError:
pass
def remove_descriptor(self, fileno):
super(Hub, self).remove_descriptor(fileno)
@@ -85,8 +72,8 @@ class Hub(hub.BaseHub):
pass
def wait(self, seconds=None):
readers = self.readers
writers = self.writers
readers = self.listeners['read']
writers = self.listeners['write']
if not readers and not writers:
if seconds:
@@ -102,13 +89,26 @@ class Hub(hub.BaseHub):
for fileno, event in presult:
for dct, mask in ((readers, READ_MASK), (writers, WRITE_MASK)):
cb = dct.get(fileno)
func = None
if cb is not None and event & mask:
func = cb
if func:
if not mask & event:
continue
listeners = dct.get(fileno)
if listeners:
try:
func(fileno)
listeners[0](fileno)
except SYSTEM_EXCEPTIONS:
raise
except:
self.squelch_exception(fileno, sys.exc_info())
for fileno, event in presult:
if not EXC_MASK & event:
continue
if event & select.POLLNVAL:
self.remove_descriptor(fileno)
continue
for listeners in (readers.get(fileno), writers.get(fileno)):
if listeners:
try:
listeners[0](fileno)
except SYSTEM_EXCEPTIONS:
raise
except:

View File

@@ -38,14 +38,15 @@ class Hub(hub.BaseHub):
self.remove_descriptor(fd)
def wait(self, seconds=None):
readers = self.readers
writers = self.writers
readers = self.listeners['read']
writers = self.listeners['write']
if not readers and not writers:
if seconds:
time.sleep(seconds)
return
try:
r, w, ig = select.select(readers.keys(), writers.keys(), [], seconds)
print "waiting", readers, writers
r, w, er = select.select(readers.keys(), writers.keys(), readers.keys() + writers.keys(), seconds)
self.closed_fds = []
except select.error, e:
if e.args[0] == errno.EINTR:
@@ -56,12 +57,19 @@ class Hub(hub.BaseHub):
return
else:
raise
for observed, events in ((readers, r), (writers, w)):
for fileno in er:
for r in readers.get(fileno):
r(fileno)
for w in writers.get(fileno):
w(fileno)
for listeners, events in ((readers, r), (writers, w)):
for fileno in events:
try:
cb = observed.pop(fileno, None)
if cb is not None:
cb(fileno)
l_list = listeners[fileno]
if l_list:
l_list[0](fileno)
except self.SYSTEM_EXCEPTIONS:
raise
except:

View File

@@ -22,7 +22,14 @@ from eventlet import api, util
import os
import socket
# TODO try and reuse unit tests from within Python itself
def bufsized(sock, size=1):
""" Resize both send and receive buffers on a socket.
Useful for testing trampoline. Returns the socket."""
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, size)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size)
return sock
class TestGreenIo(TestCase):
def test_close_with_makefile(self):
@@ -101,35 +108,119 @@ class TestGreenIo(TestCase):
def test_full_duplex(self):
from eventlet import coros
listener = api.tcp_listener(('127.0.0.1', 0))
large_data = '*' * 10
listener = bufsized(api.tcp_listener(('127.0.0.1', 0)))
def send_large(sock):
# needs to send enough data that trampoline() is called
sock.sendall('*' * 100000)
sock.sendall(large_data)
def read_large(sock):
result = sock.recv(len(large_data))
expected = 'hello world'
while len(result) < len(large_data):
result += sock.recv(len(large_data))
self.assertEquals(result, large_data)
def server():
(client, addr) = listener.accept()
# start reading, then, while reading, start writing.
# the reader should not hang forever
api.spawn(send_large, client)
api.sleep(0) # allow send_large to execute up to the trampoline
result = client.recv(1000)
assert result == 'hello world', result
(sock, addr) = listener.accept()
sock = bufsized(sock)
send_large_coro = coros.execute(send_large, sock)
api.sleep(0)
result = sock.recv(10)
expected = 'hello world'
while len(result) < len(expected):
result += sock.recv(10)
self.assertEquals(result, expected)
send_large_coro.wait()
server_evt = coros.execute(server)
client = api.connect_tcp(('127.0.0.1', listener.getsockname()[1]))
api.spawn(client.makefile().read)
client = bufsized(api.connect_tcp(('127.0.0.1',
listener.getsockname()[1])))
large_evt = coros.execute(read_large, client)
api.sleep(0)
client.send('hello world')
client.close()
client.sendall('hello world')
server_evt.wait()
client.close()
def test_sendall(self):
from eventlet import proc
# test adapted from Brian Brunswick's email
timer = api.exc_after(1, api.TimeoutError)
MANY_BYTES = 1000
SECOND_SEND = 10
def sender(listener):
(sock, addr) = listener.accept()
sock = bufsized(sock)
sock.sendall('x'*MANY_BYTES)
sock.sendall('y'*SECOND_SEND)
sender_coro = proc.spawn(sender, api.tcp_listener(("", 9020)))
client = bufsized(api.connect_tcp(('localhost', 9020)))
total = 0
while total < MANY_BYTES:
data = client.recv(min(MANY_BYTES - total, MANY_BYTES/10))
if data == '':
print "ENDED", data
break
total += len(data)
total2 = 0
while total < SECOND_SEND:
data = client.recv(SECOND_SEND)
if data == '':
print "ENDED2", data
break
total2 += len(data)
sender_coro.wait()
client.close()
timer.cancel()
def test_multiple_readers(self):
# test that we can have multiple coroutines reading
# from the same fd. We make no guarantees about which one gets which
# bytes, but they should both get at least some
from eventlet import proc
def reader(sock, results):
while True:
data = sock.recv(1)
if data == '':
break
results.append(data)
results1 = []
results2 = []
listener = api.tcp_listener(('127.0.0.1', 0))
def server():
(sock, addr) = listener.accept()
sock = bufsized(sock)
try:
c1 = proc.spawn(reader, sock, results1)
c2 = proc.spawn(reader, sock, results2)
c1.wait()
c2.wait()
finally:
api.kill(c1)
api.kill(c2)
server_coro = proc.spawn(server)
client = bufsized(api.connect_tcp(('127.0.0.1',
listener.getsockname()[1])))
client.sendall('*' * 10)
client.close()
server_coro.wait()
listener.close()
self.assert_(len(results1) > 0)
self.assert_(len(results2) > 0)
def test_server(sock, func, *args):
""" Convenience function for writing cheap test servers.
It calls *func* on each incoming connection from *sock*, with the first argument
being a file for the incoming connector.
It calls *func* on each incoming connection from *sock*, with the first
argument being a file for the incoming connector.
"""
def inner_server(connaddr, *args):
conn, addr = connaddr