317 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			317 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from eventlet import coros, proc, api
 | |
| from eventlet.semaphore import Semaphore
 | |
| 
 | |
| import warnings
 | |
| warnings.warn("The pool module is deprecated.  Please use the "
 | |
|         "eventlet.GreenPool and eventlet.GreenPile classes instead.",
 | |
|         DeprecationWarning, stacklevel=2)
 | |
| 
 | |
| class Pool(object):
 | |
|     def __init__(self, min_size=0, max_size=4, track_events=False):
 | |
|         if min_size > max_size:
 | |
|             raise ValueError('min_size cannot be bigger than max_size')
 | |
|         self.max_size = max_size
 | |
|         self.sem = Semaphore(max_size)
 | |
|         self.procs = proc.RunningProcSet()
 | |
|         if track_events:
 | |
|             self.results = coros.queue()
 | |
|         else:
 | |
|             self.results = None
 | |
| 
 | |
|     def resize(self, new_max_size):
 | |
|         """ Change the :attr:`max_size` of the pool.
 | |
| 
 | |
|         If the pool gets resized when there are more than *new_max_size*
 | |
|         coroutines checked out, when they are returned to the pool they will be
 | |
|         discarded.  The return value of :meth:`free` will be negative in this
 | |
|         situation.
 | |
|         """
 | |
|         max_size_delta = new_max_size - self.max_size 
 | |
|         self.sem.counter += max_size_delta
 | |
|         self.max_size = new_max_size
 | |
| 
 | |
|     @property
 | |
|     def current_size(self):
 | |
|         """ The number of coroutines that are currently executing jobs. """
 | |
|         return len(self.procs)
 | |
| 
 | |
|     def free(self):
 | |
|         """ Returns the number of coroutines that are available for doing
 | |
|         work."""
 | |
|         return self.sem.counter
 | |
| 
 | |
|     def execute(self, func, *args, **kwargs):
 | |
|         """Execute func in one of the coroutines maintained
 | |
|         by the pool, when one is free.
 | |
| 
 | |
|         Immediately returns a :class:`~eventlet.proc.Proc` object which can be
 | |
|         queried for the func's result.
 | |
| 
 | |
|         >>> pool = Pool()
 | |
|         >>> task = pool.execute(lambda a: ('foo', a), 1)
 | |
|         >>> task.wait()
 | |
|         ('foo', 1)
 | |
|         """
 | |
|         # if reentering an empty pool, don't try to wait on a coroutine freeing
 | |
|         # itself -- instead, just execute in the current coroutine
 | |
|         if self.sem.locked() and api.getcurrent() in self.procs:
 | |
|             p = proc.spawn(func, *args, **kwargs)
 | |
|             try:
 | |
|                 p.wait()
 | |
|             except:
 | |
|                 pass
 | |
|         else:
 | |
|             self.sem.acquire()
 | |
|             p = self.procs.spawn(func, *args, **kwargs)
 | |
|             # assuming the above line cannot raise
 | |
|             p.link(lambda p: self.sem.release())
 | |
|         if self.results is not None:
 | |
|             p.link(self.results)
 | |
|         return p
 | |
| 
 | |
|     execute_async = execute
 | |
| 
 | |
|     def _execute(self, evt, func, args, kw):
 | |
|         p = self.execute(func, *args, **kw)
 | |
|         p.link(evt)
 | |
|         return p
 | |
| 
 | |
|     def waitall(self):
 | |
|         """ Calling this function blocks until every coroutine 
 | |
|         completes its work (i.e. there are 0 running coroutines)."""
 | |
|         return self.procs.waitall()
 | |
| 
 | |
|     wait_all = waitall
 | |
| 
 | |
|     def wait(self):
 | |
|         """Wait for the next execute in the pool to complete,
 | |
|         and return the result."""
 | |
|         return self.results.wait()
 | |
|         
 | |
|     def waiting(self):
 | |
|         """Return the number of coroutines waiting to execute.
 | |
|         """
 | |
|         if self.sem.balance < 0:
 | |
|             return -self.sem.balance
 | |
|         else:
 | |
|             return 0
 | |
| 
 | |
|     def killall(self):
 | |
|         """ Kill every running coroutine as immediately as possible."""
 | |
|         return self.procs.killall()
 | |
| 
 | |
|     def launch_all(self, function, iterable):
 | |
|         """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
 | |
|         in its own coroutine -- like ``itertools.starmap()``, but in parallel.
 | |
|         Discard values returned by ``function()``. You should call
 | |
|         ``wait_all()`` to wait for all coroutines, newly-launched plus any
 | |
|         previously-submitted :meth:`execute` or :meth:`execute_async` calls, to
 | |
|         complete.
 | |
| 
 | |
|         >>> pool = Pool()
 | |
|         >>> def saw(x):
 | |
|         ...     print "I saw %s!" % x
 | |
|         ...
 | |
|         >>> pool.launch_all(saw, "ABC")
 | |
|         >>> pool.wait_all()
 | |
|         I saw A!
 | |
|         I saw B!
 | |
|         I saw C!
 | |
|         """
 | |
