diff --git a/benchmarks/hub_timers.py b/benchmarks/hub_timers.py new file mode 100644 index 0000000..1df7386 --- /dev/null +++ b/benchmarks/hub_timers.py @@ -0,0 +1,41 @@ +#! /usr/bin/env python + +# test timer adds & expires on hubs.hub.BaseHub + +import sys +import eventlet +import random +import time +from eventlet.hubs import timer, get_hub + +timer_count = 100000 + +if len(sys.argv) >= 2: + timer_count = int(sys.argv[1]) + +l = [] + +def work(n): + l.append(n) + +timeouts = [random.uniform(0, 10) for x in xrange(timer_count)] + +hub = get_hub() + +start = time.time() + +scheduled = [] + +for timeout in timeouts: + t = timer.Timer(timeout, work, timeout) + t.schedule() + + scheduled.append(t) + +hub.prepare_timers() +hub.fire_timers(time.time()+11) +hub.prepare_timers() + +end = time.time() + +print "Duration: %f" % (end-start,) diff --git a/eventlet/greenio.py b/eventlet/greenio.py index 7214bc6..da26dc6 100644 --- a/eventlet/greenio.py +++ b/eventlet/greenio.py @@ -281,24 +281,31 @@ class GreenSocket(object): fd = self.fd if self.act_non_blocking: return fd.send(data, flags) - try: + + # blocking socket behavior - sends all, blocks if the buffer is full + total_sent = 0 + len_data = len(data) + + while 1: + try: + total_sent += fd.send(data[total_sent:], flags) + except socket.error, e: + if e[0] not in SOCKET_BLOCKING: + raise + + if total_sent == len_data: + break + trampoline(self.fd, write=True, timeout=self.gettimeout(), timeout_exc=socket.timeout("timed out")) - return fd.send(data, flags) - except socket.error, e: - if e[0] in SOCKET_BLOCKING: - return 0 - raise + + return total_sent def sendall(self, data, flags=0): fd = self.fd tail = self.send(data, flags) len_data = len(data) while tail < len_data: - trampoline(fd, - write=True, - timeout=self.timeout, - timeout_exc=socket.timeout("timed out")) tail += self.send(data[tail:], flags) def sendto(self, *args): diff --git a/eventlet/hubs/hub.py b/eventlet/hubs/hub.py index c5e885e..90a1ad7 100644 --- a/eventlet/hubs/hub.py +++ b/eventlet/hubs/hub.py @@ -1,4 +1,4 @@ -import bisect +import heapq import sys import traceback @@ -196,10 +196,10 @@ class BaseHub(object): self.timer_finished(timer) def prepare_timers(self): - ins = bisect.insort_right + heappush = heapq.heappush t = self.timers for item in self.next_timers: - ins(t, item) + heappush(t, item) del self.next_timers[:] def schedule_call_local(self, seconds, cb, *args, **kw): @@ -229,10 +229,21 @@ class BaseHub(object): def fire_timers(self, when): t = self.timers - last = bisect.bisect_right(t, (when, 1)) + heappop = heapq.heappop + i = 0 - for i in xrange(last): - timer = t[i][2] + + while t: + next = t[0] + + exp = next[0] + timer = next[2] + + if when < exp: + break + + heappop(t) + try: try: timer() @@ -242,7 +253,6 @@ class BaseHub(object): self.squelch_timer_exception(timer, sys.exc_info()) finally: self.timer_finished(timer) - del t[:last] # for debugging: diff --git a/tests/greenio_test.py b/tests/greenio_test.py index 0aa8823..f71e40c 100644 --- a/tests/greenio_test.py +++ b/tests/greenio_test.py @@ -173,11 +173,12 @@ class TestGreenIo(LimitedTestCase): client.connect(addr) try: - msg = "A"*(8*1024*1024) + msg = "A"*8192*1024 + total_sent = 0 # want to exceed the size of the OS buffer so it'll block for x in range(10): - client.send(msg) + total_sent += client.send(msg) self.fail("socket.timeout not raised") except socket.timeout, e: self.assert_(hasattr(e, 'args'))