diff --git a/eventlet/pools.py b/eventlet/pools.py index f24ca2b..0889fde 100644 --- a/eventlet/pools.py +++ b/eventlet/pools.py @@ -1,7 +1,7 @@ import collections from eventlet import api -from eventlet import coros +from eventlet import queue __all__ = ['Pool', 'TokenPool'] @@ -74,7 +74,7 @@ class Pool(object): self.max_size = max_size self.order_as_stack = order_as_stack self.current_size = 0 - self.channel = coros.queue(0) + self.channel = queue.LightQueue(0) self.free_items = collections.deque() for x in xrange(min_size): self.current_size += 1 @@ -89,7 +89,7 @@ class Pool(object): created = self.create() self.current_size += 1 return created - return self.channel.wait() + return self.channel.get() if item_impl is not None: item = item_impl @@ -102,7 +102,7 @@ class Pool(object): return if self.waiting(): - self.channel.send(item) + self.channel.put(item) else: if self.order_as_stack: self.free_items.appendleft(item) @@ -122,7 +122,7 @@ class Pool(object): def waiting(self): """Return the number of routines waiting for a pool item. """ - return self.channel.waiting() + return max(0, self.channel.getting() - self.channel.putting()) def create(self): """Generate a new pool item diff --git a/eventlet/queue.py b/eventlet/queue.py index 9e26cf8..cb655ec 100644 --- a/eventlet/queue.py +++ b/eventlet/queue.py @@ -1,13 +1,22 @@ # Copyright (c) 2009 Denis Bilenko. See LICENSE for details. """Synchronized queues. -The :mod:`eventlet.queue` module implements multi-producer, multi-consumer queues that work across greenlets, with the API similar to the classes found in the standard :mod:`Queue` and :class:`multiprocessing ` modules. +The :mod:`eventlet.queue` module implements multi-producer, multi-consumer +queues that work across greenlets, with the API similar to the classes found in +the standard :mod:`Queue` and :class:`multiprocessing ` +modules. A major difference is that queues in this module operate as channels when initialized with *maxsize* of zero. In such case, both :meth:`Queue.empty` -and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until a call to :meth:`Queue.get` retrieves the item. +and :meth:`Queue.full` return ``True`` and :meth:`Queue.put` always blocks until +a call to :meth:`Queue.get` retrieves the item. -Another interesting difference is that :meth:`Queue.qsize`, :meth:`Queue.empty`, and :meth:`Queue.full` *can* be used as indicators of whether the subsequent :meth:`Queue.get` or :meth:`Queue.put` will not block. +An interesting difference, made possible because of greenthreads, is +that :meth:`Queue.qsize`, :meth:`Queue.empty`, and :meth:`Queue.full` *can* be +used as indicators of whether the subsequent :meth:`Queue.get` +or :meth:`Queue.put` will not block. The new methods :meth:`Queue.getting` +and :meth:`Queue.putting` report on the number of greenthreads blocking +in :meth:`put ` or :meth:`get ` respectively. """ import sys @@ -149,6 +158,16 @@ class LightQueue(object): def qsize(self): """Return the size of the queue.""" return len(self.queue) + + def putting(self): + """Returns the number of greenthreads that are blocked waiting to put + items into the queue.""" + return len(self.putters) + + def getting(self): + """Returns the number of greenthreads that are blocked waiting on an + empty queue.""" + return len(self.getters) def empty(self): """Return ``True`` if the queue is empty, ``False`` otherwise.""" diff --git a/tests/queue_test.py b/tests/queue_test.py index 91de3e7..19ec9d3 100644 --- a/tests/queue_test.py +++ b/tests/queue_test.py @@ -138,16 +138,15 @@ class TestQueue(LimitedTestCase): self.assertEquals(q.get(), 'sent') def test_waiting(self): - return # TODO add this to the new queue q = eventlet.Queue() gt1 = eventlet.spawn(q.get) eventlet.sleep(0) - self.assertEquals(1, q.waiting()) + self.assertEquals(1, q.getting()) q.put('hi') eventlet.sleep(0) - self.assertEquals(0, q.waiting()) + self.assertEquals(0, q.getting()) self.assertEquals('hi', gt1.wait()) - self.assertEquals(0, q.waiting()) + self.assertEquals(0, q.getting()) def test_channel_send(self): channel = eventlet.Queue(0) @@ -194,18 +193,15 @@ class TestQueue(LimitedTestCase): w2 = eventlet.spawn(c.get) w3 = eventlet.spawn(c.get) eventlet.sleep(0) - # TODO add waiting method to queue - #self.assertEquals(c.waiting(), 3) + self.assertEquals(c.getting(), 3) s1 = eventlet.spawn(c.put, 1) s2 = eventlet.spawn(c.put, 2) s3 = eventlet.spawn(c.put, 3) - eventlet.sleep(0) # this gets all the sends into a waiting state - # TODO add waiting method to queue - #self.assertEquals(c.waiting(), 0) s1.wait() s2.wait() s3.wait() + self.assertEquals(c.getting(), 0) # NOTE: we don't guarantee that waiters are served in order results = sorted([w1.wait(), w2.wait(), w3.wait()]) self.assertEquals(results, [1,2,3])