|         for tup in iterable:
 | |
|             self.execute(function, *tup)
 | |
| 
 | |
|     def process_all(self, function, iterable):
 | |
|         """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
 | |
|         in its own coroutine -- like ``itertools.starmap()``, but in parallel.
 | |
|         Discard values returned by ``function()``. Don't return until all
 | |
|         coroutines, newly-launched plus any previously-submitted :meth:`execute()`
 | |
|         or :meth:`execute_async` calls, have completed.
 | |
| 
 | |
|         >>> from eventlet import coros
 | |
|         >>> pool = coros.CoroutinePool()
 | |
|         >>> def saw(x): print "I saw %s!" % x
 | |
|         ...
 | |
|         >>> pool.process_all(saw, "DEF")
 | |
|         I saw D!
 | |
|         I saw E!
 | |
|         I saw F!
 | |
|         """
 | |
|         self.launch_all(function, iterable)
 | |
|         self.wait_all()
 | |
| 
 | |
|     def generate_results(self, function, iterable, qsize=None):
 | |
|         """For each tuple (sequence) in *iterable*, launch ``function(*tuple)``
 | |
|         in its own coroutine -- like ``itertools.starmap()``, but in parallel.
 | |
|         Yield each of the values returned by ``function()``, in the order
 | |
|         they're completed rather than the order the coroutines were launched.
 | |
| 
 | |
|         Iteration stops when we've yielded results for each arguments tuple in
 | |
|         *iterable*. Unlike :meth:`wait_all` and :meth:`process_all`, this
 | |
|         function does not wait for any previously-submitted :meth:`execute` or
 | |
|         :meth:`execute_async` calls.
 | |
| 
 | |
|         Results are temporarily buffered in a queue. If you pass *qsize=*, this
 | |
|         value is used to limit the max size of the queue: an attempt to buffer
 | |
|         too many results will suspend the completed :class:`CoroutinePool`
 | |
|         coroutine until the requesting coroutine (the caller of
 | |
|         :meth:`generate_results`) has retrieved one or more results by calling
 | |
|         this generator-iterator's ``next()``.
 | |
| 
 | |
|         If any coroutine raises an uncaught exception, that exception will
 | |
|         propagate to the requesting coroutine via the corresponding ``next()``
 | |
|         call.
 | |
| 
 | |
|         What I particularly want these tests to illustrate is that using this
 | |
|         generator function::
 | |
| 
 | |
|             for result in generate_results(function, iterable):
 | |
|                 # ... do something with result ...
 | |
|                 pass
 | |
| 
 | |
|         executes coroutines at least as aggressively as the classic eventlet
 | |
|         idiom::
 | |
| 
 | |
|             events = [pool.execute(function, *args) for args in iterable]
 | |
|             for event in events:
 | |
|                 result = event.wait()
 | |
|                 # ... do something with result ...
 | |
| 
 | |
|         even without a distinct event object for every arg tuple in *iterable*,
 | |
|         and despite the funny flow control from interleaving launches of new
 | |
|         coroutines with yields of completed coroutines' results.
 | |
| 
 | |
|         (The use case that makes this function preferable to the classic idiom
 | |
|         above is when the *iterable*, which may itself be a generator, produces
 | |
|         millions of items.)
 | |
| 
 | |
|         >>> from eventlet import coros
 | |
|         >>> import string
 | |
|         >>> pool = coros.CoroutinePool(max_size=5)
 | |
|         >>> pausers = [coros.Event() for x in xrange(2)]
 | |
|         >>> def longtask(evt, desc):
 | |
|         ...     print "%s woke up with %s" % (desc, evt.wait())
 | |
|         ...
 | |
|         >>> pool.launch_all(longtask, zip(pausers, "AB"))
 | |
|         >>> def quicktask(desc):
 | |
|         ...     print "returning %s" % desc
 | |
|         ...     return desc
 | |
|         ...
 | |
| 
 | |
|         (Instead of using a ``for`` loop, step through :meth:`generate_results`
 | |
|         items individually to illustrate timing)
 | |
| 
 | |
|         >>> step = iter(pool.generate_results(quicktask, string.ascii_lowercase))
 | |
|         >>> print step.next()
 | |
|         returning a
 | |
|         returning b
 | |
|         returning c
 | |
|         a
 | |
|         >>> print step.next()
 | |
|         b
 | |
|         >>> print step.next()
 | |
|         c
 | |
|         >>> print step.next()
 | |
|         returning d
 | |
|         returning e
 | |
|         returning f
 | |
|         d
 | |
|         >>> pausers[0].send("A")
 | |
|         >>> print step.next()
 | |
|         e
 | |
|         >>> print step.next()
 | |
|         f
 | |
|         >>> print step.next()
 | |
|         A woke up with A
 | |
|         returning g
 | |
|         returning h
 | |
|         returning i
 | |
|         g
 | |
|         >>> print "".join([step.next() for x in xrange(3)])
 | |
|         returning j
 | |
|         returning k
 | |
|         returning l
 | |
|         returning m
 | |
|         hij
 | |
|         >>> pausers[1].send("B")
 | |
|         >>> print "".join([step.next() for x in xrange(4)])
 | |
|         B woke up with B
 | |
|         returning n
 | |
|         returning o
 | |
|         returning p
 | |
|         returning q
 | |
|         klmn
 | |
|         """
 | |
