This commit is contained in:
Ryan Williams
2010-02-24 22:21:03 -05:00
7 changed files with 91 additions and 48 deletions

41
benchmarks/hub_timers.py Normal file
View File

@@ -0,0 +1,41 @@
#! /usr/bin/env python
# test timer adds & expires on hubs.hub.BaseHub
import sys
import eventlet
import random
import time
from eventlet.hubs import timer, get_hub
timer_count = 100000
if len(sys.argv) >= 2:
timer_count = int(sys.argv[1])
l = []
def work(n):
l.append(n)
timeouts = [random.uniform(0, 10) for x in xrange(timer_count)]
hub = get_hub()
start = time.time()
scheduled = []
for timeout in timeouts:
t = timer.Timer(timeout, work, timeout)
t.schedule()
scheduled.append(t)
hub.prepare_timers()
hub.fire_timers(time.time()+11)
hub.prepare_timers()
end = time.time()
print "Duration: %f" % (end-start,)

View File

@@ -281,24 +281,31 @@ class GreenSocket(object):
fd = self.fd
if self.act_non_blocking:
return fd.send(data, flags)
try:
# blocking socket behavior - sends all, blocks if the buffer is full
total_sent = 0
len_data = len(data)
while 1:
try:
total_sent += fd.send(data[total_sent:], flags)
except socket.error, e:
if e[0] not in SOCKET_BLOCKING:
raise
if total_sent == len_data:
break
trampoline(self.fd, write=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return fd.send(data, flags)
except socket.error, e:
if e[0] in SOCKET_BLOCKING:
return 0
raise
return total_sent
def sendall(self, data, flags=0):
fd = self.fd
tail = self.send(data, flags)
len_data = len(data)
while tail < len_data:
trampoline(fd,
write=True,
timeout=self.timeout,
timeout_exc=socket.timeout("timed out"))
tail += self.send(data[tail:], flags)
def sendto(self, *args):

View File

@@ -1,4 +1,4 @@
import bisect
import heapq
import sys
import traceback
@@ -102,12 +102,12 @@ class BaseHub(object):
switch_out()
except:
self.squelch_generic_exception(sys.exc_info())
sys.exc_clear()
if self.greenlet.dead:
self.greenlet = greenlet.greenlet(self.run)
try:
current = greenlet.getcurrent()
if self.greenlet.parent is not current:
current.parent = self.greenlet
if self.greenlet.parent is not cur:
cur.parent = self.greenlet
except ValueError:
pass # gets raised if there is a greenlet parent cycle
sys.exc_clear()
@@ -196,10 +196,10 @@ class BaseHub(object):
self.timer_finished(timer)
def prepare_timers(self):
ins = bisect.insort_right
heappush = heapq.heappush
t = self.timers
for item in self.next_timers:
ins(t, item)
heappush(t, item)
del self.next_timers[:]
def schedule_call_local(self, seconds, cb, *args, **kw):
@@ -229,10 +229,21 @@ class BaseHub(object):
def fire_timers(self, when):
t = self.timers
last = bisect.bisect_right(t, (when, 1))
heappop = heapq.heappop
i = 0
for i in xrange(last):
timer = t[i][2]
while t:
next = t[0]
exp = next[0]
timer = next[2]
if when < exp:
break
heappop(t)
try:
try:
timer()
@@ -240,9 +251,9 @@ class BaseHub(object):
raise
except:
self.squelch_timer_exception(timer, sys.exc_info())
sys.exc_clear()
finally:
self.timer_finished(timer)
del t[:last]
# for debugging:

View File

@@ -108,3 +108,4 @@ class Hub(BaseHub):
raise
except:
self.squelch_exception(fileno, sys.exc_info())
sys.exc_clear()

View File

@@ -45,23 +45,6 @@ class Hub(BaseHub):
self.signal_exc_info = None
self.signal(2, lambda signalnum, frame: self.greenlet.parent.throw(KeyboardInterrupt))
self.events_to_add = []
def switch(self):
cur = greenlet.getcurrent()
assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
switch_out = getattr(cur, 'switch_out', None)
if switch_out is not None:
try:
switch_out()
except:
traceback.print_exception(*sys.exc_info())
if self.greenlet.dead:
self.greenlet = greenlet.greenlet(self.run)
try:
greenlet.getcurrent().parent = self.greenlet
except ValueError:
pass
return self.greenlet.switch()
def dispatch(self):
loop = event.loop

View File

@@ -57,3 +57,4 @@ class Hub(BaseHub):
raise
except:
self.squelch_exception(fileno, sys.exc_info())
sys.exc_clear()

View File

@@ -153,31 +153,29 @@ class TestGreenIo(LimitedTestCase):
gt.wait()
def test_send_timeout(self):
listener = greenio.GreenSocket(socket.socket())
listener.bind(('', 0))
listener.listen(50)
listener = bufsized(eventlet.listen(('', 0)))
def server():
# accept the connection in another greenlet
sock, addr = listener.accept()
sock = bufsized(sock)
eventlet.sleep(.5)
gt = eventlet.spawn(server)
addr = listener.getsockname()
client = greenio.GreenSocket(socket.socket())
client.settimeout(0.1)
client = bufsized(greenio.GreenSocket(socket.socket()))
client.connect(addr)
try:
msg = "A"*(8*1024*1024)
client.settimeout(0.00001)
msg = "A"*(100000) # large enough number to overwhelm most buffers
# want to exceed the size of the OS buffer so it'll block
total_sent = 0
# want to exceed the size of the OS buffer so it'll block in a
# single send
for x in range(10):
client.send(msg)
total_sent += client.send(msg)
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))
@@ -472,6 +470,7 @@ class TestGreenIo(LimitedTestCase):
gt.wait()
class TestGreenIoLong(LimitedTestCase):
TEST_TIMEOUT=10 # the test here might take a while depending on the OS
@skip_with_pyevent