Added eventlet.green.Queue, and refactored the queue module so that it's actually stdlib-compatible.

This commit is contained in:
Ryan Williams
2010-01-23 19:44:04 -05:00
parent cbab881530
commit 3a1c7673fc
4 changed files with 70 additions and 44 deletions

1
eventlet/green/Queue.py Normal file
View File

@@ -0,0 +1 @@
from eventlet.queue import *

View File

@@ -21,7 +21,7 @@ from eventlet.hubs import get_hub
from eventlet.greenthread import getcurrent, exc_after
from eventlet.event import Event
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'JoinableQueue']
__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'LightQueue', 'Full', 'Empty']
class Waiter(object):
"""A low level synchronization class.
@@ -99,14 +99,12 @@ class Waiter(object):
self.greenlet = None
class Queue(object):
"""Create a queue object with a given maximum size.
If *maxsize* is less than zero or ``None``, the queue size is infinite.
``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks until the
item is delivered. (This is unlike the standard :class:`Queue`, where 0 means
infinite size).
class LightQueue(object):
"""
This is a variant of Queue that behaves mostly like the standard
:class:`Queue`. It differs by not supporting the
:meth:`task_done <Queue.task_done>` or :meth:`join <Queue.join>` methods,
and is a little faster for not having that overhead.
"""
def __init__(self, maxsize=None):
@@ -300,7 +298,6 @@ class Queue(object):
def _schedule_unlock(self):
if self._event_unlock is None:
self._event_unlock = get_hub().schedule_call_global(0, self._unlock)
# QQQ re-activate event (with event_active libevent call) instead of creating a new one each time
class ItemWaiter(Waiter):
@@ -309,53 +306,36 @@ class ItemWaiter(Waiter):
def __init__(self, item):
Waiter.__init__(self)
self.item = item
class Queue(LightQueue):
'''Create a queue object with a given maximum size.
class PriorityQueue(Queue):
'''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
If *maxsize* is less than zero or ``None``, the queue size is infinite.
Entries are typically tuples of the form: ``(priority number, data)``.
``Queue(0)`` is a channel, that is, its :meth:`put` method always blocks
until the item is delivered. (This is unlike the standard :class:`Queue`,
where 0 means infinite size).
In all other respects, this Queue class resembled the standard library,
:class:`Queue`.
'''
def _init(self, maxsize):
self.queue = []
def _put(self, item, heappush=heapq.heappush):
heappush(self.queue, item)
def _get(self, heappop=heapq.heappop):
return heappop(self.queue)
class LifoQueue(Queue):
'''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
def _init(self, maxsize):
self.queue = []
def _put(self, item):
self.queue.append(item)
def _get(self):
return self.queue.pop()
class JoinableQueue(Queue):
'''A subclass of :class:`Queue` that additionally has :meth:`task_done` and :meth:`join` methods.'''
def __init__(self, maxsize=None):
Queue.__init__(self, maxsize)
LightQueue.__init__(self, maxsize)
self.unfinished_tasks = 0
self._cond = Event()
def _format(self):
result = Queue._format(self)
result = LightQueue._format(self)
if self.unfinished_tasks:
result += ' tasks=%s _cond=%s' % (self.unfinished_tasks, self._cond)
return result
def _put(self, item):
Queue._put(self, item)
LightQueue._put(self, item)
self._put_bookkeeping()
def _put_bookkeeping(self):
self.unfinished_tasks += 1
if self._cond.ready():
self._cond.reset()
@@ -371,6 +351,7 @@ class JoinableQueue(Queue):
Raises a :exc:`ValueError` if called more times than there were items placed in the queue.
'''
if self.unfinished_tasks <= 0:
raise ValueError('task_done() called too many times')
self.unfinished_tasks -= 1
@@ -387,3 +368,34 @@ class JoinableQueue(Queue):
'''
self._cond.wait()
class PriorityQueue(Queue):
'''A subclass of :class:`Queue` that retrieves entries in priority order (lowest first).
Entries are typically tuples of the form: ``(priority number, data)``.
'''
def _init(self, maxsize):
self.queue = []
def _put(self, item, heappush=heapq.heappush):
heappush(self.queue, item)
self._put_bookkeeping()
def _get(self, heappop=heapq.heappop):
return heappop(self.queue)
class LifoQueue(Queue):
'''A subclass of :class:`Queue` that retrieves most recently added entries first.'''
def _init(self, maxsize):
self.queue = []
def _put(self, item):
self.queue.append(item)
self._put_bookkeeping()
def _get(self):
return self.queue.pop()

View File

@@ -218,7 +218,7 @@ class TestQueue(LimitedTestCase):
def test_task_done(self):
from eventlet import queue, debug
channel = queue.JoinableQueue(0)
channel = queue.Queue(0)
X = object()
gt = eventlet.spawn(channel.put, X)
result = channel.get()

View File

@@ -0,0 +1,13 @@
from eventlet import patcher
from eventlet.green import Queue
from eventlet.green import threading
from eventlet.green import time
patcher.inject('test.test_queue',
globals(),
('Queue', Queue),
('threading', threading),
('time', time))
if __name__ == "__main__":
test_main()