Fixes #66. Not super-happy that it doesn't work in epoll, but haven't cracked that nut yet.

This commit is contained in:
Ryan Williams
2010-11-20 17:53:38 -08:00
parent 49d5eb53d8
commit 1a8a9fc051
3 changed files with 62 additions and 9 deletions

View File

@@ -136,7 +136,7 @@ class BaseHub(object):
self.listeners[evtype].pop(fileno, None)
# migrate a secondary listener to be the primary listener
if fileno in self.secondaries[evtype]:
sec = self.secondaries[evtype].get(fileno, ())
sec = self.secondaries[evtype].get(fileno, None)
if not sec:
return
self.listeners[evtype][fileno] = sec.pop(0)
@@ -146,10 +146,16 @@ class BaseHub(object):
def remove_descriptor(self, fileno):
""" Completely remove all listeners for this fileno. For internal use
only."""
self.listeners[READ].pop(fileno, None)
self.listeners[WRITE].pop(fileno, None)
self.secondaries[READ].pop(fileno, None)
self.secondaries[WRITE].pop(fileno, None)
listeners = []
listeners.append(self.listeners[READ].pop(fileno, noop))
listeners.append(self.listeners[WRITE].pop(fileno, noop))
listeners.extend(self.secondaries[READ].pop(fileno, ()))
listeners.extend(self.secondaries[WRITE].pop(fileno, ()))
for listener in listeners:
try:
listener.cb(fileno)
except Exception, e:
self.squelch_generic_exception(sys.exc_info())
def switch(self):
cur = greenlet.getcurrent()
@@ -160,7 +166,6 @@ class BaseHub(object):
switch_out()
except:
self.squelch_generic_exception(sys.exc_info())
clear_sys_exc_info()
if self.greenlet.dead:
self.greenlet = greenlet.greenlet(self.run)
try:
@@ -252,11 +257,13 @@ class BaseHub(object):
if self.debug_exceptions:
traceback.print_exception(*exc_info)
sys.stderr.flush()
clear_sys_exc_info()
def squelch_timer_exception(self, timer, exc_info):
if self.debug_exceptions:
traceback.print_exception(*exc_info)
sys.stderr.flush()
clear_sys_exc_info()
def add_timer(self, timer):
scheduled_time = self.clock() + timer.seconds

View File

@@ -21,7 +21,7 @@ class Hub(BaseHub):
try:
select.select([fd], [], [], 0)
except select.error, e:
if get_errno(e) == errno.EBADF:
if get_errno(e) in BAD_SOCK:
self.remove_descriptor(fd)
def wait(self, seconds=None):

View File

@@ -1,5 +1,5 @@
import socket as _orig_sock
from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b
from tests import LimitedTestCase, skip_with_pyevent, main, skipped, s2b, skip_if
from eventlet import event
from eventlet import greenio
from eventlet import debug
@@ -31,6 +31,15 @@ def min_buf_size():
test_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)
return test_sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF)
def using_epoll_hub(_f):
from eventlet.hubs import get_hub
try:
return 'epolls' in type(get_hub()).__module__
except Exception:
return False
class TestGreenSocket(LimitedTestCase):
def assertWriteToClosedFileRaises(self, fd):
if sys.version_info[0]<3:
@@ -472,7 +481,44 @@ class TestGreenSocket(LimitedTestCase):
s.sendall('b')
a.wait()
@skip_with_pyevent
@skip_if(using_epoll_hub)
def test_closure(self):
def spam_to_me(address):
sock = eventlet.connect(address)
while True:
try:
sock.sendall('hello world')
except socket.error, e:
if e.errno == errno.EPIPE:
return
raise
server = eventlet.listen(('127.0.0.1', 0))
sender = eventlet.spawn(spam_to_me, server.getsockname())
client, address = server.accept()
server.close()
def reader():
try:
while True:
data = client.recv(1024)
self.assert_(data)
except socket.error, e:
# we get an EBADF because client is closed in the same process
# (but a different greenthread)
if e.errno != errno.EBADF:
raise
def closer():
client.close()
reader = eventlet.spawn(reader)
eventlet.spawn_n(closer)
reader.wait()
sender.wait()
class TestGreenPipe(LimitedTestCase):
def setUp(self):
super(self.__class__, self).setUp()