This commit is contained in:
Ryan Williams
2010-02-23 17:47:51 -05:00
6 changed files with 303 additions and 17 deletions

1
NEWS
View File

@@ -5,7 +5,6 @@
* patcher.monkey_patch() method replacing util.wrap_*
* monkeypatch threading support
* removed api.named
* move api_tests into greenthread_tests and hub_tests, or wherever's appropriate
* imported timeout module from gevent, replace exc_after and with_timeout()
* replace call_after with spawn_after; this is so that users don't see the Timer class
* added cancel() method to GreenThread to support the semantic of "abort if not already in the middle of something"

92
benchmarks/context.py Normal file
View File

@@ -0,0 +1,92 @@
"""Test context switching performance of threading and eventlet"""
import threading
import time
import eventlet
from eventlet import hubs
from eventlet.hubs import pyevent, epolls, poll, selects
CONTEXT_SWITCHES = 100000
def run(event, wait_event):
counter = 0
while counter <= CONTEXT_SWITCHES:
wait_event.wait()
wait_event.reset()
counter += 1
event.send()
def test_eventlet():
event1 = eventlet.event.Event()
event2 = eventlet.event.Event()
event1.send()
thread1 = eventlet.spawn(run, event1, event2)
thread2 = eventlet.spawn(run, event2, event1)
thread1.wait()
thread2.wait()
class BenchThread(threading.Thread):
def __init__(self, event, wait_event):
threading.Thread.__init__(self)
self.counter = 0
self.event = event
self.wait_event = wait_event
def run(self):
while self.counter <= CONTEXT_SWITCHES:
self.wait_event.wait()
self.wait_event.clear()
self.counter += 1
self.event.set()
def test_thread():
event1 = threading.Event()
event2 = threading.Event()
event1.set()
thread1 = BenchThread(event1, event2)
thread2 = BenchThread(event2, event1)
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print "Testing with %d context switches" % CONTEXT_SWITCHES
start = time.time()
test_thread()
print "threading: %.02f seconds" % (time.time() - start)
try:
hubs.use_hub(pyevent)
start = time.time()
test_eventlet()
print "pyevent: %.02f seconds" % (time.time() - start)
except:
print "pyevent hub unavailable"
try:
hubs.use_hub(epolls)
start = time.time()
test_eventlet()
print "epoll: %.02f seconds" % (time.time() - start)
except:
print "epoll hub unavailable"
try:
hubs.use_hub(poll)
start = time.time()
test_eventlet()
print "poll: %.02f seconds" % (time.time() - start)
except:
print "poll hub unavailable"
try:
hubs.use_hub(selects)
start = time.time()
test_eventlet()
print "select: %.02f seconds" % (time.time() - start)
except:
print "select hub unavailable"

View File

