Commit by Ryan Williams, 2010-01-23 20:07:41 -05:00
12 changed files with 297 additions and 60 deletions

benchmarks/__init__.py (new file, 24 lines)

@@ -0,0 +1,24 @@
import gc
import timeit
import random

def measure_best(repeat, iters,
                 common_setup='pass',
                 common_cleanup='pass',
                 *funcs):
    funcs = list(funcs)
    results = dict([(f,[]) for f in funcs])

    for i in xrange(repeat):
        random.shuffle(funcs)
        for func in funcs:
            gc.collect()
            t = timeit.Timer(func, setup=common_setup)
            results[func].append(t.timeit(iters))
            common_cleanup()

    best_results = {}
    for func, times in results.iteritems():
        best_results[func] = min(times)
    return best_results

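The harness above times each function with timeit, shuffling the run order between repeats and forcing a garbage collection before each timing, then keeps the best (minimum) time per function. Note that common_setup is handed to timeit.Timer as its setup argument (so the 'pass' string default works there), while common_cleanup is called directly, so callers pass a real callable. A minimal usage sketch; the two workload functions are invented for illustration and are not part of this commit:

import collections
import benchmarks

def append_list():
    # toy workload: append 1000 ints to a list
    l = []
    for i in xrange(1000):
        l.append(i)

def append_deque():
    # toy workload: the same appends against a deque
    d = collections.deque()
    for i in xrange(1000):
        d.append(i)

best = benchmarks.measure_best(3, 100,       # 3 repeats of 100 timed iterations
                               'pass',       # common_setup, passed to timeit.Timer
                               lambda: None, # common_cleanup, called after each timing
                               append_list, append_deque)
print "list :", best[append_list]
print "deque:", best[append_deque]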
(new file: localhost socket benchmark, 100 lines)

@@ -0,0 +1,100 @@
"""Benchmark evaluating eventlet's performance at speaking to itself over a localhost socket."""
import time
import benchmarks

BYTES=1000
SIZE=1
CONCURRENCY=50

def reader(sock):
    expect = BYTES
    while expect > 0:
        d = sock.recv(min(expect, SIZE))
        expect -= len(d)

def writer(addr, socket_impl):
    sock = socket_impl(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(addr)
    sent = 0
    while sent < BYTES:
        d = 'xy' * (max(min(SIZE/2, BYTES-sent), 1))
        sock.sendall(d)
        sent += len(d)

def green_accepter(server_sock, pool):
    for i in xrange(CONCURRENCY):
        sock, addr = server_sock.accept()
        pool.spawn_n(reader, sock)

def heavy_accepter(server_sock, pool):
    for i in xrange(CONCURRENCY):
        sock, addr = server_sock.accept()
        t = threading.Thread(None, reader, "reader thread", (sock,))
        t.start()
        pool.append(t)

import eventlet.green.socket
import eventlet
from eventlet import debug
debug.hub_exceptions(True)

def launch_green_threads():
    pool = eventlet.GreenPool(CONCURRENCY * 2 + 1)
    server_sock = eventlet.green.socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.bind(('localhost', 0))
    server_sock.listen(50)
    addr = ('localhost', server_sock.getsockname()[1])
    pool.spawn_n(green_accepter, server_sock, pool)
    for i in xrange(CONCURRENCY):
        pool.spawn_n(writer, addr, eventlet.green.socket.socket)
    pool.waitall()

import threading
import socket

def launch_heavy_threads():
    threads = []
    server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_sock.bind(('localhost', 0))
    server_sock.listen(50)
    addr = ('localhost', server_sock.getsockname()[1])
    accepter_thread = threading.Thread(None, heavy_accepter, "accepter thread", (server_sock, threads))
    accepter_thread.start()
    threads.append(accepter_thread)
    for i in xrange(CONCURRENCY):
        client_thread = threading.Thread(None, writer, "writer thread", (addr, socket.socket))
        client_thread.start()
        threads.append(client_thread)
    for t in threads:
        t.join()

if __name__ == "__main__":
    import optparse
    parser = optparse.OptionParser()
    parser.add_option('--compare-threading', action='store_true', dest='threading', default=False)
    parser.add_option('-b', '--bytes', type='int', dest='bytes',
                      default=BYTES)
    parser.add_option('-s', '--size', type='int', dest='size',
                      default=SIZE)
    parser.add_option('-c', '--concurrency', type='int', dest='concurrency',
                      default=CONCURRENCY)

    opts, args = parser.parse_args()
    BYTES=opts.bytes
    SIZE=opts.size
    CONCURRENCY=opts.concurrency

    funcs = [launch_green_threads]
    if opts.threading:
        funcs = [launch_green_threads, launch_heavy_threads]
    results = benchmarks.measure_best(3, 3,
                                      lambda: None, lambda: None,
                                      *funcs)
    print "green:", results[launch_green_threads]
    if opts.threading:
        print "threads:", results[launch_heavy_threads]
        print "%", (results[launch_green_threads]-results[launch_heavy_threads])/results[launch_heavy_threads] * 100

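The final print reports the green-thread run time relative to the OS-thread run time as a percentage, so a negative number means the GreenPool version finished faster. A quick worked example with invented timings:

# Invented numbers, only to show how the reported percentage reads.
green_time = 0.8    # seconds for launch_green_threads (hypothetical)
thread_time = 1.0   # seconds for launch_heavy_threads (hypothetical)
print "%", (green_time - thread_time) / thread_time * 100   # prints "% -20.0", i.e. 20% faster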
benchmarks/spawn.py (new file, 56 lines)

@@ -0,0 +1,56 @@
"""Compare spawn to spawn_n"""
import eventlet
import benchmarks

def dummy(i=None):
    return i

def run_spawn():
    eventlet.spawn(dummy, 1)

def run_spawn_n():
    eventlet.spawn_n(dummy, 1)

def run_spawn_n_kw():
    eventlet.spawn_n(dummy, i=1)

def cleanup():
    eventlet.sleep(0.2)

iters = 10000
best = benchmarks.measure_best(5, iters,
                               'pass',
                               cleanup,
                               run_spawn_n,
                               run_spawn,
                               run_spawn_n_kw)
print "eventlet.spawn", best[run_spawn]
print "eventlet.spawn_n", best[run_spawn_n]
print "eventlet.spawn_n(**kw)", best[run_spawn_n_kw]
print "%% %0.1f" % ((best[run_spawn]-best[run_spawn_n])/best[run_spawn_n] * 100)

pool = None
def setup():
    global pool
    pool = eventlet.GreenPool(iters)

def run_pool_spawn():
    pool.spawn(dummy, 1)

def run_pool_spawn_n():
    pool.spawn_n(dummy, 1)

def cleanup_pool():
    pool.waitall()

best = benchmarks.measure_best(3, iters,
                               setup,
                               cleanup_pool,
                               run_pool_spawn,
                               run_pool_spawn_n,
                               )
print "eventlet.GreenPool.spawn", best[run_pool_spawn]
print "eventlet.GreenPool.spawn_n", best[run_pool_spawn_n]
print "%% %0.1f" % ((best[run_pool_spawn]-best[run_pool_spawn_n])/best[run_pool_spawn_n] * 100)

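For context on what is being compared: eventlet.spawn returns a GreenThread whose return value can be collected with wait(), while spawn_n only schedules the call, which is why it is expected to be cheaper. A small sketch of the difference, assuming the eventlet API of this era:

import eventlet

def add(a, b):
    return a + b

gt = eventlet.spawn(add, 2, 3)   # GreenThread; the result is retrievable
print gt.wait()                  # 5

eventlet.spawn_n(add, 2, 3)      # fire-and-forget: cheaper, but no way to
eventlet.sleep(0)                # fetch the return value or the exception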
(modified file: documentation HTML page)

@@ -37,12 +37,14 @@ easy_install eventlet
</ul>
</p>
-<h3>Mailing List</h3>
+<h3>Discussion</h3>
<p><a href="https://lists.secondlife.com/cgi-bin/mailman/listinfo/eventletdev">eventletdev at lists.secondlife.com</a></p>
<p>This is a relatively low-traffic list about using and developing eventlet. Look through the <a href="https://lists.secondlife.com/pipermail/eventletdev/">archives</a> for some useful information and possible answers to questions.</p>
+<p>There's an IRC channel dedicated to eventlet: <a href="irc://kubrick.freenode.net/#eventlet">#eventlet on freenode</a></p>. It's a pretty chill place to hang out!
<h3>Development</h3>
<p><a href="http://bitbucket.org/which_linden/eventlet/">trunk repository</a></p>
@@ -102,6 +104,7 @@ easy_install eventlet
<li><a class="reference external" href="https://lists.secondlife.com/pipermail/eventletdev/">Mailing List Archives</a></li>
<li><a class="reference external" href="http://eventlet.net/hudson/">Continuous Builds</a></li>
<li><a class="reference external" href="http://bitbucket.org/which_linden/eventlet/issues/new/">Bug Report Form</a></li>
+<li><a class="reference external" href="irc://kubrick.freenode.net/#eventlet">irc channel</a></li>
</ul>
</div>
</div>

(modified file: eventlet debug module)

@@ -92,6 +92,8 @@ def hub_exceptions(state):
    """
    from eventlet import hubs
    hubs.get_hub().set_timer_exceptions(state)
+    from eventlet import greenpool
+    greenpool.DEBUG = state

def tpool_exceptions(state):
    """Toggles whether tpool itself prints exceptions that are raised from

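With this change, debug.hub_exceptions(True) turns on traceback printing for pooled greenthreads (via the greenpool.DEBUG flag introduced below) as well as for hub timers. A hedged sketch of why that matters for spawn_n-style calls, which have no GreenThread to re-raise from:

import eventlet
from eventlet import debug

debug.hub_exceptions(True)       # now also sets greenpool.DEBUG = True

def boom():
    raise RuntimeError("visible on stderr only because DEBUG is on")

pool = eventlet.GreenPool()
pool.spawn_n(boom)               # no GreenThread object to wait() on,
pool.waitall()                   # so without DEBUG the error would be swallowed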
eventlet/green/Queue.py (new file, 1 line)

@@ -0,0 +1 @@
from eventlet.queue import *

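This one-liner republishes eventlet's queue under the stdlib module name Queue, which is what the patcher-based stdlib test added at the end of this commit imports. A small usage sketch of the green module as a drop-in:

import eventlet
from eventlet.green import Queue   # green stand-in for the stdlib Queue module

q = Queue.Queue()

def producer():
    for i in range(3):
        q.put(i)

eventlet.spawn(producer)
print [q.get() for _ in range(3)]  # [0, 1, 2]; get() blocks cooperatively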
(modified file: greenpool module)

@@ -5,9 +5,12 @@ from eventlet import coros
from eventlet import event
from eventlet import greenthread
from eventlet import semaphore
+from eventlet.support import greenlets as greenlet

__all__ = ['GreenPool', 'GreenPile']

+DEBUG = False

try:
    next
except NameError:
@@ -84,9 +87,10 @@ class GreenPool(object):
        try:
            try:
                func(*args, **kwargs)
-            except (KeyboardInterrupt, SystemExit, GreenletExit):
+            except (KeyboardInterrupt, SystemExit, greenlet.GreenletExit):
                raise
            except:
-                traceback.print_exc()
+                if DEBUG:
+                    traceback.print_exc()
        finally:
            if coro is None:

(modified file: hub base class)

@@ -125,11 +125,13 @@ class BaseHub(object):
    def squelch_exception(self, fileno, exc_info):
        traceback.print_exception(*exc_info)
-        print >>sys.stderr, "Removing descriptor: %r" % (fileno,)
+        sys.stderr.write("Removing descriptor: %r\n" % (fileno,))
+        sys.stderr.flush()
        try:
            self.remove_descriptor(fileno)
        except Exception, e:
-            print >>sys.stderr, "Exception while removing descriptor! %r" % (e,)
+            sys.stderr.write("Exception while removing descriptor! %r\n" % (e,))
+            sys.stderr.flush()

    def wait(self, seconds=None):
        raise NotImplementedError("Implement this in a subclass")
@@ -211,7 +213,8 @@ class BaseHub(object):
    def squelch_observer_exception(self, observer, exc_info):
        traceback.print_exception(*exc_info)
-        print >>sys.stderr, "Removing observer: %r" % (observer,)
+        sys.stderr.write("Removing observer: %r\n" % (observer,))
+        sys.stderr.flush()
        self.remove_observer(observer)

    def fire_observers(self, activity):
@@ -228,7 +231,8 @@ class BaseHub(object):
    def _debug_squelch_timer_exception(self, timer, exc_info):
        traceback.print_exception(*exc_info)
-        print >>sys.stderr, "Timer raised: %r" % (timer,)
+        sys.stderr.write("Timer raised: %r\n" % (timer,))
+        sys.stderr.flush()

    squelch_timer_exception = _silent_squelch_timer_exception
@@ -305,7 +309,7 @@ class BaseHub(object):
        return self.listeners[WRITE].values()

    def get_timers_count(hub):
-        return max(len(x) for x in [hub.timers, hub.next_timers])
+        return max(len(hub.timers), len(hub.next_timers))

    def set_debug_listeners(self, value):
        if value:

(modified file: eventlet queue module)

@@ -21,7 +21,7 @@ from eventlet.hubs import get_hub
from eventlet.greenthread import getcurrent, exc_after
from eventlet.event import Event

-__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue']
+__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty']


class Waiter(object):
    """A low level synchronization class.
@@ -99,14 +99,12 @@ class Waiter(object):
        self.greenlet = None


-class Queue(object):
-    """Create a queue object with a given maximum size.
-
-    If *maxsize* is less than zero or ``None``, the queue size is infinite.
-
-    ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks until the
-    item is delivered. (This is unlike the standard :class:`Queue`, where 0 means
-    infinite size).
+class LightQueue(object):
+    """
+    This is a variant of Queue that behaves mostly like the standard
+    :class:`Queue`.  It differs by not supporting the
+    :meth:`task_done <Queue.task_done>` or :meth:`join <Queue.join>` methods,
+    and is a little faster for not having that overhead.
    """

    def __init__(self, maxsize=None):
@@ -300,7 +298,6 @@ class Queue(object):
    def _schedule_unlock(self):
        if self._event_unlock is None:
            self._event_unlock = get_hub().schedule_call_global(0, self._unlock)
-            # QQQ re-activate event (with event_active libevent call) instead of creating a new one each time


class ItemWaiter(Waiter):
@@ -311,51 +308,34 @@ class ItemWaiter(Waiter):
        self.item = item


-class PriorityQueue(Queue):
-    '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
-
-    Entries are typically tuples of the form: ``(priority number, data)``.
-    '''
-
-    def _init(self, maxsize):
-        self.queue = []
-
-    def _put(self, item, heappush=heapq.heappush):
-        heappush(self.queue, item)
-
-    def _get(self, heappop=heapq.heappop):
-        return heappop(self.queue)
-
-
-class LifoQueue(Queue):
-    '''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
-
-    def _init(self, maxsize):
-        self.queue = []
-
-    def _put(self, item):
-        self.queue.append(item)
-
-    def _get(self):
-        return self.queue.pop()
-
-
-class JoinableQueue(Queue):
-    '''A subclass of :class:`Queue` that additionally has :meth:`task_done` and :meth:`join` methods.'''
+class Queue(LightQueue):
+    '''Create a queue object with a given maximum size.
+
+    If *maxsize* is less than zero or ``None``, the queue size is infinite.
+
+    ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks
+    until the item is delivered. (This is unlike the standard :class:`Queue`,
+    where 0 means infinite size).
+
+    In all other respects, this Queue class resembled the standard library,
+    :class:`Queue`.
+    '''

    def __init__(self, maxsize=None):
-        Queue.__init__(self, maxsize)
+        LightQueue.__init__(self, maxsize)
        self.unfinished_tasks = 0
        self._cond = Event()

    def _format(self):
-        result = Queue._format(self)
+        result = LightQueue._format(self)
        if self.unfinished_tasks:
            result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
        return result

    def _put(self, item):
-        Queue._put(self, item)
+        LightQueue._put(self, item)
+        self._put_bookkeeping()
+
+    def _put_bookkeeping(self):
        self.unfinished_tasks += 1
        if self._cond.ready():
            self._cond.reset()
@@ -371,6 +351,7 @@ class JoinableQueue(Queue):
        Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
        '''
        if self.unfinished_tasks <= 0:
            raise ValueError('task_done() called too many times')
        self.unfinished_tasks -= 1
@@ -387,3 +368,34 @@ class JoinableQueue(Queue):
        '''
        self._cond.wait()


+class PriorityQueue(Queue):
+    '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
+
+    Entries are typically tuples of the form: ``(priority number, data)``.
+    '''
+
+    def _init(self, maxsize):
+        self.queue = []
+
+    def _put(self, item, heappush=heapq.heappush):
+        heappush(self.queue, item)
+        self._put_bookkeeping()
+
+    def _get(self, heappop=heapq.heappop):
+        return heappop(self.queue)
+
+
+class LifoQueue(Queue):
+    '''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
+
+    def _init(self, maxsize):
+        self.queue = []
+
+    def _put(self, item):
+        self.queue.append(item)
+        self._put_bookkeeping()
+
+    def _get(self):
+        return self.queue.pop()

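The net effect of the restructuring above: LightQueue is the minimal put/get queue, and Queue (which replaces the old JoinableQueue) layers task_done() and join() on top of it, with PriorityQueue and LifoQueue inheriting that bookkeeping. A short sketch, assuming the class layout shown in this hunk:

import eventlet
from eventlet import queue

q = queue.Queue()            # task_done()/join() now live on Queue itself

def worker():
    while True:
        item = q.get()
        # ... handle item ...
        q.task_done()

eventlet.spawn_n(worker)
for i in range(5):
    q.put(i)
q.join()                     # returns once every item has been task_done()'d

lq = queue.LightQueue()      # same put/get interface, minus the join overhead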
(modified file: greenpool tests)

@@ -4,8 +4,9 @@ import os
import random

import eventlet
-from eventlet import api
+from eventlet import debug
from eventlet import hubs, greenpool, coros, event
+from eventlet.support import greenlets as greenlet

import tests


def passthru(a):
@@ -16,6 +17,9 @@ def passthru2(a, b):
    eventlet.sleep(0.01)
    return a,b

+def raiser(exc):
+    raise exc

class GreenPool(tests.LimitedTestCase):
    def test_spawn(self):
        p = greenpool.GreenPool(4)
@@ -113,9 +117,10 @@ class GreenPool(tests.LimitedTestCase):
        self.assertEquals('done', evt.wait())

    def assert_pool_has_free(self, pool, num_free):
+        self.assertEquals(pool.free(), num_free)
        def wait_long_time(e):
            e.wait()
-        timer = api.exc_after(1, api.TimeoutError)
+        timer = eventlet.exc_after(1, eventlet.TimeoutError)
        try:
            evt = event.Event()
            for x in xrange(num_free):
@@ -127,7 +132,7 @@ class GreenPool(tests.LimitedTestCase):
        # if the runtime error is not raised it means the pool had
        # some unexpected free items
-        timer = api.exc_after(0, RuntimeError)
+        timer = eventlet.exc_after(0, RuntimeError())
        try:
            self.assertRaises(RuntimeError, pool.spawn, wait_long_time, evt)
        finally:
@@ -177,7 +182,7 @@ class GreenPool(tests.LimitedTestCase):
        tp = pools.TokenPool(max_size=1)
        token = tp.get() # empty out the pool
        def do_receive(tp):
-            timer = api.exc_after(0, RuntimeError())
+            timer = eventlet.exc_after(0, RuntimeError())
            try:
                t = tp.get()
                self.fail("Shouldn't have recieved anything from the pool")
@@ -234,6 +239,19 @@ class GreenPool(tests.LimitedTestCase):
            eventlet.sleep(0)
        self.assertEqual(set(r), set([1,2,3,4]))

+    def test_exceptions(self):
+        p = greenpool.GreenPool(2)
+        for m in (p.spawn, p.spawn_n):
+            self.assert_pool_has_free(p, 2)
+            m(raiser, RuntimeError())
+            self.assert_pool_has_free(p, 1)
+            p.waitall()
+            self.assert_pool_has_free(p, 2)
+            m(raiser, greenlet.GreenletExit)
+            self.assert_pool_has_free(p, 1)
+            p.waitall()
+            self.assert_pool_has_free(p, 2)
+
    def test_imap(self):
        p = greenpool.GreenPool(4)
        result_list = list(p.imap(passthru, xrange(10)))
@@ -293,8 +311,8 @@ class GreenPile(tests.LimitedTestCase):
        for i in xrange(4):
            p.spawn(passthru, i)
        # now it should be full and this should time out
-        api.exc_after(0, api.TimeoutError)
-        self.assertRaises(api.TimeoutError, p.spawn, passthru, "time out")
+        eventlet.exc_after(0, eventlet.TimeoutError)
+        self.assertRaises(eventlet.TimeoutError, p.spawn, passthru, "time out")
        # verify that the spawn breakage didn't interrupt the sequence
        # and terminates properly
        for i in xrange(4,10):
@@ -361,7 +379,7 @@ class Stress(tests.LimitedTestCase):
                break
            received += 1
            if received % 5 == 0:
-                api.sleep(0.0001)
+                eventlet.sleep(0.0001)
            unique, order = i
            self.assert_(latest[unique] < order)
            latest[unique] = order
@@ -395,7 +413,7 @@ class Stress(tests.LimitedTestCase):
            self.assert_(i > latest)
            latest = i
            if latest % 5 == 0:
-                api.sleep(0.001)
+                eventlet.sleep(0.001)
            if latest % 10 == 0:
                gc.collect()
        objs_created = len(gc.get_objects()) - initial_obj_count

(modified file: queue tests)

@@ -218,7 +218,7 @@ class TestQueue(LimitedTestCase):
    def test_task_done(self):
        from eventlet import queue, debug
-        channel = queue.JoinableQueue(0)
+        channel = queue.Queue(0)
        X = object()
        gt = eventlet.spawn(channel.put, X)
        result = channel.get()

(new file: stdlib Queue test wrapper, 13 lines)

@@ -0,0 +1,13 @@
from eventlet import patcher
from eventlet.green import Queue
from eventlet.green import threading
from eventlet.green import time

patcher.inject('test.test_queue',
    globals(),
    ('Queue', Queue),
    ('threading', threading),
    ('time', time))

if __name__ == "__main__":
    test_main()
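patcher.inject imports the named stdlib module with the supplied green modules substituted into its namespace and copies its globals (including test_main) into this file, so the standard library's own Queue test suite exercises eventlet's green implementation. The same pattern could wrap other stdlib suites; a hypothetical sketch (the module and test names below are assumptions, not part of this commit):

from eventlet import patcher
from eventlet.green import thread
from eventlet.green import threading
from eventlet.green import time

# Hypothetical: run the stdlib threading tests against the green modules.
patcher.inject('test.test_threading',
    globals(),
    ('threading', threading),
    ('thread', thread),
    ('time', time))

if __name__ == "__main__":
    test_main()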