Updated NEWS, added GreenPool.starmap, fixed the annoying intermittent test failure.

This commit is contained in:
Ryan Williams
2010-01-18 14:11:14 -08:00
parent 02a47eaa7e
commit 9d07716c35
3 changed files with 48 additions and 16 deletions

12
NEWS
View File

@@ -1,6 +1,18 @@
0.9.3 0.9.3
===== =====
* New debug module, used for enabling verbosity within Eventlet that can help debug applications or Eventlet itself.
* Bugfixes in tpool, green.select
* Moved primary api module to __init__ from api. It shouldn't be necessary to import eventlet.api anymore; import eventlet should do the same job.
* Proc module deprecated in favor of greenthread
* New module greenthread, with new class GreenThread.
* New GreenPool class that replaces pool.Pool.
* Deprecated coros.execute
* Deprecated coros.semaphore
* Moved coros.BoundedSemaphore to semaphore.BoundedSemaphore
* Moved coros.Semaphore to semaphore.Semaphore
* Moved coros.event to event.Event
* Deprecated api.tcp_listener, api.connect_tcp, api.ssl_listener
* Moved get_hub, use_hub, get_default_hub to eventlet.hubs * Moved get_hub, use_hub, get_default_hub to eventlet.hubs
* Renamed libevent hub to pyevent. * Renamed libevent hub to pyevent.
* Renamed coros.event to coros.Event * Renamed coros.event to coros.Event

View File

@@ -125,25 +125,30 @@ class GreenPool(object):
else: else:
return 0 return 0
def _do_imap(self, func, it, gi): def _do_map(self, func, it, gi):
for args in it: for args in it:
gi.spawn(func, *args) gi.spawn(func, *args)
gi.spawn(raise_stop_iteration) gi.spawn(raise_stop_iteration)
def imap(self, function, *iterables): def starmap(self, function, iterable):
"""This is the same as itertools.imap, except that *func* is """This is the same as :func:`itertools.starmap`, except that *func* is
executed in separate green threads, with the concurrency controlled by executed in a separate green thread for each item, with the concurrency
the pool. In operation, imap consumes a constant amount of memory, limited by the pool's size. In operation, starmap consumes a constant
proportional to the size of the pool, and is thus suited for iterating amount of memory, proportional to the size of the pool, and is thus
over extremely long input lists. suited for iterating over extremely long input lists.
""" """
if function is None: if function is None:
function = lambda *a: a function = lambda *a: a
it = itertools.izip(*iterables) gi = GreenMap(self.size)
gi = GreenImap(self.size) greenthread.spawn_n(self._do_map, function, iterable, gi)
greenthread.spawn_n(self._do_imap, function, it, gi)
return gi return gi
def imap(self, function, *iterables):
"""This is the same as :func:`itertools.imap`, and has the same
concurrency and memory behavior as :meth:`starmap`.
"""
return self.starmap(function, itertools.izip(*iterables))
def raise_stop_iteration(): def raise_stop_iteration():
raise StopIteration() raise StopIteration()
@@ -158,7 +163,11 @@ class GreenPile(object):
A GreenPile can also be constructed standalone, not associated with any A GreenPile can also be constructed standalone, not associated with any
GreenPool. To do this, construct it with an integer size parameter instead GreenPool. To do this, construct it with an integer size parameter instead
of a GreenPool of a GreenPool.
It is not advisable to iterate over a GreenPile in a different greenthread
than the one which is calling spawn. The iterator will exit early in that
situation.
""" """
def __init__(self, size_or_pool=1000): def __init__(self, size_or_pool=1000):
if isinstance(size_or_pool, GreenPool): if isinstance(size_or_pool, GreenPool):
@@ -195,8 +204,15 @@ class GreenPile(object):
self.counter -= 1 self.counter -= 1
# this is identical to GreenPile but it blocks on spawn if the results # this is identical to GreenPile but it blocks on spawn if the results
# aren't consumed # aren't consumed, and it doesn't generate its own StopIteration exception,
class GreenImap(GreenPile): # instead relying on the spawning process to send one in when it's done
class GreenMap(GreenPile):
def __init__(self, size_or_pool): def __init__(self, size_or_pool):
super(GreenImap, self).__init__(size_or_pool) super(GreenMap, self).__init__(size_or_pool)
self.waiters = coros.Channel(max_size=self.pool.size) self.waiters = coros.Channel(max_size=self.pool.size)
def next(self):
try:
return self.waiters.wait().wait()
finally:
self.counter -= 1

View File

@@ -275,6 +275,10 @@ class GreenPool(tests.LimitedTestCase):
break break
self.assertEquals(results, [0,'r',2,3,4,5,6,'r',8,9]) self.assertEquals(results, [0,'r',2,3,4,5,6,'r',8,9])
def test_starmap(self):
p = greenpool.GreenPool(4)
result_list = list(p.starmap(passthru, [(x,) for x in xrange(10)]))
self.assertEquals(result_list, range(10))
class GreenPile(tests.LimitedTestCase): class GreenPile(tests.LimitedTestCase):
def test_pile(self): def test_pile(self):