@@ -1,3 +1,4 @@
import eventlet
from eventlet.hubs import trampoline
from eventlet.hubs import get_hub
@@ -151,7 +152,7 @@ class GreenSocket(object):
set_nonblocking(client)
return type(self)(client), addr
trampoline(fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout)
timeout_exc=socket.timeout("timed out"))
def bind(self, *args, **kw):
fn = self.bind = self.fd.bind
@@ -170,15 +171,17 @@ class GreenSocket(object):
fd = self.fd
if self.gettimeout() is None:
while not socket_connect(fd, address):
trampoline(fd, write=True, timeout_exc=socket.timeout)
trampoline(fd, write=True,
timeout_exc=socket.timeout("timed out"))
else:
end = time.time() + self.gettimeout()
while True:
if socket_connect(fd, address):
return
if time.time() >= end:
raise socket.timeout
trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout)
raise socket.timeout("timed out")
trampoline(fd, write=True, timeout=end-time.time(),
timeout_exc=socket.timeout("timed out"))
def connect_ex(self, address):
if self.act_non_blocking:
@@ -187,7 +190,8 @@ class GreenSocket(object):
if self.gettimeout() is None:
while not socket_connect(fd, address):
try:
trampoline(fd, write=True, timeout_exc=socket.timeout)
trampoline(fd, write=True,
timeout_exc=socket.timeout(errno.EAGAIN))
except socket.error, ex:
return ex[0]
else:
@@ -196,9 +200,10 @@ class GreenSocket(object):
if socket_connect(fd, address):
return 0
if time.time() >= end:
raise socket.timeout
raise socket.timeout(errno.EAGAIN)
try:
trampoline(fd, write=True, timeout=end-time.time(), timeout_exc=socket.timeout)
trampoline(fd, write=True, timeout=end-time.time(),
timeout_exc=socket.timeout(errno.EAGAIN))
except socket.error, ex:
return ex[0]
@@ -254,21 +259,24 @@ class GreenSocket(object):
trampoline(fd,
read=True,
timeout=self.timeout,
timeout_exc=socket.timeout)
timeout_exc=socket.timeout("timed out"))
def recvfrom(self, *args):
if not self.act_non_blocking:
trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
trampoline(self.fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return self.fd.recvfrom(*args)
def recvfrom_into(self, *args):
if not self.act_non_blocking:
trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
trampoline(self.fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return self.fd.recvfrom_into(*args)
def recv_into(self, *args):
if not self.act_non_blocking:
trampoline(self.fd, read=True, timeout=self.gettimeout(), timeout_exc=socket.timeout)
trampoline(self.fd, read=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return self.fd.recv_into(*args)
def send(self, data, flags=0):
@@ -276,6 +284,8 @@ class GreenSocket(object):
if self.act_non_blocking:
return fd.send(data, flags)
try:
trampoline(self.fd, write=True, timeout=self.gettimeout(),
timeout_exc=socket.timeout("timed out"))
return fd.send(data, flags)
except socket.error, e:
if e[0] in SOCKET_BLOCKING:
@@ -290,11 +300,11 @@ class GreenSocket(object):
trampoline(fd,
write=True,
timeout=self.timeout,
timeout_exc=socket.timeout)
timeout_exc=socket.timeout("timed out"))
tail += self.send(data[tail:], flags)
def sendto(self, *args):
trampoline(self.fd, write=True, timeout_exc=socket.timeout)
trampoline(self.fd, write=True, timeout_exc=socket.timeout("timed out"))
return self.fd.sendto(*args)
def setblocking(self, flag):

View File

@@ -118,11 +118,15 @@ class TestDebug(LimitedTestCase):
eventlet.sleep(0)
client.send(' ')
eventlet.sleep(0)
# allow the "hurl" greenlet to trigger the KeyError
# not sure why the extra context switch is needed
eventlet.sleep(0)
finally:
sys.stderr = orig
self.assertRaises(KeyError, gt.wait)
debug.hub_exceptions(False)
self.assert_('Traceback' in fake.getvalue(),
# look for the KeyError exception in the traceback
self.assert_('KeyError: 1' in fake.getvalue(),
"Traceback not in:\n" + fake.getvalue())
if __name__ == "__main__":

View File

@@ -8,6 +8,7 @@ import errno
import eventlet
import os
import sys
import array
def bufsized(sock, size=1):
""" Resize both send and receive buffers on a socket.
@@ -32,12 +33,190 @@ class TestGreenIo(LimitedTestCase):
s.settimeout(0.1)
gs = greenio.GreenSocket(s)
try:
self.assertRaises(socket.timeout, gs.connect, ('192.0.2.1', 80))
gs.connect(('192.0.2.1', 80))
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))
self.assertEqual(e.args[0], 'timed out')
except socket.error, e:
# unreachable is also a valid outcome
if e[0] != errno.EHOSTUNREACH:
raise
def test_accept_timeout(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 0))
s.listen(50)
s.settimeout(0.1)
gs = greenio.GreenSocket(s)
try:
gs.accept()
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))
self.assertEqual(e.args[0], 'timed out')
def test_connect_ex_timeout(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(0.1)
gs = greenio.GreenSocket(s)
e = gs.connect_ex(('192.0.2.1', 80))
self.assertEquals(e, errno.EAGAIN)
def test_recv_timeout(self):
listener = greenio.GreenSocket(socket.socket())
listener.bind(('', 0))
listener.listen(50)
def server():
# accept the connection in another greenlet
sock, addr = listener.accept()
eventlet.sleep(.2)
gt = eventlet.spawn(server)
addr = listener.getsockname()
client = greenio.GreenSocket(socket.socket())
client.settimeout(0.1)
client.connect(addr)
try:
r = client.recv(8192)
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))
self.assertEqual(e.args[0], 'timed out')
gt.wait()
def test_recvfrom_timeout(self):
gs = greenio.GreenSocket(
socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
gs.settimeout(.1)
gs.bind(('', 0))
try:
gs.recvfrom(8192)
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))
self.assertEqual(e.args[0], 'timed out')
def test_recvfrom_into_timeout(self):
buf = buffer(array.array('B'))
gs = greenio.GreenSocket(
socket.socket(socket.AF_INET, socket.SOCK_DGRAM))
gs.settimeout(.1)
gs.bind(('', 0))
try:
gs.recvfrom_into(buf)
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))
self.assertEqual(e.args[0], 'timed out')
def test_recv_into_timeout(self):
buf = buffer(array.array('B'))
listener = greenio.GreenSocket(socket.socket())
listener.bind(('', 0))
listener.listen(50)
def server():
# accept the connection in another greenlet
sock, addr = listener.accept()
eventlet.sleep(.2)
gt = eventlet.spawn(server)
addr = listener.getsockname()
client = greenio.GreenSocket(socket.socket())
client.settimeout(0.1)
client.connect(addr)
try:
r = client.recv_into(buf)
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))
self.assertEqual(e.args[0], 'timed out')
gt.wait()
def test_send_timeout(self):
listener = greenio.GreenSocket(socket.socket())
listener.bind(('', 0))
listener.listen(50)
def server():
# accept the connection in another greenlet
sock, addr = listener.accept()
eventlet.sleep(.5)
gt = eventlet.spawn(server)
addr = listener.getsockname()
client = greenio.GreenSocket(socket.socket())
client.settimeout(0.1)
client.connect(addr)
try:
msg = "A"*(8*1024*1024)
# want to exceed the size of the OS buffer so it'll block
for x in range(10):
client.send(msg)
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))
self.assertEqual(e.args[0], 'timed out')
gt.wait()
def test_sendall_timeout(self):
listener = greenio.GreenSocket(socket.socket())
listener.bind(('', 0))
listener.listen(50)
def server():
# accept the connection in another greenlet
sock, addr = listener.accept()
eventlet.sleep(.5)
gt = eventlet.spawn(server)
addr = listener.getsockname()
client = greenio.GreenSocket(socket.socket())
client.settimeout(0.1)
client.connect(addr)
try:
msg = "A"*(8*1024*1024)
# want to exceed the size of the OS buffer so it'll block
client.sendall(msg)
self.fail("socket.timeout not raised")
except socket.timeout, e:
self.assert_(hasattr(e, 'args'))
self.assertEqual(e.args[0], 'timed out')
gt.wait()
def test_close_with_makefile(self):
def accept_close_early(listener):
# verify that the makefile and the socket are truly independent

View File

@@ -14,8 +14,10 @@ def start_http_server():
#print "Serving HTTP on", sa[0], "port", sa[1], "..."
httpd.request_count = 0
def serve():
httpd.handle_request()
# increment the request_count before handling the request because
# the send() for the response blocks (or at least appeared to be)
httpd.request_count += 1
httpd.handle_request()
return spawn(serve), httpd, sa[1]
class TestGreenness(unittest.TestCase):