diff --git a/NEWS b/NEWS index 71df493..8b14602 100644 --- a/NEWS +++ b/NEWS @@ -5,7 +5,6 @@ * patcher.monkey_patch() method replacing util.wrap_* * monkeypatch threading support * removed api.named -* move api_tests into greenthread_tests and hub_tests, or wherever's appropriate * imported timeout module from gevent, replace exc_after and with_timeout() * replace call_after with spawn_after; this is so that users don't see the Timer class * added cancel() method to GreenThread to support the semantic of "abort if not already in the middle of something" diff --git a/benchmarks/context.py b/benchmarks/context.py new file mode 100644 index 0000000..c237528 --- /dev/null +++ b/benchmarks/context.py @@ -0,0 +1,92 @@ +"""Test context switching performance of threading and eventlet""" + +import threading +import time + +import eventlet +from eventlet import hubs +from eventlet.hubs import pyevent, epolls, poll, selects + + +CONTEXT_SWITCHES = 100000 + +def run(event, wait_event): + counter = 0 + while counter <= CONTEXT_SWITCHES: + wait_event.wait() + wait_event.reset() + counter += 1 + event.send() + +def test_eventlet(): + event1 = eventlet.event.Event() + event2 = eventlet.event.Event() + event1.send() + thread1 = eventlet.spawn(run, event1, event2) + thread2 = eventlet.spawn(run, event2, event1) + + thread1.wait() + thread2.wait() + +class BenchThread(threading.Thread): + def __init__(self, event, wait_event): + threading.Thread.__init__(self) + self.counter = 0 + self.event = event + self.wait_event = wait_event + + def run(self): + while self.counter <= CONTEXT_SWITCHES: + self.wait_event.wait() + self.wait_event.clear() + self.counter += 1 + self.event.set() + +def test_thread(): + + event1 = threading.Event() + event2 = threading.Event() + event1.set() + thread1 = BenchThread(event1, event2) + thread2 = BenchThread(event2, event1) + thread1.start() + thread2.start() + thread1.join() + thread2.join() + +print "Testing with %d context switches" % CONTEXT_SWITCHES +start = time.time() +test_thread() +print "threading: %.02f seconds" % (time.time() - start) + +try: + hubs.use_hub(pyevent) + start = time.time() + test_eventlet() + print "pyevent: %.02f seconds" % (time.time() - start) +except: + print "pyevent hub unavailable" + +try: + hubs.use_hub(epolls) + start = time.time() + test_eventlet() + print "epoll: %.02f seconds" % (time.time() - start) +except: + print "epoll hub unavailable" + +try: + hubs.use_hub(poll) + start = time.time() + test_eventlet() + print "poll: %.02f seconds" % (time.time() - start) +except: + print "poll hub unavailable" + +try: + hubs.use_hub(selects) + start = time.time() + test_eventlet() + print "select: %.02f seconds" % (time.time() - start) +except: + print "select hub unavailable" diff --git a/eventlet/greenio.py b/eventlet/greenio.py index b95ab39..1d305ea 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -1,3 +1,4 @@ +import eventlet from eventlet.hubs import trampoline from eventlet.hubs import get_hub @@ -151,7 +152,7 @@ class GreenSocket(object): set_nonblocking(client) return type(self)(client), addr trampoline(fd, read=True, timeout=self.gettimeout(), - timeout_exc=socket.timeout) + timeout_exc=socket.timeout("timed out")) def bind(self, *args, **kw): fn = self.bind = self.fd.bind @@ -170,15 +171,17 @@ class GreenSocket(object): fd = self.fd if self.gettimeout() is None: while not socket_connect(fd, address): - trampoline(fd, write=True, timeout_exc=socket.timeout) + trampoline(fd, write=True, + timeout_exc=socket.timeout("timed out")) else: end = time.time() + self.gettimeout() while True: if socket_connect(fd, address): return if time.time() >= end: - raise socket.timeout - trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout) + raise socket.timeout("timed out") + trampoline(fd, write=True, timeout=end-time.time(), + timeout_exc=socket.timeout("timed out")) def connect_ex(self, address): if self.act_non_blocking: @@ -187,7 +190,8 @@ class GreenSocket(object): if self.gettimeout() is None: while not socket_connect(fd, address): try: - trampoline(fd, write=True, timeout_exc=socket.timeout) + trampoline(fd, write=True, + timeout_exc=socket.timeout(errno.EAGAIN)) except socket.error, ex: return ex[0] else: @@ -196,9 +200,10 @@ class GreenSocket(object): if socket_connect(fd, address): return 0 if time.time() >= end: - raise socket.timeout + raise socket.timeout(errno.EAGAIN) try: - trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout) + trampoline(fd, write=True, timeout=end-time.time(), + timeout_exc=socket.timeout(errno.EAGAIN)) except socket.error, ex: return ex[0] @@ -254,21 +259,24 @@ class GreenSocket(object): trampoline(fd, read=True, timeout=self.timeout, - timeout_exc=socket.timeout) + timeout_exc=socket.timeout("timed out")) def recvfrom(self, *args): if not self.act_non_blocking: - trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout) + trampoline(self.fd, read=True, timeout=self.gettimeout(), + timeout_exc=socket.timeout("timed out")) return self.fd.recvfrom(*args) def recvfrom_into(self, *args): if not self.act_non_blocking: - trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout) + trampoline(self.fd, read=True, timeout=self.gettimeout(), + timeout_exc=socket.timeout("timed out")) return self.fd.recvfrom_into(*args) def recv_into(self, *args): if not self.act_non_blocking: - trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout) + trampoline(self.fd, read=True, timeout=self.gettimeout(), + timeout_exc=socket.timeout("timed out")) return self.fd.recv_into(*args) def send(self, data, flags=0): @@ -276,6 +284,8 @@ class GreenSocket(object): if self.act_non_blocking: return fd.send(data, flags) try: + trampoline(self.fd, write=True, timeout=self.gettimeout(), + timeout_exc=socket.timeout("timed out")) return fd.send(data, flags) except socket.error, e: if e[0] in SOCKET_BLOCKING: @@ -290,11 +300,11 @@ class GreenSocket(object): trampoline(fd, write=True, timeout=self.timeout, - timeout_exc=socket.timeout) + timeout_exc=socket.timeout("timed out")) tail += self.send(data[tail:], flags) def sendto(self, *args): - trampoline(self.fd, write=True, timeout_exc=socket.timeout) + trampoline(self.fd, write=True, timeout_exc=socket.timeout("timed out")) return self.fd.sendto(*args) def setblocking(self, flag): diff --git a/tests/debug_test.py b/tests/debug_test.py index 40af038..25a11ef 100644 --- a/tests/debug_test.py +++ b/tests/debug_test.py @@ -118,11 +118,15 @@ class TestDebug(LimitedTestCase): eventlet.sleep(0) client.send(' ') eventlet.sleep(0) + # allow the "hurl" greenlet to trigger the KeyError + # not sure why the extra context switch is needed + eventlet.sleep(0) finally: sys.stderr = orig self.assertRaises(KeyError, gt.wait) debug.hub_exceptions(False) - self.assert_('Traceback' in fake.getvalue(), + # look for the KeyError exception in the traceback + self.assert_('KeyError: 1' in fake.getvalue(), "Traceback not in:\n" + fake.getvalue()) if __name__ == "__main__": diff --git a/tests/greenio_test.py b/tests/greenio_test.py index 2cbaad4..0aa8823 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -8,6 +8,7 @@ import errno import eventlet import os import sys +import array def bufsized(sock, size=1): """ Resize both send and receive buffers on a socket. @@ -32,12 +33,190 @@ class TestGreenIo(LimitedTestCase): s.settimeout(0.1) gs = greenio.GreenSocket(s) try: - self.assertRaises(socket.timeout, gs.connect, ('192.0.2.1', 80)) + gs.connect(('192.0.2.1', 80)) + self.fail("socket.timeout not raised") + except socket.timeout, e: + self.assert_(hasattr(e, 'args')) + self.assertEqual(e.args[0], 'timed out') except socket.error, e: # unreachable is also a valid outcome if e[0] != errno.EHOSTUNREACH: raise + def test_accept_timeout(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(('', 0)) + s.listen(50) + + s.settimeout(0.1) + gs = greenio.GreenSocket(s) + try: + gs.accept() + self.fail("socket.timeout not raised") + except socket.timeout, e: + self.assert_(hasattr(e, 'args')) + self.assertEqual(e.args[0], 'timed out') + + def test_connect_ex_timeout(self): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(0.1) + gs = greenio.GreenSocket(s) + e = gs.connect_ex(('192.0.2.1', 80)) + self.assertEquals(e, errno.EAGAIN) + + def test_recv_timeout(self): + listener = greenio.GreenSocket(socket.socket()) + listener.bind(('', 0)) + listener.listen(50) + + def server(): + # accept the connection in another greenlet + sock, addr = listener.accept() + + eventlet.sleep(.2) + + gt = eventlet.spawn(server) + + addr = listener.getsockname() + + client = greenio.GreenSocket(socket.socket()) + client.settimeout(0.1) + + client.connect(addr) + + try: + r = client.recv(8192) + self.fail("socket.timeout not raised") + except socket.timeout, e: + self.assert_(hasattr(e, 'args')) + self.assertEqual(e.args[0], 'timed out') + + gt.wait() + + def test_recvfrom_timeout(self): + gs = greenio.GreenSocket( + socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) + gs.settimeout(.1) + gs.bind(('', 0)) + + try: + gs.recvfrom(8192) + self.fail("socket.timeout not raised") + except socket.timeout, e: + self.assert_(hasattr(e, 'args')) + self.assertEqual(e.args[0], 'timed out') + + def test_recvfrom_into_timeout(self): + buf = buffer(array.array('B')) + + gs = greenio.GreenSocket( + socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) + gs.settimeout(.1) + gs.bind(('', 0)) + + try: + gs.recvfrom_into(buf) + self.fail("socket.timeout not raised") + except socket.timeout, e: + self.assert_(hasattr(e, 'args')) + self.assertEqual(e.args[0], 'timed out') + + def test_recv_into_timeout(self): + buf = buffer(array.array('B')) + + listener = greenio.GreenSocket(socket.socket()) + listener.bind(('', 0)) + listener.listen(50) + + def server(): + # accept the connection in another greenlet + sock, addr = listener.accept() + + eventlet.sleep(.2) + + gt = eventlet.spawn(server) + + addr = listener.getsockname() + + client = greenio.GreenSocket(socket.socket()) + client.settimeout(0.1) + + client.connect(addr) + + try: + r = client.recv_into(buf) + self.fail("socket.timeout not raised") + except socket.timeout, e: + self.assert_(hasattr(e, 'args')) + self.assertEqual(e.args[0], 'timed out') + + gt.wait() + + def test_send_timeout(self): + listener = greenio.GreenSocket(socket.socket()) + listener.bind(('', 0)) + listener.listen(50) + + def server(): + # accept the connection in another greenlet + sock, addr = listener.accept() + + eventlet.sleep(.5) + + gt = eventlet.spawn(server) + + addr = listener.getsockname() + + client = greenio.GreenSocket(socket.socket()) + client.settimeout(0.1) + + client.connect(addr) + + try: + msg = "A"*(8*1024*1024) + + # want to exceed the size of the OS buffer so it'll block + for x in range(10): + client.send(msg) + self.fail("socket.timeout not raised") + except socket.timeout, e: + self.assert_(hasattr(e, 'args')) + self.assertEqual(e.args[0], 'timed out') + + gt.wait() + + def test_sendall_timeout(self): + listener = greenio.GreenSocket(socket.socket()) + listener.bind(('', 0)) + listener.listen(50) + + def server(): + # accept the connection in another greenlet + sock, addr = listener.accept() + + eventlet.sleep(.5) + + gt = eventlet.spawn(server) + + addr = listener.getsockname() + + client = greenio.GreenSocket(socket.socket()) + client.settimeout(0.1) + + client.connect(addr) + + try: + msg = "A"*(8*1024*1024) + + # want to exceed the size of the OS buffer so it'll block + client.sendall(msg) + self.fail("socket.timeout not raised") + except socket.timeout, e: + self.assert_(hasattr(e, 'args')) + self.assertEqual(e.args[0], 'timed out') + + gt.wait() + def test_close_with_makefile(self): def accept_close_early(listener): # verify that the makefile and the socket are truly independent diff --git a/tests/test__greenness.py b/tests/test__greenness.py index 714bc62..c737b0b 100644 --- a/tests/test__greenness.py +++ b/tests/test__greenness.py @@ -14,8 +14,10 @@ def start_http_server(): #print "Serving HTTP on", sa[0], "port", sa[1], "..." httpd.request_count = 0 def serve(): - httpd.handle_request() + # increment the request_count before handling the request because + # the send() for the response blocks (or at least appeared to be) httpd.request_count += 1 + httpd.handle_request() return spawn(serve), httpd, sa[1] class TestGreenness(unittest.TestCase):