Files
deb-python-eventlet/eventlet/parallel.py
2009-10-04 00:17:49 -07:00

87 lines
3.3 KiB
Python

from eventlet.coros import Semaphore, Queue
from eventlet.api import spawn, getcurrent
import sys
__all__ = ['Parallel']
class Parallel(object):
""" The Parallel class allows you to easily control coroutine concurrency.
"""
def __init__(self, max_size):
self.max_size = max_size
self.coroutines_running = set()
self.sem = Semaphore(max_size)
self._results = Queue()
def resize(self, new_max_size):
""" Change the max number of coroutines doing work at any given time.
If resize is called when there are more than *new_max_size*
coroutines already working on tasks, they will be allowed to complete but no
new tasks will be allowed to get launched until enough coroutines finish their
tasks to drop the overall quantity below *new_max_size*. Until then, the
return value of free() will be negative.
"""
max_size_delta = new_max_size - self.max_size
self.sem.counter += max_size_delta
self.max_size = new_max_size
@property
def current_size(self):
""" The current size is the number of coroutines that are currently
executing functions in the Parallel's pool."""
return len(self.coroutines_running)
def free(self):
""" Returns the number of coroutines available for use."""
return self.sem.counter
def _coro_done(self, coro, result, exc=None):
self.sem.release()
self.coroutines_running.remove(coro)
self._results.send(result)
# if done processing (no more work is being done),
# send StopIteration so that the queue knows it's done
if self.sem.balance == self.max_size:
self._results.send_exception(StopIteration)
def spawn(self, func, *args, **kwargs):
""" Create a coroutine to run func(*args, **kwargs). Returns a
Coro object that can be used to retrieve the results of the function.
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
current = getcurrent()
if self.sem.locked() and current in self.coroutines_running:
func(*args, **kwargs)
else:
self.sem.acquire()
p = spawn(func, *args, **kwargs)
self.coroutines_running.add(p)
p.link(self._coro_done)
return p
def wait(self):
"""Wait for the next execute in the pool to complete,
and return the result."""
return self.results.wait()
def results(self):
""" Returns an iterator over the results from the worker coroutines."""
return self._results
def _do_spawn_all(self, func, iterable):
for i in iterable:
if not isinstance(i, tuple):
self.spawn(func, i)
else:
self.spawn(func, *i)
def spawn_all(self, func, iterable):
""" Applies *func* over every item in *iterable* using the concurrency
present in the pool. This function is a generator which yields the
results of *func* as applied to the members of the iterable."""
spawn(self._do_spawn_all, func, iterable)
return self.results()