|         # Get an iterator because of our funny nested loop below. Wrap the
 | |
|         # iterable in enumerate() so we count items that come through.
 | |
|         tuples = iter(enumerate(iterable))
 | |
|         # If the iterable is empty, this whole function is a no-op, and we can
 | |
|         # save ourselves some grief by just quitting out. In particular, once
 | |
|         # we enter the outer loop below, we're going to wait on the queue --
 | |
|         # but if we launched no coroutines with that queue as the destination,
 | |
|         # we could end up waiting a very long time.
 | |
|         try:
 | |
|             index, args = tuples.next()
 | |
|         except StopIteration:
 | |
|             return
 | |
|         # From this point forward, 'args' is the current arguments tuple and
 | |
|         # 'index+1' counts how many such tuples we've seen.
 | |
|         # This implementation relies on the fact that _execute() accepts an
 | |
|         # event-like object, and -- unless it's None -- the completed
 | |
|         # coroutine calls send(result). We slyly pass a queue rather than an
 | |
|         # event -- the same queue instance for all coroutines. This is why our
 | |
|         # queue interface intentionally resembles the event interface.
 | |
|         q = coros.queue(max_size=qsize)
 | |
|         # How many results have we yielded so far?
 | |
|         finished = 0
 | |
|         # This first loop is only until we've launched all the coroutines. Its
 | |
|         # complexity is because if iterable contains more args tuples than the
 | |
|         # size of our pool, attempting to _execute() the (poolsize+1)th
 | |
|         # coroutine would suspend until something completes and send()s its
 | |
|         # result to our queue. But to keep down queue overhead and to maximize
 | |
|         # responsiveness to our caller, we'd rather suspend on reading the
 | |
|         # queue. So we stuff the pool as full as we can, then wait for
 | |
|         # something to finish, then stuff more coroutines into the pool.
 | |
|         try:
 | |
|             while True:
 | |
|                 # Before each yield, start as many new coroutines as we can fit.
 | |
|                 # (The self.free() test isn't 100% accurate: if we happen to be
 | |
|                 # executing in one of the pool's coroutines, we could _execute()
 | |
|                 # without waiting even if self.free() reports 0. See _execute().)
 | |
|                 # The point is that we don't want to wait in the _execute() call,
 | |
|                 # we want to wait in the q.wait() call.
 | |
|                 # IMPORTANT: at start, and whenever we've caught up with all
 | |
|                 # coroutines we've launched so far, we MUST iterate this inner
 | |
|                 # loop at least once, regardless of self.free() -- otherwise the
 | |
|                 # q.wait() call below will deadlock!
 | |
|                 # Recall that index is the index of the NEXT args tuple that we
 | |
|                 # haven't yet launched. Therefore it counts how many args tuples
 | |
|                 # we've launched so far.
 | |
|                 while self.free() > 0 or finished == index:
 | |
|                     # Just like the implementation of execute_async(), save that
 | |
|                     # we're passing our queue instead of None as the "event" to
 | |
|                     # which to send() the result.
 | |
|                     self._execute(q, function, args, {})
 | |
|                     # We've consumed that args tuple, advance to next.
 | |
|                     index, args = tuples.next()
 | |
|                 # Okay, we've filled up the pool again, yield a result -- which
 | |
|                 # will probably wait for a coroutine to complete. Although we do
 | |
|                 # have q.ready(), so we could iterate without waiting, we avoid
 | |
|                 # that because every yield could involve considerable real time.
 | |
|                 # We don't know how long it takes to return from yield, so every
 | |
|                 # time we do, take the opportunity to stuff more requests into the
 | |
|                 # pool before yielding again.
 | |
|                 yield q.wait()
 | |
|                 # Be sure to count results so we know when to stop!
 | |
|                 finished += 1
 | |
|         except StopIteration:
 | |
|             pass
 | |
|         # Here we've exhausted the input iterable. index+1 is the total number
 | |
|         # of coroutines we've launched. We probably haven't yielded that many
 | |
|         # results yet. Wait for the rest of the results, yielding them as they
 | |
|         # arrive.
 | |
|         while finished < index + 1:
 | |
|             yield q.wait()
 | |
|             finished += 1
 | |
| 
 | 
