diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index af7bd89..00e3e1e 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -136,7 +136,7 @@ class BaseHub(object): self.listeners[evtype].pop(fileno, None) # migrate a secondary listener to be the primary listener if fileno in self.secondaries[evtype]: - sec = self.secondaries[evtype].get(fileno, ()) + sec = self.secondaries[evtype].get(fileno, None) if not sec: return self.listeners[evtype][fileno] = sec.pop(0) @@ -146,10 +146,16 @@ class BaseHub(object): def remove_descriptor(self, fileno): """ Completely remove all listeners for this fileno. For internal use only.""" - self.listeners[READ].pop(fileno, None) - self.listeners[WRITE].pop(fileno, None) - self.secondaries[READ].pop(fileno, None) - self.secondaries[WRITE].pop(fileno, None) + listeners = [] + listeners.append(self.listeners[READ].pop(fileno, noop)) + listeners.append(self.listeners[WRITE].pop(fileno, noop)) + listeners.extend(self.secondaries[READ].pop(fileno, ())) + listeners.extend(self.secondaries[WRITE].pop(fileno, ())) + for listener in listeners: + try: + listener.cb(fileno) + except Exception, e: + self.squelch_generic_exception(sys.exc_info()) def switch(self): cur = greenlet.getcurrent() @@ -160,7 +166,6 @@ class BaseHub(object): switch_out() except: self.squelch_generic_exception(sys.exc_info()) - clear_sys_exc_info() if self.greenlet.dead: self.greenlet = greenlet.greenlet(self.run) try: @@ -252,11 +257,13 @@ class BaseHub(object): if self.debug_exceptions: traceback.print_exception(*exc_info) sys.stderr.flush() + clear_sys_exc_info() def squelch_timer_exception(self, timer, exc_info): if self.debug_exceptions: traceback.print_exception(*exc_info) sys.stderr.flush() + clear_sys_exc_info() def add_timer(self, timer): scheduled_time = self.clock() + timer.seconds diff --git a/eventlet/hubs/selects.py b/eventlet/hubs/selects.py index 7a0e2ed..b7d5540 100644 --- a/eventlet/hubs/selects.py +++ b/eventlet/hubs/selects.py @@ -21,7 +21,7 @@ class Hub(BaseHub): try: select.select([fd], [], [], 0) except select.error, e: - if get_errno(e) == errno.EBADF: + if get_errno(e) in BAD_SOCK: self.remove_descriptor(fd) def wait(self, seconds=None): diff --git a/tests/greenio_test.py b/tests/greenio_test.py index 02b9d8e..5a90c8c 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -1,5 +1,5 @@ import socket as _orig_sock -from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b +from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b, skip_if from eventlet import event from eventlet import greenio from eventlet import debug @@ -31,6 +31,15 @@ def min_buf_size(): test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1) return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) + +def using_epoll_hub(_f): + from eventlet.hubs import get_hub + try: + return 'epolls' in type(get_hub()).__module__ + except Exception: + return False + + class TestGreenSocket(LimitedTestCase): def assertWriteToClosedFileRaises(self, fd): if sys.version_info[0]<3: @@ -472,7 +481,44 @@ class TestGreenSocket(LimitedTestCase): s.sendall('b') a.wait() - + @skip_with_pyevent + @skip_if(using_epoll_hub) + def test_closure(self): + def spam_to_me(address): + sock = eventlet.connect(address) + while True: + try: + sock.sendall('hello world') + except socket.error, e: + if e.errno == errno.EPIPE: + return + raise + + server = eventlet.listen(('127.0.0.1', 0)) + sender = eventlet.spawn(spam_to_me, server.getsockname()) + client, address = server.accept() + server.close() + + def reader(): + try: + while True: + data = client.recv(1024) + self.assert_(data) + except socket.error, e: + # we get an EBADF because client is closed in the same process + # (but a different greenthread) + if e.errno != errno.EBADF: + raise + + def closer(): + client.close() + + reader = eventlet.spawn(reader) + eventlet.spawn_n(closer) + reader.wait() + sender.wait() + + class TestGreenPipe(LimitedTestCase): def setUp(self): super(self.__class__, self).setUp()