219 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			219 lines
		
	
	
		
			8.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import itertools
 | |
| import traceback
 | |
| 
 | |
| from eventlet import coros
 | |
| from eventlet import event
 | |
| from eventlet import greenthread
 | |
| from eventlet import semaphore
 | |
| 
 | |
| __all__ = ['GreenPool', 'GreenPile']
 | |
|                 
 | |
| try:
 | |
|     next
 | |
| except NameError:
 | |
|     def next(it):
 | |
|         try:
 | |
|             return it.next()
 | |
|         except AttributeError:
 | |
|             raise TypeError("%s object is not an iterator" % type(it))
 | |
| 
 | |
| class GreenPool(object):
 | |
|     """ The GreenPool class is a pool of green threads.
 | |
|     """
 | |
|     def __init__(self, size=1000):
 | |
|         self.size = size
 | |
|         self.coroutines_running = set()
 | |
|         self.sem = semaphore.Semaphore(size)
 | |
|         self.no_coros_running = event.Event()
 | |
|             
 | |
|     def resize(self, new_size):
 | |
|         """ Change the max number of greenthreads doing work at any given time.
 | |
|     
 | |
|         If resize is called when there are more than *new_size* greenthreads 
 | |
|         already working on tasks, they will be allowed to complete but no new 
 | |
|         tasks will be allowed to get launched until enough greenthreads finish 
 | |
|         their tasks to drop the overall quantity below *new_size*.  Until 
 | |
|         then, the return value of free() will be negative.
 | |
|         """
 | |
|         size_delta = new_size - self.size 
 | |
|         self.sem.counter += size_delta
 | |
|         self.size = new_size
 | |
|     
 | |
|     def running(self):
 | |
|         """ Returns the number of greenthreads that are currently executing
 | |
|         functions in the Parallel's pool."""
 | |
|         return len(self.coroutines_running)
 | |
| 
 | |
|     def free(self):
 | |
|         """ Returns the number of greenthreads available for use.
 | |
|         
 | |
|         If zero or less, the next call to :meth:`spawn` or :meth:`spawn_n` will 
 | |
|         block the calling greenthread until a slot becomes available."""
 | |
|         return self.sem.counter
 | |
| 
 | |
|     def spawn(self, function, *args, **kwargs):
 | |
|         """Run the *function* with its arguments in its own green thread.
 | |
|         Returns the GreenThread object that is running the function, which can
 | |
|         be used to retrieve the results.
 | |
|         """
 | |
|         # if reentering an empty pool, don't try to wait on a coroutine freeing
 | |
|         # itself -- instead, just execute in the current coroutine
 | |
|         current = greenthread.getcurrent()
 | |
|         if self.sem.locked() and current in self.coroutines_running:
 | |
|             # a bit hacky to use the GT without switching to it
 | |
|             gt = greenthread.GreenThread(current)
 | |
|             gt.main(function, args, kwargs)
 | |
|             return gt
 | |
|         else:
 | |
|             self.sem.acquire()
 | |
|             gt = greenthread.spawn(function, *args, **kwargs)
 | |
|             if not self.coroutines_running:
 | |
|                 self.no_coros_running = event.Event()
 | |
|             self.coroutines_running.add(gt)
 | |
|             gt.link(self._spawn_done)
 | |
|         return gt
 | |
|     
 | |
|     def _spawn_n_impl(self, func, args, kwargs, coro=None):
 | |
|         try:
 | |
|             try:
 | |
|                 func(*args, **kwargs)
 | |
|             except (KeyboardInterrupt, SystemExit, GreenletExit):
 | |
|                 raise
 | |
|             except:
 | |
|                 traceback.print_exc()
 | |
|         finally:
 | |
|             if coro is None:
 | |
|                 return
 | |
|             else:
 | |
|                 coro = greenthread.getcurrent()
 | |
|                 self._spawn_done(coro)
 | |
|     
 | |
|     def spawn_n(self, func, *args, **kwargs):
 | |
|         """ Create a greenthread to run the *function*.  Returns None; the 
 | |
|         results of the function are not retrievable.
 | |
|         """
 | |
|         # if reentering an empty pool, don't try to wait on a coroutine freeing
 | |
|         # itself -- instead, just execute in the current coroutine
 | |
|         current = greenthread.getcurrent()
 | |
|         if self.sem.locked() and current in self.coroutines_running:
 | |
|             self._spawn_n_impl(func, args, kwargs)
 | |
|         else:
 | |
|             self.sem.acquire()
 | |
|             g = greenthread.spawn_n(self._spawn_n_impl, 
 | |
|                 func, args, kwargs, coro=True)
 | |
|             if not self.coroutines_running:
 | |
|                 self.no_coros_running = event.Event()
 | |
|             self.coroutines_running.add(g)
 | |
| 
 | |
|     def waitall(self):
 | |
|         """Waits until all greenthreads in the pool are finished working."""
 | |
