diff --git a/benchmarks/hub_timers.py b/benchmarks/hub_timers.py new file mode 100644 index 0000000..1df7386 --- /dev/null +++ b/benchmarks/hub_timers.py @@ -0,0 +1,41 @@ +#! /usr/bin/env python + +# test timer adds & expires on hubs.hub.BaseHub + +import sys +import eventlet +import random +import time +from eventlet.hubs import timer, get_hub + +timer_count = 100000 + +if len(sys.argv) >= 2: + timer_count = int(sys.argv[1]) + +l = [] + +def work(n): + l.append(n) + +timeouts = [random.uniform(0, 10) for x in xrange(timer_count)] + +hub = get_hub() + +start = time.time() + +scheduled = [] + +for timeout in timeouts: + t = timer.Timer(timeout, work, timeout) + t.schedule() + + scheduled.append(t) + +hub.prepare_timers() +hub.fire_timers(time.time()+11) +hub.prepare_timers() + +end = time.time() + +print "Duration: %f" % (end-start,) diff --git a/eventlet/greenio.py b/eventlet/greenio.py index 7214bc6..da26dc6 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -281,24 +281,31 @@ class GreenSocket(object): fd = self.fd if self.act_non_blocking: return fd.send(data, flags) - try: + + # blocking socket behavior - sends all, blocks if the buffer is full + total_sent = 0 + len_data = len(data) + + while 1: + try: + total_sent += fd.send(data[total_sent:], flags) + except socket.error, e: + if e[0] not in SOCKET_BLOCKING: + raise + + if total_sent == len_data: + break + trampoline(self.fd, write=True, timeout=self.gettimeout(), timeout_exc=socket.timeout("timed out")) - return fd.send(data, flags) - except socket.error, e: - if e[0] in SOCKET_BLOCKING: - return 0 - raise + + return total_sent def sendall(self, data, flags=0): fd = self.fd tail = self.send(data, flags) len_data = len(data) while tail < len_data: - trampoline(fd, - write=True, - timeout=self.timeout, - timeout_exc=socket.timeout("timed out")) tail += self.send(data[tail:], flags) def sendto(self, *args): diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index c5e885e..c2a1145 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -1,4 +1,4 @@ -import bisect +import heapq import sys import traceback @@ -102,12 +102,12 @@ class BaseHub(object): switch_out() except: self.squelch_generic_exception(sys.exc_info()) + sys.exc_clear() if self.greenlet.dead: self.greenlet = greenlet.greenlet(self.run) try: - current = greenlet.getcurrent() - if self.greenlet.parent is not current: - current.parent = self.greenlet + if self.greenlet.parent is not cur: + cur.parent = self.greenlet except ValueError: pass # gets raised if there is a greenlet parent cycle sys.exc_clear() @@ -196,10 +196,10 @@ class BaseHub(object): self.timer_finished(timer) def prepare_timers(self): - ins = bisect.insort_right + heappush = heapq.heappush t = self.timers for item in self.next_timers: - ins(t, item) + heappush(t, item) del self.next_timers[:] def schedule_call_local(self, seconds, cb, *args, **kw): @@ -229,10 +229,21 @@ class BaseHub(object): def fire_timers(self, when): t = self.timers - last = bisect.bisect_right(t, (when, 1)) + heappop = heapq.heappop + i = 0 - for i in xrange(last): - timer = t[i][2] + + while t: + next = t[0] + + exp = next[0] + timer = next[2] + + if when < exp: + break + + heappop(t) + try: try: timer() @@ -240,9 +251,9 @@ class BaseHub(object): raise except: self.squelch_timer_exception(timer, sys.exc_info()) + sys.exc_clear() finally: self.timer_finished(timer) - del t[:last] # for debugging: diff --git a/eventlet/hubs/poll.py b/eventlet/hubs/poll.py index e6b3c27..512b008 100644 --- a/eventlet/hubs/poll.py +++ b/eventlet/hubs/poll.py @@ -108,3 +108,4 @@ class Hub(BaseHub): raise except: self.squelch_exception(fileno, sys.exc_info()) + sys.exc_clear() diff --git a/eventlet/hubs/pyevent.py b/eventlet/hubs/pyevent.py index 1f75093..f9d2496 100644 --- a/eventlet/hubs/pyevent.py +++ b/eventlet/hubs/pyevent.py @@ -45,23 +45,6 @@ class Hub(BaseHub): self.signal_exc_info = None self.signal(2, lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt)) self.events_to_add = [] - - def switch(self): - cur = greenlet.getcurrent() - assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP' - switch_out = getattr(cur, 'switch_out', None) - if switch_out is not None: - try: - switch_out() - except: - traceback.print_exception(*sys.exc_info()) - if self.greenlet.dead: - self.greenlet = greenlet.greenlet(self.run) - try: - greenlet.getcurrent().parent = self.greenlet - except ValueError: - pass - return self.greenlet.switch() def dispatch(self): loop = event.loop diff --git a/eventlet/hubs/selects.py b/eventlet/hubs/selects.py index d20f3cf..8dcc35f 100644 --- a/eventlet/hubs/selects.py +++ b/eventlet/hubs/selects.py @@ -57,3 +57,4 @@ class Hub(BaseHub): raise except: self.squelch_exception(fileno, sys.exc_info()) + sys.exc_clear() \ No newline at end of file diff --git a/tests/greenio_test.py b/tests/greenio_test.py index 0aa8823..174bdb9 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -153,31 +153,29 @@ class TestGreenIo(LimitedTestCase): gt.wait() def test_send_timeout(self): - listener = greenio.GreenSocket(socket.socket()) - listener.bind(('', 0)) - listener.listen(50) + listener = bufsized(eventlet.listen(('', 0))) def server(): # accept the connection in another greenlet sock, addr = listener.accept() - + sock = bufsized(sock) eventlet.sleep(.5) gt = eventlet.spawn(server) addr = listener.getsockname() - client = greenio.GreenSocket(socket.socket()) - client.settimeout(0.1) - + client = bufsized(greenio.GreenSocket(socket.socket())) client.connect(addr) - try: - msg = "A"*(8*1024*1024) + client.settimeout(0.00001) + msg = "A"*(100000) # large enough number to overwhelm most buffers - # want to exceed the size of the OS buffer so it'll block + total_sent = 0 + # want to exceed the size of the OS buffer so it'll block in a + # single send for x in range(10): - client.send(msg) + total_sent += client.send(msg) self.fail("socket.timeout not raised") except socket.timeout, e: self.assert_(hasattr(e, 'args')) @@ -472,6 +470,7 @@ class TestGreenIo(LimitedTestCase): gt.wait() + class TestGreenIoLong(LimitedTestCase): TEST_TIMEOUT=10 # the test here might take a while depending on the OS @skip_with_pyevent