changed the BaseHub timer list management to use heapq instead of bisect
better overall performance (especially for many outstanding timers) new benchmark
This commit is contained in:
@@ -1,3 +1,4 @@
|
||||
import eventlet
|
||||
from eventlet.hubs import trampoline
|
||||
from eventlet.hubs import get_hub
|
||||
|
||||
@@ -151,7 +152,7 @@ class GreenSocket(object):
|
||||
set_nonblocking(client)
|
||||
return type(self)(client), addr
|
||||
trampoline(fd, read=True, timeout=self.gettimeout(),
|
||||
timeout_exc=socket.timeout)
|
||||
timeout_exc=socket.timeout("timed out"))
|
||||
|
||||
def bind(self, *args, **kw):
|
||||
fn = self.bind = self.fd.bind
|
||||
@@ -170,15 +171,17 @@ class GreenSocket(object):
|
||||
fd = self.fd
|
||||
if self.gettimeout() is None:
|
||||
while not socket_connect(fd, address):
|
||||
trampoline(fd, write=True, timeout_exc=socket.timeout)
|
||||
trampoline(fd, write=True,
|
||||
timeout_exc=socket.timeout("timed out"))
|
||||
else:
|
||||
end = time.time() + self.gettimeout()
|
||||
while True:
|
||||
if socket_connect(fd, address):
|
||||
return
|
||||
if time.time() >= end:
|
||||
raise socket.timeout
|
||||
trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout)
|
||||
raise socket.timeout("timed out")
|
||||
trampoline(fd, write=True, timeout=end-time.time(),
|
||||
timeout_exc=socket.timeout("timed out"))
|
||||
|
||||
def connect_ex(self, address):
|
||||
if self.act_non_blocking:
|
||||
@@ -187,7 +190,8 @@ class GreenSocket(object):
|
||||
if self.gettimeout() is None:
|
||||
while not socket_connect(fd, address):
|
||||
try:
|
||||
trampoline(fd, write=True, timeout_exc=socket.timeout)
|
||||
trampoline(fd, write=True,
|
||||
timeout_exc=socket.timeout(errno.EAGAIN))
|
||||
except socket.error, ex:
|
||||
return ex[0]
|
||||
else:
|
||||
@@ -196,9 +200,10 @@ class GreenSocket(object):
|
||||
if socket_connect(fd, address):
|
||||
return 0
|
||||
if time.time() >= end:
|
||||
raise socket.timeout
|
||||
raise socket.timeout(errno.EAGAIN)
|
||||
try:
|
||||
trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout)
|
||||
trampoline(fd, write=True, timeout=end-time.time(),
|
||||
timeout_exc=socket.timeout(errno.EAGAIN))
|
||||
except socket.error, ex:
|
||||
return ex[0]
|
||||
|
||||
@@ -254,21 +259,24 @@ class GreenSocket(object):
|
||||
trampoline(fd,
|
||||
read=True,
|
||||
timeout=self.timeout,
|
||||
timeout_exc=socket.timeout)
|
||||
timeout_exc=socket.timeout("timed out"))
|
||||
|
||||
def recvfrom(self, *args):
|
||||
if not self.act_non_blocking:
|
||||
trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
|
||||
trampoline(self.fd, read=True, timeout=self.gettimeout(),
|
||||
timeout_exc=socket.timeout("timed out"))
|
||||
return self.fd.recvfrom(*args)
|
||||
|
||||
def recvfrom_into(self, *args):
|
||||
if not self.act_non_blocking:
|
||||
trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
|
||||
trampoline(self.fd, read=True, timeout=self.gettimeout(),
|
||||
timeout_exc=socket.timeout("timed out"))
|
||||
return self.fd.recvfrom_into(*args)
|
||||
|
||||
def recv_into(self, *args):
|
||||
if not self.act_non_blocking:
|
||||
trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
|
||||
trampoline(self.fd, read=True, timeout=self.gettimeout(),
|
||||
timeout_exc=socket.timeout("timed out"))
|
||||
return self.fd.recv_into(*args)
|
||||
|
||||
def send(self, data, flags=0):
|
||||
@@ -276,6 +284,8 @@ class GreenSocket(object):
|
||||
if self.act_non_blocking:
|
||||
return fd.send(data, flags)
|
||||
try:
|
||||
trampoline(self.fd, write=True, timeout=self.gettimeout(),
|
||||
timeout_exc=socket.timeout("timed out"))
|
||||
return fd.send(data, flags)
|
||||
except socket.error, e:
|
||||
if e[0] in SOCKET_BLOCKING:
|
||||
@@ -290,11 +300,11 @@ class GreenSocket(object):
|
||||
trampoline(fd,
|
||||
write=True,
|
||||
timeout=self.timeout,
|
||||
timeout_exc=socket.timeout)
|
||||
timeout_exc=socket.timeout("timed out"))
|
||||
tail += self.send(data[tail:], flags)
|
||||
|
||||
def sendto(self, *args):
|
||||
trampoline(self.fd, write=True, timeout_exc=socket.timeout)
|
||||
trampoline(self.fd, write=True, timeout_exc=socket.timeout("timed out"))
|
||||
return self.fd.sendto(*args)
|
||||
|
||||
def setblocking(self, flag):
|
||||
|
@@ -118,11 +118,15 @@ class TestDebug(LimitedTestCase):
|
||||
eventlet.sleep(0)
|
||||
client.send(' ')
|
||||
eventlet.sleep(0)
|
||||
# allow the "hurl" greenlet to trigger the KeyError
|
||||
# not sure why the extra context switch is needed
|
||||
eventlet.sleep(0)
|
||||
finally:
|
||||
sys.stderr = orig
|
||||
self.assertRaises(KeyError, gt.wait)
|
||||
debug.hub_exceptions(False)
|
||||
self.assert_('Traceback' in fake.getvalue(),
|
||||
# look for the KeyError exception in the traceback
|
||||
self.assert_('KeyError: 1' in fake.getvalue(),
|
||||
"Traceback not in:\n" + fake.getvalue())
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@@ -8,6 +8,7 @@ import errno
|
||||
import eventlet
|
||||
import os
|
||||
import sys
|
||||
import array
|
||||
|
||||
def bufsized(sock, size=1):
|
||||
""" Resize both send and receive buffers on a socket.
|
||||
@@ -32,12 +33,190 @@ class TestGreenIo(LimitedTestCase):
|
||||
s.settimeout(0.1)
|
||||
gs = greenio.GreenSocket(s)
|
||||
try:
|
||||
self.assertRaises(socket.timeout, gs.connect, ('192.0.2.1', 80))
|
||||
gs.connect(('192.0.2.1', 80))
|
||||
self.fail("socket.timeout not raised")
|
||||
except socket.timeout, e:
|
||||
self.assert_(hasattr(e, 'args'))
|
||||
self.assertEqual(e.args[0], 'timed out')
|
||||
except socket.error, e:
|
||||
# unreachable is also a valid outcome
|
||||
if e[0] != errno.EHOSTUNREACH:
|
||||
raise
|
||||
|
||||
def test_accept_timeout(self):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.bind(('', 0))
|
||||
s.listen(50)
|
||||
|
||||
s.settimeout(0.1)
|
||||
gs = greenio.GreenSocket(s)
|
||||
try:
|
||||
gs.accept()
|
||||
self.fail("socket.timeout not raised")
|
||||
except socket.timeout, e:
|
||||
self.assert_(hasattr(e, 'args'))
|
||||
self.assertEqual(e.args[0], 'timed out')
|
||||
|
||||
def test_connect_ex_timeout(self):
|
||||
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
s.settimeout(0.1)
|
||||
gs = greenio.GreenSocket(s)
|
||||
e = gs.connect_ex(('192.0.2.1', 80))
|
||||
self.assertEquals(e, errno.EAGAIN)
|
||||
|
||||
def test_recv_timeout(self):
|
||||
listener = greenio.GreenSocket(socket.socket())
|
||||
listener.bind(('', 0))
|
||||
listener.listen(50)
|
||||
|
||||
def server():
|
||||
# accept the connection in another greenlet
|
||||
sock, addr = listener.accept()
|
||||
|
||||
eventlet.sleep(.2)
|
||||
|
||||
gt = eventlet.spawn(server)
|
||||
|
||||
addr = listener.getsockname()
|
||||
|
||||
client = greenio.GreenSocket(socket.socket())
|
||||
client.settimeout(0.1)
|
||||
|
||||
client.connect(addr)
|
||||
|
||||
try:
|
||||
r = client.recv(8192)
|
||||
self.fail("socket.timeout not raised")
|
||||
except socket.timeout, e:
|
||||
self.assert_(hasattr(e, 'args'))
|
||||
self.assertEqual(e.args[0], 'timed out')
|
||||
|
||||
gt.wait()
|
||||
|
||||
def test_recvfrom_timeout(self):
|
||||
gs = greenio.GreenSocket(
|
||||
socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
|
||||
gs.settimeout(.1)
|
||||
gs.bind(('', 0))
|
||||
|
||||
try:
|
||||
gs.recvfrom(8192)
|
||||
self.fail("socket.timeout not raised")
|
||||
except socket.timeout, e:
|
||||
self.assert_(hasattr(e, 'args'))
|
||||
self.assertEqual(e.args[0], 'timed out')
|
||||
|
||||
def test_recvfrom_into_timeout(self):
|
||||
buf = buffer(array.array('B'))
|
||||
|
||||
gs = greenio.GreenSocket(
|
||||
socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
|
||||
gs.settimeout(.1)
|
||||
gs.bind(('', 0))
|
||||
|
||||
try:
|
||||
gs.recvfrom_into(buf)
|
||||
self.fail("socket.timeout not raised")
|
||||
except socket.timeout, e:
|
||||
self.assert_(hasattr(e, 'args'))
|
||||
self.assertEqual(e.args[0], 'timed out')
|
||||
|
||||
def test_recv_into_timeout(self):
|
||||
buf = buffer(array.array('B'))
|
||||
|
||||
listener = greenio.GreenSocket(socket.socket())
|
||||
listener.bind(('', 0))
|
||||
listener.listen(50)
|
||||
|
||||
def server():
|
||||
# accept the connection in another greenlet
|
||||
sock, addr = listener.accept()
|
||||
|
||||
eventlet.sleep(.2)
|
||||
|
||||
gt = eventlet.spawn(server)
|
||||
|
||||
addr = listener.getsockname()
|
||||
|
||||
client = greenio.GreenSocket(socket.socket())
|
||||
client.settimeout(0.1)
|
||||
|
||||
client.connect(addr)
|
||||
|
||||
try:
|
||||
r = client.recv_into(buf)
|
||||
self.fail("socket.timeout not raised")
|
||||
except socket.timeout, e:
|
||||
self.assert_(hasattr(e, 'args'))
|
||||
self.assertEqual(e.args[0], 'timed out')
|
||||
|
||||
gt.wait()
|
||||
|
||||
def test_send_timeout(self):
|
||||
listener = greenio.GreenSocket(socket.socket())
|
||||
listener.bind(('', 0))
|
||||
listener.listen(50)
|
||||
|
||||
def server():
|
||||
# accept the connection in another greenlet
|
||||
sock, addr = listener.accept()
|
||||
|
||||
eventlet.sleep(.5)
|
||||
|
||||
gt = eventlet.spawn(server)
|
||||
|
||||
addr = listener.getsockname()
|
||||
|
||||
client = greenio.GreenSocket(socket.socket())
|
||||
client.settimeout(0.1)
|
||||
|
||||
client.connect(addr)
|
||||
|
||||
try:
|
||||
msg = "A"*(8*1024*1024)
|
||||
|
||||
# want to exceed the size of the OS buffer so it'll block
|
||||
for x in range(10):
|
||||
client.send(msg)
|
||||
self.fail("socket.timeout not raised")
|
||||
except socket.timeout, e:
|
||||
self.assert_(hasattr(e, 'args'))
|
||||
self.assertEqual(e.args[0], 'timed out')
|
||||
|
||||
gt.wait()
|
||||
|
||||
def test_sendall_timeout(self):
|
||||
listener = greenio.GreenSocket(socket.socket())
|
||||
listener.bind(('', 0))
|
||||
listener.listen(50)
|
||||
|
||||
def server():
|
||||
# accept the connection in another greenlet
|
||||
sock, addr = listener.accept()
|
||||
|
||||
eventlet.sleep(.5)
|
||||
|
||||
gt = eventlet.spawn(server)
|
||||
|
||||
addr = listener.getsockname()
|
||||
|
||||
client = greenio.GreenSocket(socket.socket())
|
||||
client.settimeout(0.1)
|
||||
|
||||
client.connect(addr)
|
||||
|
||||
try:
|
||||
msg = "A"*(8*1024*1024)
|
||||
|
||||
# want to exceed the size of the OS buffer so it'll block
|
||||
client.sendall(msg)
|
||||
self.fail("socket.timeout not raised")
|
||||
except socket.timeout, e:
|
||||
self.assert_(hasattr(e, 'args'))
|
||||
self.assertEqual(e.args[0], 'timed out')
|
||||
|
||||
gt.wait()
|
||||
|
||||
def test_close_with_makefile(self):
|
||||
def accept_close_early(listener):
|
||||
# verify that the makefile and the socket are truly independent
|
||||
|
@@ -14,8 +14,10 @@ def start_http_server():
|
||||
#print "Serving HTTP on", sa[0], "port", sa[1], "..."
|
||||
httpd.request_count = 0
|
||||
def serve():
|
||||
httpd.handle_request()
|
||||
# increment the request_count before handling the request because
|
||||
# the send() for the response blocks (or at least appeared to be)
|
||||
httpd.request_count += 1
|
||||
httpd.handle_request()
|
||||
return spawn(serve), httpd, sa[1]
|
||||
|
||||
class TestGreenness(unittest.TestCase):
|
||||
|
Reference in New Issue
Block a user