This commit is contained in:
Ryan Williams
2010-02-24 17:02:33 -05:00
4 changed files with 78 additions and 19 deletions

41
benchmarks/hub_timers.py Normal file
View File

@@ -0,0 +1,41 @@
#! /usr/bin/env python
# test timer adds & expires on hubs.hub.BaseHub
import sys
import eventlet
import random
import time
from eventlet.hubs import timer, get_hub
timer_count = 100000
if len(sys.argv) >= 2:
timer_count = int(sys.argv[1])
l = []
def work(n):
l.append(n)
timeouts = [random.uniform(0, 10) for x in xrange(timer_count)]
hub = get_hub()
start = time.time()
scheduled = []
for timeout in timeouts:
t = timer.Timer(timeout, work, timeout)
t.schedule()
scheduled.append(t)
hub.prepare_timers()
hub.fire_timers(time.time()+11)
hub.prepare_timers()
end = time.time()
print "Duration: %f" % (end-start,)

View File

@@ -281,24 +281,31 @@ class GreenSocket(object):
fd = self.fd
if self.act_non_blocking:
return fd.send(data, flags)
try:
# blocking socket behavior - sends all, blocks if the buffer is full
total_sent = 0
len_data = len(data)
while 1:
try:
total_sent += fd.send(data[total_sent:], flags)
except socket.error, e:
if e[0] not in SOCKET_BLOCKING:
raise
if total_sent == len_data:
break
trampoline(self.fd, write=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return fd.send(data, flags)
except socket.error, e:
if e[0] in SOCKET_BLOCKING:
return 0
raise
return total_sent
def sendall(self, data, flags=0):
fd = self.fd
tail = self.send(data, flags)
len_data = len(data)
while tail < len_data:
trampoline(fd,
write=True,
timeout=self.timeout,
timeout_exc=socket.timeout("timed out"))
tail += self.send(data[tail:], flags)
def sendto(self, *args):

View File

@@ -1,4 +1,4 @@
import bisect
import heapq
import sys
import traceback
@@ -196,10 +196,10 @@ class BaseHub(object):
self.timer_finished(timer)
def prepare_timers(self):
ins = bisect.insort_right
heappush = heapq.heappush
t = self.timers
for item in self.next_timers:
ins(t, item)
heappush(t, item)
del self.next_timers[:]
def schedule_call_local(self, seconds, cb, *args, **kw):
@@ -229,10 +229,21 @@ class BaseHub(object):
def fire_timers(self, when):
t = self.timers
last = bisect.bisect_right(t, (when, 1))
heappop = heapq.heappop
i = 0
for i in xrange(last):
timer = t[i][2]
while t:
next = t[0]
exp = next[0]
timer = next[2]
if when < exp:
break
heappop(t)
try:
try:
timer()
@@ -242,7 +253,6 @@ class BaseHub(object):
self.squelch_timer_exception(timer, sys.exc_info())
finally:
self.timer_finished(timer)
del t[:last]
# for debugging:

View File

@@ -173,11 +173,12 @@ class TestGreenIo(LimitedTestCase):
client.connect(addr)
try:
msg = "A"*(8*1024*1024)
msg = "A"*8192*1024
total_sent = 0
# want to exceed the size of the OS buffer so it'll block
for x in range(10):
client.send(msg)
total_sent += client.send(msg)
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))