move CoroutinePool from coros to pools

This commit is contained in:
Denis Bilenko
2009-05-19 12:00:04 +07:00
parent c6f18d1de9
commit 4ab1f8c16c
2 changed files with 553 additions and 475 deletions

View File

@@ -23,33 +23,15 @@ THE SOFTWARE.
"""
import collections
import sys
import time
import traceback
import weakref
from eventlet import api
from eventlet import pools
DEBUG = False
try:
set
except NameError: # python 2.3 compatibility
from sets import Set as set
class Cancelled(RuntimeError):
pass
class ExceptionWrapper(object):
def __init__(self, e):
self.e = e
class NOT_USED:
def __repr__(self):
return 'NOT_USED'
@@ -466,461 +448,9 @@ def execute(func, *args, **kw):
return evt
class CoroutinePool(pools.Pool):
""" Like a thread pool, but with coroutines.
Coroutine pools are useful for splitting up tasks or globally controlling
concurrency. You don't retrieve the coroutines directly with get() --
instead use the execute() and execute_async() methods to run code.
>>> from eventlet import coros, api
>>> p = coros.CoroutinePool(max_size=2)
>>> def foo(a):
... print "foo", a
...
>>> evt = p.execute(foo, 1)
>>> evt.wait()
foo 1
Once the pool is exhausted, calling an execute forces a yield.
>>> p.execute_async(foo, 2)
>>> p.execute_async(foo, 3)
>>> p.free()
0
>>> p.execute_async(foo, 4)
foo 2
foo 3
>>> api.sleep(0)
foo 4
"""
def __init__(self, min_size=0, max_size=4, track_events=False):
self._greenlets = set()
if track_events:
self._tracked_events = []
self._next_event = None
else:
self._tracked_events = None
self.requested = metaphore()
super(CoroutinePool, self).__init__(min_size, max_size)
## This doesn't yet pass its own doctest -- but I'm not even sure it's a
## wonderful idea.
## def __del__(self):
## """Experimental: try to prevent the calling script from exiting until
## all coroutines in this pool have run to completion.
## >>> from eventlet import coros
## >>> pool = coros.CoroutinePool()
## >>> def saw(x): print "I saw %s!"
## ...
## >>> pool.launch_all(saw, "GHI")
## >>> del pool
## I saw G!
## I saw H!
## I saw I!
## """
## self.wait_all()
def _main_loop(self, sender):
""" Private, infinite loop run by a pooled coroutine. """
try:
while True:
recvd = sender.wait()
# Delete the sender's result here because the very
# first event through the loop is referenced by
# spawn_startup, and therefore is not itself deleted.
# This means that we have to free up its argument
# because otherwise said argument persists in memory
# forever. This is generally only a problem in unit
# tests.
sender._result = NOT_USED
sender = event()
(evt, func, args, kw) = recvd
self._safe_apply(evt, func, args, kw)
#api.get_hub().cancel_timers(api.getcurrent())
# Likewise, delete these variables or else they will
# be referenced by this frame until replaced by the
# next recvd, which may or may not be a long time from
# now.
del evt, func, args, kw, recvd
self.put(sender)
finally:
# if we get here, something broke badly, and all we can really
# do is try to keep the pool from leaking items.
# Shouldn't even try to print the exception.
self.put(self.create())
def _safe_apply(self, evt, func, args, kw):
""" Private method that runs the function, catches exceptions, and
passes back the return value in the event."""
try:
result = func(*args, **kw)
if evt is not None:
evt.send(result)
if self._tracked_events is not None:
if self._next_event is None:
self._tracked_events.append(result)
else:
ne = self._next_event
self._next_event = None
ne.send(result)
except api.GreenletExit, e:
# we're printing this out to see if it ever happens
# in practice
print "GreenletExit raised in coroutine pool", e
if evt is not None:
evt.send(e) # sent as a return value, not an exception
except KeyboardInterrupt:
raise # allow program to exit
except Exception, e:
traceback.print_exc()
if evt is not None:
evt.send(exc=e)
if self._tracked_events is not None:
if self._next_event is None:
self._tracked_events.append(ExceptionWrapper(e))
else:
ne = self._next_event
self._next_event = None
ne.send(exc=e)
def _execute(self, evt, func, args, kw):
""" Private implementation of the execute methods.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
if self.free() == 0 and api.getcurrent() in self._greenlets:
self._safe_apply(evt, func, args, kw)
else:
sender = self.get()
sender.send((evt, func, args, kw))
def create(self):
"""Private implementation of eventlet.pools.Pool
interface. Creates an event and spawns the
_main_loop coroutine, passing the event.
The event is used to send a callable into the
new coroutine, to be executed.
"""
sender = event()
self._greenlets.add(api.spawn(self._main_loop, sender))
return sender
def get(self):
"""Override of eventlet.pools.Pool interface"""
# Track the number of requested CoroutinePool coroutines
self.requested.inc()
# forward call to base class
return super(CoroutinePool, self).get()
def put(self, item):
"""Override of eventlet.pools.Pool interface"""
# forward call to base class
super(CoroutinePool, self).put(item)
# Track the number of outstanding CoroutinePool coroutines
self.requested.dec()
def execute(self, func, *args, **kw):
"""Execute func in one of the coroutines maintained
by the pool, when one is free.
Immediately returns an eventlet.coros.event object which
func's result will be sent to when it is available.
>>> from eventlet import coros
>>> p = coros.CoroutinePool()
>>> evt = p.execute(lambda a: ('foo', a), 1)
>>> evt.wait()
('foo', 1)
"""
receiver = event()
self._execute(receiver, func, args, kw)
return receiver
def execute_async(self, func, *args, **kw):
"""Execute func in one of the coroutines maintained
by the pool, when one is free.
No return value is provided.
>>> from eventlet import coros, api
>>> p = coros.CoroutinePool()
>>> def foo(a):
... print "foo", a
...
>>> p.execute_async(foo, 1)
>>> api.sleep(0)
foo 1
"""
self._execute(None, func, args, kw)
def wait(self):
"""Wait for the next execute in the pool to complete,
and return the result.
You must pass track_events=True to the CoroutinePool constructor
in order to use this method.
"""
assert self._tracked_events is not None, (
"Must pass track_events=True to the constructor to use CoroutinePool.wait()")
if self._next_event is not None:
return self._next_event.wait()
if not self._tracked_events:
self._next_event = event()
return self._next_event.wait()
result = self._tracked_events.pop(0)
if isinstance(result, ExceptionWrapper):
raise result.e
if not self._tracked_events:
self._next_event = event()
return result
def killall(self):
for g in self._greenlets:
api.kill(g)
def wait_all(self):
"""Wait until all coroutines started either by execute() or
execute_async() have completed. If you kept the event objects returned
by execute(), you can then call their individual wait() methods to
retrieve results with no further actual waiting.
>>> from eventlet import coros
>>> pool = coros.CoroutinePool()
>>> pool.wait_all()
>>> def hi(name):
... print "Hello, %s!" % name
... return name
...
>>> evt = pool.execute(hi, "world")
>>> pool.execute_async(hi, "darkness, my old friend")
>>> pool.wait_all()
Hello, world!
Hello, darkness, my old friend!
>>> evt.wait()
'world'
>>> pool.wait_all()
"""
self.requested.wait()
def launch_all(self, function, iterable):
"""For each tuple (sequence) in iterable, launch function(*tuple) in
its own coroutine -- like itertools.starmap(), but in parallel.
Discard values returned by function(). You should call wait_all() to
wait for all coroutines, newly-launched plus any previously-submitted
execute() or execute_async() calls, to complete.
>>> from eventlet import coros
>>> pool = coros.CoroutinePool()
>>> def saw(x):
... print "I saw %s!" % x
...
>>> pool.launch_all(saw, "ABC")
>>> pool.wait_all()
I saw A!
I saw B!
I saw C!
"""
for tup in iterable:
self.execute_async(function, *tup)
def process_all(self, function, iterable):
"""For each tuple (sequence) in iterable, launch function(*tuple) in
its own coroutine -- like itertools.starmap(), but in parallel.
Discard values returned by function(). Don't return until all
coroutines, newly-launched plus any previously-submitted execute() or
execute_async() calls, have completed.
>>> from eventlet import coros
>>> pool = coros.CoroutinePool()
>>> def saw(x): print "I saw %s!" % x
...
>>> pool.process_all(saw, "DEF")
I saw D!
I saw E!
I saw F!
"""
self.launch_all(function, iterable)
self.wait_all()
def generate_results(self, function, iterable, qsize=None):
"""For each tuple (sequence) in iterable, launch function(*tuple) in
its own coroutine -- like itertools.starmap(), but in parallel.
Yield each of the values returned by function(), in the order they're
completed rather than the order the coroutines were launched.
Iteration stops when we've yielded results for each arguments tuple in
iterable. Unlike wait_all() and process_all(), this function does not
wait for any previously-submitted execute() or execute_async() calls.
Results are temporarily buffered in a queue. If you pass qsize=, this
value is used to limit the max size of the queue: an attempt to buffer
too many results will suspend the completed CoroutinePool coroutine
until the requesting coroutine (the caller of generate_results()) has
retrieved one or more results by calling this generator-iterator's
next().
If any coroutine raises an uncaught exception, that exception will
propagate to the requesting coroutine via the corresponding next() call.
What I particularly want these tests to illustrate is that using this
generator function:
for result in generate_results(function, iterable):
# ... do something with result ...
executes coroutines at least as aggressively as the classic eventlet
idiom:
events = [pool.execute(function, *args) for args in iterable]
for event in events:
result = event.wait()
# ... do something with result ...
even without a distinct event object for every arg tuple in iterable,
and despite the funny flow control from interleaving launches of new
coroutines with yields of completed coroutines' results.
(The use case that makes this function preferable to the classic idiom
above is when the iterable, which may itself be a generator, produces
millions of items.)
>>> from eventlet import coros
>>> import string
>>> pool = coros.CoroutinePool(max_size=5)
>>> pausers = [coros.event() for x in xrange(2)]
>>> def longtask(evt, desc):
... print "%s woke up with %s" % (desc, evt.wait())
...
>>> pool.launch_all(longtask, zip(pausers, "AB"))
>>> def quicktask(desc):
... print "returning %s" % desc
... return desc
...
(Instead of using a for loop, step through generate_results()
items individually to illustrate timing)
>>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
>>> print step.next()
returning a
returning b
returning c
a
>>> print step.next()
b
>>> print step.next()
c
>>> print step.next()
returning d
returning e
returning f
d
>>> pausers[0].send("A")
>>> print step.next()
e
>>> print step.next()
f
>>> print step.next()
A woke up with A
returning g
returning h
returning i
g
>>> print "".join([step.next() for x in xrange(3)])
returning j
returning k
returning l
returning m
hij
>>> pausers[1].send("B")
>>> print "".join([step.next() for x in xrange(4)])
B woke up with B
returning n
returning o
returning p
returning q
klmn
"""
# Get an iterator because of our funny nested loop below. Wrap the
# iterable in enumerate() so we count items that come through.
tuples = iter(enumerate(iterable))
# If the iterable is empty, this whole function is a no-op, and we can
# save ourselves some grief by just quitting out. In particular, once
# we enter the outer loop below, we're going to wait on the queue --
# but if we launched no coroutines with that queue as the destination,
# we could end up waiting a very long time.
try:
index, args = tuples.next()
except StopIteration:
return
# From this point forward, 'args' is the current arguments tuple and
# 'index+1' counts how many such tuples we've seen.
# This implementation relies on the fact that _execute() accepts an
# event-like object, and -- unless it's None -- the completed
# coroutine calls send(result). We slyly pass a queue rather than an
# event -- the same queue instance for all coroutines. This is why our
# queue interface intentionally resembles the event interface.
q = queue(max_size=qsize)
# How many results have we yielded so far?
finished = 0
# This first loop is only until we've launched all the coroutines. Its
# complexity is because if iterable contains more args tuples than the
# size of our pool, attempting to _execute() the (poolsize+1)th
# coroutine would suspend until something completes and send()s its
# result to our queue. But to keep down queue overhead and to maximize
# responsiveness to our caller, we'd rather suspend on reading the
# queue. So we stuff the pool as full as we can, then wait for
# something to finish, then stuff more coroutines into the pool.
try:
while True:
# Before each yield, start as many new coroutines as we can fit.
# (The self.free() test isn't 100% accurate: if we happen to be
# executing in one of the pool's coroutines, we could _execute()
# without waiting even if self.free() reports 0. See _execute().)
# The point is that we don't want to wait in the _execute() call,
# we want to wait in the q.wait() call.
# IMPORTANT: at start, and whenever we've caught up with all
# coroutines we've launched so far, we MUST iterate this inner
# loop at least once, regardless of self.free() -- otherwise the
# q.wait() call below will deadlock!
# Recall that index is the index of the NEXT args tuple that we
# haven't yet launched. Therefore it counts how many args tuples
# we've launched so far.
while self.free() > 0 or finished == index:
# Just like the implementation of execute_async(), save that
# we're passing our queue instead of None as the "event" to
# which to send() the result.
self._execute(q, function, args, {})
# We've consumed that args tuple, advance to next.
index, args = tuples.next()
# Okay, we've filled up the pool again, yield a result -- which
# will probably wait for a coroutine to complete. Although we do
# have q.ready(), so we could iterate without waiting, we avoid
# that because every yield could involve considerable real time.
# We don't know how long it takes to return from yield, so every
# time we do, take the opportunity to stuff more requests into the
# pool before yielding again.
yield q.wait()
# Be sure to count results so we know when to stop!
finished += 1
except StopIteration:
pass
# Here we've exhausted the input iterable. index+1 is the total number
# of coroutines we've launched. We probably haven't yielded that many
# results yet. Wait for the rest of the results, yielding them as they
# arrive.
while finished < index + 1:
yield q.wait()
finished += 1
def CoroutinePool(*args, **kwargs):
from pools import CoroutinePool
return CoroutinePool(*args, **kwargs)
class pipe(object):

View File

@@ -23,10 +23,10 @@ THE SOFTWARE.
"""
import collections
import os
import socket
import traceback
from eventlet import api
from eventlet import coros
from eventlet import channel
@@ -133,3 +133,551 @@ class ConnectionPool(Pool):
def put(self, item):
## Discard item, create a new connection for the pool
Pool.put(self, self.create())
class ExceptionWrapper(object):
def __init__(self, e):
self.e = e
class CoroutinePool(Pool):
""" Like a thread pool, but with coroutines.
Coroutine pools are useful for splitting up tasks or globally controlling
concurrency. You don't retrieve the coroutines directly with get() --
instead use the execute() and execute_async() methods to run code.
>>> from eventlet import coros, api
>>> p = coros.CoroutinePool(max_size=2)
>>> def foo(a):
... print "foo", a
...
>>> evt = p.execute(foo, 1)
>>> evt.wait()
foo 1
Once the pool is exhausted, calling an execute forces a yield.
>>> p.execute_async(foo, 2)
>>> p.execute_async(foo, 3)
>>> p.free()
0
>>> p.execute_async(foo, 4)
foo 2
foo 3
>>> api.sleep(0)
foo 4
"""
def __init__(self, min_size=0, max_size=4, track_events=False):
self._greenlets = set()
if track_events:
self._tracked_events = []
self._next_event = None
else:
self._tracked_events = None
self.requested = coros.metaphore()
super(CoroutinePool, self).__init__(min_size, max_size)
## This doesn't yet pass its own doctest -- but I'm not even sure it's a
## wonderful idea.
## def __del__(self):
## """Experimental: try to prevent the calling script from exiting until
## all coroutines in this pool have run to completion.
## >>> from eventlet import coros
## >>> pool = coros.CoroutinePool()
## >>> def saw(x): print "I saw %s!"
## ...
## >>> pool.launch_all(saw, "GHI")
## >>> del pool
## I saw G!
## I saw H!
## I saw I!
## """
## self.wait_all()
def _main_loop(self, sender):
""" Private, infinite loop run by a pooled coroutine. """
try:
while True:
recvd = sender.wait()
# Delete the sender's result here because the very
# first event through the loop is referenced by
# spawn_startup, and therefore is not itself deleted.
# This means that we have to free up its argument
# because otherwise said argument persists in memory
# forever. This is generally only a problem in unit
# tests.
sender._result = coros.NOT_USED
sender = coros.event()
(evt, func, args, kw) = recvd
self._safe_apply(evt, func, args, kw)
#api.get_hub().cancel_timers(api.getcurrent())
# Likewise, delete these variables or else they will
# be referenced by this frame until replaced by the
# next recvd, which may or may not be a long time from
# now.
del evt, func, args, kw, recvd
self.put(sender)
finally:
# if we get here, something broke badly, and all we can really
# do is try to keep the pool from leaking items.
# Shouldn't even try to print the exception.
self.put(self.create())
def _safe_apply(self, evt, func, args, kw):
""" Private method that runs the function, catches exceptions, and
passes back the return value in the event."""
try:
result = func(*args, **kw)
if evt is not None:
evt.send(result)
if self._tracked_events is not None:
if self._next_event is None:
self._tracked_events.append(result)
else:
ne = self._next_event
self._next_event = None
ne.send(result)
except api.GreenletExit, e:
# we're printing this out to see if it ever happens
# in practice
print "GreenletExit raised in coroutine pool", e
if evt is not None:
evt.send(e) # sent as a return value, not an exception
except KeyboardInterrupt:
raise # allow program to exit
except Exception, e:
traceback.print_exc()
if evt is not None:
evt.send(exc=e)
if self._tracked_events is not None:
if self._next_event is None:
self._tracked_events.append(ExceptionWrapper(e))
else:
ne = self._next_event
self._next_event = None
ne.send(exc=e)
def _execute(self, evt, func, args, kw):
""" Private implementation of the execute methods.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
if self.free() == 0 and api.getcurrent() in self._greenlets:
self._safe_apply(evt, func, args, kw)
else:
sender = self.get()
sender.send((evt, func, args, kw))
def create(self):
"""Private implementation of eventlet.pools.Pool
interface. Creates an event and spawns the
_main_loop coroutine, passing the event.
The event is used to send a callable into the
new coroutine, to be executed.
"""
sender = coros.event()
self._greenlets.add(api.spawn(self._main_loop, sender))
return sender
def get(self):
"""Override of eventlet.pools.Pool interface"""
# Track the number of requested CoroutinePool coroutines
self.requested.inc()
# forward call to base class
return super(CoroutinePool, self).get()
def put(self, item):
"""Override of eventlet.pools.Pool interface"""
# forward call to base class
super(CoroutinePool, self).put(item)
# Track the number of outstanding CoroutinePool coroutines
self.requested.dec()
def execute(self, func, *args, **kw):
"""Execute func in one of the coroutines maintained
by the pool, when one is free.
Immediately returns an eventlet.coros.event object which
func's result will be sent to when it is available.
>>> from eventlet import coros
>>> p = coros.CoroutinePool()
>>> evt = p.execute(lambda a: ('foo', a), 1)
>>> evt.wait()
('foo', 1)
"""
receiver = coros.event()
self._execute(receiver, func, args, kw)
return receiver
def execute_async(self, func, *args, **kw):
"""Execute func in one of the coroutines maintained
by the pool, when one is free.
No return value is provided.
>>> from eventlet import coros, api
>>> p = coros.CoroutinePool()
>>> def foo(a):
... print "foo", a
...
>>> p.execute_async(foo, 1)
>>> api.sleep(0)
foo 1
"""
self._execute(None, func, args, kw)
def wait(self):
"""Wait for the next execute in the pool to complete,
and return the result.
You must pass track_events=True to the CoroutinePool constructor
in order to use this method.
"""
assert self._tracked_events is not None, (
"Must pass track_events=True to the constructor to use CoroutinePool.wait()")
if self._next_event is not None:
return self._next_event.wait()
if not self._tracked_events:
self._next_event = coros.event()
return self._next_event.wait()
result = self._tracked_events.pop(0)
if isinstance(result, ExceptionWrapper):
raise result.e
if not self._tracked_events:
self._next_event = coros.event()
return result
def killall(self):
for g in self._greenlets:
api.kill(g)
def wait_all(self):
"""Wait until all coroutines started either by execute() or
execute_async() have completed. If you kept the event objects returned
by execute(), you can then call their individual wait() methods to
retrieve results with no further actual waiting.
>>> from eventlet import coros
>>> pool = coros.CoroutinePool()
>>> pool.wait_all()
>>> def hi(name):
... print "Hello, %s!" % name
... return name
...
>>> evt = pool.execute(hi, "world")
>>> pool.execute_async(hi, "darkness, my old friend")
>>> pool.wait_all()
Hello, world!
Hello, darkness, my old friend!
>>> evt.wait()
'world'
>>> pool.wait_all()
"""
self.requested.wait()
def launch_all(self, function, iterable):
"""For each tuple (sequence) in iterable, launch function(*tuple) in
its own coroutine -- like itertools.starmap(), but in parallel.
Discard values returned by function(). You should call wait_all() to
wait for all coroutines, newly-launched plus any previously-submitted
execute() or execute_async() calls, to complete.
>>> from eventlet import coros
>>> pool = coros.CoroutinePool()
>>> def saw(x):
... print "I saw %s!" % x
...
>>> pool.launch_all(saw, "ABC")
>>> pool.wait_all()
I saw A!
I saw B!
I saw C!
"""
for tup in iterable:
self.execute_async(function, *tup)
def process_all(self, function, iterable):
"""For each tuple (sequence) in iterable, launch function(*tuple) in
its own coroutine -- like itertools.starmap(), but in parallel.
Discard values returned by function(). Don't return until all
coroutines, newly-launched plus any previously-submitted execute() or
execute_async() calls, have completed.
>>> from eventlet import coros
>>> pool = coros.CoroutinePool()
>>> def saw(x): print "I saw %s!" % x
...
>>> pool.process_all(saw, "DEF")
I saw D!
I saw E!
I saw F!
"""
self.launch_all(function, iterable)
self.wait_all()
def generate_results(self, function, iterable, qsize=None):
"""For each tuple (sequence) in iterable, launch function(*tuple) in
its own coroutine -- like itertools.starmap(), but in parallel.
Yield each of the values returned by function(), in the order they're
completed rather than the order the coroutines were launched.
Iteration stops when we've yielded results for each arguments tuple in
iterable. Unlike wait_all() and process_all(), this function does not
wait for any previously-submitted execute() or execute_async() calls.
Results are temporarily buffered in a queue. If you pass qsize=, this
value is used to limit the max size of the queue: an attempt to buffer
too many results will suspend the completed CoroutinePool coroutine
until the requesting coroutine (the caller of generate_results()) has
retrieved one or more results by calling this generator-iterator's
next().
If any coroutine raises an uncaught exception, that exception will
propagate to the requesting coroutine via the corresponding next() call.
What I particularly want these tests to illustrate is that using this
generator function:
for result in generate_results(function, iterable):
# ... do something with result ...
executes coroutines at least as aggressively as the classic eventlet
idiom:
events = [pool.execute(function, *args) for args in iterable]
for event in events:
result = event.wait()
# ... do something with result ...
even without a distinct event object for every arg tuple in iterable,
and despite the funny flow control from interleaving launches of new
coroutines with yields of completed coroutines' results.
(The use case that makes this function preferable to the classic idiom
above is when the iterable, which may itself be a generator, produces
millions of items.)
>>> from eventlet import coros
>>> import string
>>> pool = coros.CoroutinePool(max_size=5)
>>> pausers = [coros.event() for x in xrange(2)]
>>> def longtask(evt, desc):
... print "%s woke up with %s" % (desc, evt.wait())
...
>>> pool.launch_all(longtask, zip(pausers, "AB"))
>>> def quicktask(desc):
... print "returning %s" % desc
... return desc
...
(Instead of using a for loop, step through generate_results()
items individually to illustrate timing)
>>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
>>> print step.next()
returning a
returning b
returning c
a
>>> print step.next()
b
>>> print step.next()
c
>>> print step.next()
returning d
returning e
returning f
d
>>> pausers[0].send("A")
>>> print step.next()
e
>>> print step.next()
f
>>> print step.next()
A woke up with A
returning g
returning h
returning i
g
>>> print "".join([step.next() for x in xrange(3)])
returning j
returning k
returning l
returning m
hij
>>> pausers[1].send("B")
>>> print "".join([step.next() for x in xrange(4)])
B woke up with B
returning n
returning o
returning p
returning q
klmn
"""
# Get an iterator because of our funny nested loop below. Wrap the
# iterable in enumerate() so we count items that come through.
tuples = iter(enumerate(iterable))
# If the iterable is empty, this whole function is a no-op, and we can
# save ourselves some grief by just quitting out. In particular, once
# we enter the outer loop below, we're going to wait on the queue --
# but if we launched no coroutines with that queue as the destination,
# we could end up waiting a very long time.
try:
index, args = tuples.next()
except StopIteration:
return
# From this point forward, 'args' is the current arguments tuple and
# 'index+1' counts how many such tuples we've seen.
# This implementation relies on the fact that _execute() accepts an
# event-like object, and -- unless it's None -- the completed
# coroutine calls send(result). We slyly pass a queue rather than an
# event -- the same queue instance for all coroutines. This is why our
# queue interface intentionally resembles the event interface.
q = coros.queue(max_size=qsize)
# How many results have we yielded so far?
finished = 0
# This first loop is only until we've launched all the coroutines. Its
# complexity is because if iterable contains more args tuples than the
# size of our pool, attempting to _execute() the (poolsize+1)th
# coroutine would suspend until something completes and send()s its
# result to our queue. But to keep down queue overhead and to maximize
# responsiveness to our caller, we'd rather suspend on reading the
# queue. So we stuff the pool as full as we can, then wait for
# something to finish, then stuff more coroutines into the pool.
try:
while True:
# Before each yield, start as many new coroutines as we can fit.
# (The self.free() test isn't 100% accurate: if we happen to be
# executing in one of the pool's coroutines, we could _execute()
# without waiting even if self.free() reports 0. See _execute().)
# The point is that we don't want to wait in the _execute() call,
# we want to wait in the q.wait() call.
# IMPORTANT: at start, and whenever we've caught up with all
# coroutines we've launched so far, we MUST iterate this inner
# loop at least once, regardless of self.free() -- otherwise the
# q.wait() call below will deadlock!
# Recall that index is the index of the NEXT args tuple that we
# haven't yet launched. Therefore it counts how many args tuples
# we've launched so far.
while self.free() > 0 or finished == index:
# Just like the implementation of execute_async(), save that
# we're passing our queue instead of None as the "event" to
# which to send() the result.
self._execute(q, function, args, {})
# We've consumed that args tuple, advance to next.
index, args = tuples.next()
# Okay, we've filled up the pool again, yield a result -- which
# will probably wait for a coroutine to complete. Although we do
# have q.ready(), so we could iterate without waiting, we avoid
# that because every yield could involve considerable real time.
# We don't know how long it takes to return from yield, so every
# time we do, take the opportunity to stuff more requests into the
# pool before yielding again.
yield q.wait()
# Be sure to count results so we know when to stop!
finished += 1
except StopIteration:
pass
# Here we've exhausted the input iterable. index+1 is the total number
# of coroutines we've launched. We probably haven't yielded that many
# results yet. Wait for the rest of the results, yielding them as they
# arrive.
while finished < index + 1:
yield q.wait()
finished += 1
class Actor(object):
""" A free-running coroutine that accepts and processes messages.
Kind of the equivalent of an Erlang process, really. It processes
a queue of messages in the order that they were sent. You must
subclass this and implement your own version of receive().
The actor's reference count will never drop to zero while the
coroutine exists; if you lose all references to the actor object
it will never be freed.
"""
def __init__(self, concurrency = 1):
""" Constructs an Actor, kicking off a new coroutine to process the messages.
The concurrency argument specifies how many messages the actor will try
to process concurrently. If it is 1, the actor will process messages
serially.
"""
self._mailbox = collections.deque()
self._event = coros.event()
self._killer = api.spawn(self.run_forever)
self._pool = CoroutinePool(min_size=0, max_size=concurrency)
def run_forever(self):
""" Loops forever, continually checking the mailbox. """
while True:
if not self._mailbox:
self._event.wait()
self._event = coros.event()
else:
# leave the message in the mailbox until after it's
# been processed so the event doesn't get triggered
# while in the received method
self._pool.execute_async(
self.received, self._mailbox[0])
self._mailbox.popleft()
def cast(self, message):
""" Send a message to the actor.
If the actor is busy, the message will be enqueued for later
consumption. There is no return value.
>>> a = Actor()
>>> a.received = lambda msg: msg
>>> a.cast("hello")
"""
self._mailbox.append(message)
# if this is the only message, the coro could be waiting
if len(self._mailbox) == 1:
self._event.send()
def received(self, message):
""" Called to process each incoming message.
The default implementation just raises an exception, so
replace it with something useful!
>>> class Greeter(Actor):
... def received(self, (message, evt) ):
... print "received", message
... if evt: evt.send()
...
>>> a = Greeter()
This example uses events to synchronize between the actor and the main
coroutine in a predictable manner, but this kinda defeats the point of
the Actor, so don't do it in a real application.
>>> evt = event()
>>> a.cast( ("message 1", evt) )
>>> evt.wait() # force it to run at this exact moment
received message 1
>>> evt.reset()
>>> a.cast( ("message 2", None) )
>>> a.cast( ("message 3", evt) )
>>> evt.wait()
received message 2
received message 3
>>> api.kill(a._killer) # test cleanup
"""
raise NotImplementedError()