|         self.no_coros_running.wait()
 | |
|     
 | |
|     def _spawn_done(self, coro):
 | |
|         self.sem.release()
 | |
|         if coro is not None:
 | |
|             self.coroutines_running.remove(coro)
 | |
|         # if done processing (no more work is waiting for processing),
 | |
|         # we can finish off any waitall() calls that might be pending
 | |
|         if self.sem.balance == self.size:
 | |
|             self.no_coros_running.send(None)
 | |
|             
 | |
|     def waiting(self):
 | |
|         """Return the number of greenthreads waiting to spawn.
 | |
|         """
 | |
|         if self.sem.balance < 0:
 | |
|             return -self.sem.balance
 | |
|         else:
 | |
|             return 0           
 | |
|             
 | |
|     def _do_map(self, func, it, gi):
 | |
|         for args in it:
 | |
|             gi.spawn(func, *args)
 | |
|         gi.spawn(raise_stop_iteration)
 | |
|         
 | |
|     def starmap(self, function, iterable):
 | |
|         """This is the same as :func:`itertools.starmap`, except that *func* is 
 | |
|         executed in a separate green thread for each item, with the concurrency 
 | |
|         limited by the pool's size. In operation, starmap consumes a constant 
 | |
|         amount of memory, proportional to the size of the pool, and is thus 
 | |
|         suited for iterating over extremely long input lists.
 | |
|         """
 | |
|         if function is None:
 | |
|             function = lambda *a: a
 | |
|         gi = GreenMap(self.size)
 | |
|         greenthread.spawn_n(self._do_map, function, iterable, gi)
 | |
|         return gi
 | |
| 
 | |
|     def imap(self, function, *iterables):
 | |
|         """This is the same as :func:`itertools.imap`, and has the same
 | |
|         concurrency and memory behavior as :meth:`starmap`.
 | |
|         """
 | |
|         return self.starmap(function, itertools.izip(*iterables))
 | |
| 
 | |
|                                         
 | |
| def raise_stop_iteration():
 | |
|     raise StopIteration()
 | |
| 
 | |
|         
 | |
| class GreenPile(object):
 | |
|     """GreenPile is an abstraction representing a bunch of I/O-related tasks.
 | |
|     
 | |
|     Construct a GreenPile with an existing GreenPool object.  The GreenPile will
 | |
|     then use that pool's concurrency as it processes its jobs.  There can be 
 | |
|     many GreenPiles associated with a single GreenPool.
 | |
|     
 | |
|     A GreenPile can also be constructed standalone, not associated with any 
 | |
|     GreenPool.  To do this, construct it with an integer size parameter instead 
 | |
|     of a GreenPool.
 | |
|     
 | |
|     It is not advisable to iterate over a GreenPile in a different greenthread
 | |
|     than the one which is calling spawn.  The iterator will exit early in that
 | |
|     situation.
 | |
|     """
 | |
|     def __init__(self, size_or_pool=1000):
 | |
|         if isinstance(size_or_pool, GreenPool):
 | |
|             self.pool = size_or_pool
 | |
|         else:
 | |
|             self.pool = GreenPool(size_or_pool)
 | |
|         self.waiters = coros.Queue()
 | |
|         self.used = False
 | |
|         self.counter = 0
 | |
|             
 | |
|     def spawn(self, func, *args, **kw):
 | |
|         """Runs *func* in its own green thread, with the result available by 
 | |
|         iterating over the GreenPile object."""
 | |
|         self.used =  True
 | |
|         self.counter += 1
 | |
|         try:
 | |
|             gt = self.pool.spawn(func, *args, **kw)
 | |
|             self.waiters.send(gt)
 | |
|         except:
 | |
|             self.counter -= 1
 | |
|             raise
 | |
|         
 | |
|     def __iter__(self):
 | |
|         return self
 | |
|     
 | |
|     def next(self):
 | |
|         """Wait for the next result, suspending the current greenthread until it
 | |
|         is available.  Raises StopIteration when there are no more results."""
 | |
|         if self.counter == 0 and self.used:
 | |
|             raise StopIteration()
 | |
|         try:
 | |
|             return self.waiters.wait().wait()
 | |
|         finally:
 | |
|             self.counter -= 1
 | |
|             
 | |
| # this is identical to GreenPile but it blocks on spawn if the results 
 | |
| # aren't consumed, and it doesn't generate its own StopIteration exception,
 | |
| # instead relying on the spawning process to send one in when it's done
 | |
| class GreenMap(GreenPile):
 | |
|     def __init__(self, size_or_pool):
 | |
|         super(GreenMap, self).__init__(size_or_pool)
 | |
|         self.waiters = coros.Channel(max_size=self.pool.size)
 | |
|         
 | |
|     def next(self):
 | |
|         try:
 | |
|             return self.waiters.wait().wait()
 | |
|         finally:
 | |
|             self.counter -= 1 | 
