From 74d0de691a592dcbd4cb1695d2ed4e8592315002 Mon Sep 17 00:00:00 2001 From: Eugene Oden Date: Tue, 23 Feb 2010 16:38:59 -0500 Subject: [PATCH 1/5] changed the BaseHub timer list management to use heapq instead of bisect better overall performance (especially for many outstanding timers) new benchmark --- benchmarks/hub_timers.py | 41 ++++++++++++++++++++++++++++++++++++++++ eventlet/hubs/hub.py | 24 ++++++++++++++++------- 2 files changed, 58 insertions(+), 7 deletions(-) create mode 100644 benchmarks/hub_timers.py diff --git a/benchmarks/hub_timers.py b/benchmarks/hub_timers.py new file mode 100644 index 0000000..1df7386 --- /dev/null +++ b/benchmarks/hub_timers.py @@ -0,0 +1,41 @@ +#! /usr/bin/env python + +# test timer adds & expires on hubs.hub.BaseHub + +import sys +import eventlet +import random +import time +from eventlet.hubs import timer, get_hub + +timer_count = 100000 + +if len(sys.argv) >= 2: + timer_count = int(sys.argv[1]) + +l = [] + +def work(n): + l.append(n) + +timeouts = [random.uniform(0, 10) for x in xrange(timer_count)] + +hub = get_hub() + +start = time.time() + +scheduled = [] + +for timeout in timeouts: + t = timer.Timer(timeout, work, timeout) + t.schedule() + + scheduled.append(t) + +hub.prepare_timers() +hub.fire_timers(time.time()+11) +hub.prepare_timers() + +end = time.time() + +print "Duration: %f" % (end-start,) diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index c5e885e..90a1ad7 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -1,4 +1,4 @@ -import bisect +import heapq import sys import traceback @@ -196,10 +196,10 @@ class BaseHub(object): self.timer_finished(timer) def prepare_timers(self): - ins = bisect.insort_right + heappush = heapq.heappush t = self.timers for item in self.next_timers: - ins(t, item) + heappush(t, item) del self.next_timers[:] def schedule_call_local(self, seconds, cb, *args, **kw): @@ -229,10 +229,21 @@ class BaseHub(object): def fire_timers(self, when): t = self.timers - last = bisect.bisect_right(t, (when, 1)) + heappop = heapq.heappop + i = 0 - for i in xrange(last): - timer = t[i][2] + + while t: + next = t[0] + + exp = next[0] + timer = next[2] + + if when < exp: + break + + heappop(t) + try: try: timer() @@ -242,7 +253,6 @@ class BaseHub(object): self.squelch_timer_exception(timer, sys.exc_info()) finally: self.timer_finished(timer) - del t[:last] # for debugging: From 495939982bfebf856332f36c0d8a04ce89d4b889 Mon Sep 17 00:00:00 2001 From: Eugene Oden Date: Wed, 24 Feb 2010 10:55:20 -0500 Subject: [PATCH 2/5] working on blocking send, which still isn't quite right reopens #38 --- eventlet/greenio.py | 32 ++++++++++++++++++++++---------- tests/greenio_test.py | 4 +++- 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/eventlet/greenio.py b/eventlet/greenio.py index 1d305ea..ea970ac 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -283,24 +283,36 @@ class GreenSocket(object): fd = self.fd if self.act_non_blocking: return fd.send(data, flags) - try: + + # XXX: need to deal with the exceptions that could be raised if the + # buffer is full (specifically the try/except below) + + # need to test all of the conditions + + # blocking socket behavior - sends all, blocks if the buffer is full + total_sent = 0 + len_data = len(data) + + while 1: + try: + total_sent += fd.send(data[total_sent:], flags) + except socket.error, e: + if e[0] not in SOCKET_BLOCKING: + raise + + if total_sent == len_data: + break + trampoline(self.fd, write=True, timeout=self.gettimeout(), timeout_exc=socket.timeout("timed out")) - return fd.send(data, flags) - except socket.error, e: - if e[0] in SOCKET_BLOCKING: - return 0 - raise + + return total_sent def sendall(self, data, flags=0): fd = self.fd tail = self.send(data, flags) len_data = len(data) while tail < len_data: - trampoline(fd, - write=True, - timeout=self.timeout, - timeout_exc=socket.timeout("timed out")) tail += self.send(data[tail:], flags) def sendto(self, *args): diff --git a/tests/greenio_test.py b/tests/greenio_test.py index 0aa8823..7f5354c 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -160,6 +160,7 @@ class TestGreenIo(LimitedTestCase): def server(): # accept the connection in another greenlet sock, addr = listener.accept() + bufsized(sock, 1) eventlet.sleep(.5) @@ -171,9 +172,10 @@ class TestGreenIo(LimitedTestCase): client.settimeout(0.1) client.connect(addr) + bufsized(client, 1) try: - msg = "A"*(8*1024*1024) + msg = "A"*min_buf_size() # want to exceed the size of the OS buffer so it'll block for x in range(10): From 9496f6b97491ae1041de964e6c50b2fe10b9e574 Mon Sep 17 00:00:00 2001 From: Eugene Oden Date: Wed, 24 Feb 2010 14:06:37 -0500 Subject: [PATCH 3/5] fixes #38. send() and sendall() seem to be good now (i think). --- eventlet/greenio.py | 5 ----- tests/greenio_test.py | 7 +++---- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/eventlet/greenio.py b/eventlet/greenio.py index b8de16f..da26dc6 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -282,11 +282,6 @@ class GreenSocket(object): if self.act_non_blocking: return fd.send(data, flags) - # XXX: need to deal with the exceptions that could be raised if the - # buffer is full (specifically the try/except below) - - # need to test all of the conditions - # blocking socket behavior - sends all, blocks if the buffer is full total_sent = 0 len_data = len(data) diff --git a/tests/greenio_test.py b/tests/greenio_test.py index 7f5354c..f71e40c 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -160,7 +160,6 @@ class TestGreenIo(LimitedTestCase): def server(): # accept the connection in another greenlet sock, addr = listener.accept() - bufsized(sock, 1) eventlet.sleep(.5) @@ -172,14 +171,14 @@ class TestGreenIo(LimitedTestCase): client.settimeout(0.1) client.connect(addr) - bufsized(client, 1) try: - msg = "A"*min_buf_size() + msg = "A"*8192*1024 + total_sent = 0 # want to exceed the size of the OS buffer so it'll block for x in range(10): - client.send(msg) + total_sent += client.send(msg) self.fail("socket.timeout not raised") except socket.timeout, e: self.assert_(hasattr(e, 'args')) From ee71f87f7407ab8f0d455253b56139b7ba5490a1 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 24 Feb 2010 21:51:07 -0500 Subject: [PATCH 4/5] Tweaked the send_timeout test a little, mostly just so I understand it. Clearing exc_info right after squelching exceptions in hubs. --- eventlet/hubs/hub.py | 2 ++ tests/greenio_test.py | 18 ++++++++---------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index 90a1ad7..c1d2eb3 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -102,6 +102,7 @@ class BaseHub(object): switch_out() except: self.squelch_generic_exception(sys.exc_info()) + sys.exc_clear() if self.greenlet.dead: self.greenlet = greenlet.greenlet(self.run) try: @@ -251,6 +252,7 @@ class BaseHub(object): raise except: self.squelch_timer_exception(timer, sys.exc_info()) + sys.exc_clear() finally: self.timer_finished(timer) diff --git a/tests/greenio_test.py b/tests/greenio_test.py index f71e40c..174bdb9 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -153,30 +153,27 @@ class TestGreenIo(LimitedTestCase): gt.wait() def test_send_timeout(self): - listener = greenio.GreenSocket(socket.socket()) - listener.bind(('', 0)) - listener.listen(50) + listener = bufsized(eventlet.listen(('', 0))) def server(): # accept the connection in another greenlet sock, addr = listener.accept() - + sock = bufsized(sock) eventlet.sleep(.5) gt = eventlet.spawn(server) addr = listener.getsockname() - client = greenio.GreenSocket(socket.socket()) - client.settimeout(0.1) - + client = bufsized(greenio.GreenSocket(socket.socket())) client.connect(addr) - try: - msg = "A"*8192*1024 + client.settimeout(0.00001) + msg = "A"*(100000) # large enough number to overwhelm most buffers total_sent = 0 - # want to exceed the size of the OS buffer so it'll block + # want to exceed the size of the OS buffer so it'll block in a + # single send for x in range(10): total_sent += client.send(msg) self.fail("socket.timeout not raised") @@ -473,6 +470,7 @@ class TestGreenIo(LimitedTestCase): gt.wait() + class TestGreenIoLong(LimitedTestCase): TEST_TIMEOUT=10 # the test here might take a while depending on the OS @skip_with_pyevent From b2f58bdf978333640a6af74543c84bdc7d3eaa82 Mon Sep 17 00:00:00 2001 From: Ryan Williams Date: Wed, 24 Feb 2010 22:19:57 -0500 Subject: [PATCH 5/5] Removed redundant switch impl in pyevent hub. Added more exc_clears where needed. Slightly optimized switch a bit. --- eventlet/hubs/hub.py | 5 ++--- eventlet/hubs/poll.py | 1 + eventlet/hubs/pyevent.py | 17 ----------------- eventlet/hubs/selects.py | 1 + 4 files changed, 4 insertions(+), 20 deletions(-) diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index c1d2eb3..c2a1145 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -106,9 +106,8 @@ class BaseHub(object): if self.greenlet.dead: self.greenlet = greenlet.greenlet(self.run) try: - current = greenlet.getcurrent() - if self.greenlet.parent is not current: - current.parent = self.greenlet + if self.greenlet.parent is not cur: + cur.parent = self.greenlet except ValueError: pass # gets raised if there is a greenlet parent cycle sys.exc_clear() diff --git a/eventlet/hubs/poll.py b/eventlet/hubs/poll.py index e6b3c27..512b008 100644 --- a/eventlet/hubs/poll.py +++ b/eventlet/hubs/poll.py @@ -108,3 +108,4 @@ class Hub(BaseHub): raise except: self.squelch_exception(fileno, sys.exc_info()) + sys.exc_clear() diff --git a/eventlet/hubs/pyevent.py b/eventlet/hubs/pyevent.py index 1f75093..f9d2496 100644 --- a/eventlet/hubs/pyevent.py +++ b/eventlet/hubs/pyevent.py @@ -45,23 +45,6 @@ class Hub(BaseHub): self.signal_exc_info = None self.signal(2, lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt)) self.events_to_add = [] - - def switch(self): - cur = greenlet.getcurrent() - assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP' - switch_out = getattr(cur, 'switch_out', None) - if switch_out is not None: - try: - switch_out() - except: - traceback.print_exception(*sys.exc_info()) - if self.greenlet.dead: - self.greenlet = greenlet.greenlet(self.run) - try: - greenlet.getcurrent().parent = self.greenlet - except ValueError: - pass - return self.greenlet.switch() def dispatch(self): loop = event.loop diff --git a/eventlet/hubs/selects.py b/eventlet/hubs/selects.py index d20f3cf..8dcc35f 100644 --- a/eventlet/hubs/selects.py +++ b/eventlet/hubs/selects.py @@ -57,3 +57,4 @@ class Hub(BaseHub): raise except: self.squelch_exception(fileno, sys.exc_info()) + sys.exc_clear() \ No newline at end of file