diff --git a/eventlet/green/Queue.py b/eventlet/green/Queue.py new file mode 100644 index 0000000..e0f98df --- /dev/null +++ b/eventlet/green/Queue.py @@ -0,0 +1 @@ +from eventlet.queue import * diff --git a/eventlet/queue.py b/eventlet/queue.py index f6b93df..9e26cf8 100644 --- a/eventlet/queue.py +++ b/eventlet/queue.py @@ -21,7 +21,7 @@ from eventlet.hubs import get_hub from eventlet.greenthread import getcurrent, exc_after from eventlet.event import Event -__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue'] +__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty'] class Waiter(object): """A low level synchronization class. @@ -99,14 +99,12 @@ class Waiter(object): self.greenlet = None -class Queue(object): - """Create a queue object with a given maximum size. - - If *maxsize* is less than zero or ``None``, the queue size is infinite. - - ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks until the - item is delivered. (This is unlike the standard :class:`Queue`, where 0 means - infinite size). +class LightQueue(object): + """ + This is a variant of Queue that behaves mostly like the standard + :class:`Queue`. It differs by not supporting the + :meth:`task_done ` or :meth:`join ` methods, + and is a little faster for not having that overhead. """ def __init__(self, maxsize=None): @@ -300,7 +298,6 @@ class Queue(object): def _schedule_unlock(self): if self._event_unlock is None: self._event_unlock = get_hub().schedule_call_global(0, self._unlock) - # QQQ re-activate event (with event_active libevent call) instead of creating a new one each time class ItemWaiter(Waiter): @@ -309,53 +306,36 @@ class ItemWaiter(Waiter): def __init__(self, item): Waiter.__init__(self) self.item = item + +class Queue(LightQueue): + '''Create a queue object with a given maximum size. -class PriorityQueue(Queue): - '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first). + If *maxsize* is less than zero or ``None``, the queue size is infinite. - Entries are typically tuples of the form: ``(priority number, data)``. + ``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks + until the item is delivered. (This is unlike the standard :class:`Queue`, + where 0 means infinite size). + + In all other respects, this Queue class resembled the standard library, + :class:`Queue`. ''' - - def _init(self, maxsize): - self.queue = [] - - def _put(self, item, heappush=heapq.heappush): - heappush(self.queue, item) - - def _get(self, heappop=heapq.heappop): - return heappop(self.queue) - - -class LifoQueue(Queue): - '''A subclass of :class:`Queue` that retrieves most recently added entries first.''' - - def _init(self, maxsize): - self.queue = [] - - def _put(self, item): - self.queue.append(item) - - def _get(self): - return self.queue.pop() - - -class JoinableQueue(Queue): - '''A subclass of :class:`Queue` that additionally has :meth:`task_done` and :meth:`join` methods.''' - def __init__(self, maxsize=None): - Queue.__init__(self, maxsize) + LightQueue.__init__(self, maxsize) self.unfinished_tasks = 0 self._cond = Event() def _format(self): - result = Queue._format(self) + result = LightQueue._format(self) if self.unfinished_tasks: result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond) return result def _put(self, item): - Queue._put(self, item) + LightQueue._put(self, item) + self._put_bookkeeping() + + def _put_bookkeeping(self): self.unfinished_tasks += 1 if self._cond.ready(): self._cond.reset() @@ -371,6 +351,7 @@ class JoinableQueue(Queue): Raises a :exc:`ValueError` if called more times than there were items placed in the queue. ''' + if self.unfinished_tasks <= 0: raise ValueError('task_done() called too many times') self.unfinished_tasks -= 1 @@ -387,3 +368,34 @@ class JoinableQueue(Queue): ''' self._cond.wait() + +class PriorityQueue(Queue): + '''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first). + + Entries are typically tuples of the form: ``(priority number, data)``. + ''' + + def _init(self, maxsize): + self.queue = [] + + def _put(self, item, heappush=heapq.heappush): + heappush(self.queue, item) + self._put_bookkeeping() + + def _get(self, heappop=heapq.heappop): + return heappop(self.queue) + + +class LifoQueue(Queue): + '''A subclass of :class:`Queue` that retrieves most recently added entries first.''' + + def _init(self, maxsize): + self.queue = [] + + def _put(self, item): + self.queue.append(item) + self._put_bookkeeping() + + def _get(self): + return self.queue.pop() + diff --git a/tests/queue_test.py b/tests/queue_test.py index c85f22a..91de3e7 100644 --- a/tests/queue_test.py +++ b/tests/queue_test.py @@ -218,7 +218,7 @@ class TestQueue(LimitedTestCase): def test_task_done(self): from eventlet import queue, debug - channel = queue.JoinableQueue(0) + channel = queue.Queue(0) X = object() gt = eventlet.spawn(channel.put, X) result = channel.get() diff --git a/tests/stdlib/test_queue.py b/tests/stdlib/test_queue.py new file mode 100644 index 0000000..39cfc13 --- /dev/null +++ b/tests/stdlib/test_queue.py @@ -0,0 +1,13 @@ +from eventlet import patcher +from eventlet.green import Queue +from eventlet.green import threading +from eventlet.green import time + +patcher.inject('test.test_queue', + globals(), + ('Queue', Queue), + ('threading', threading), + ('time', time)) + +if __name__ == "__main__": + test_main()