coros: add 2 new class: Queue and Channel to replace the existing queue

- Queue() is the same as previous queue()
- Channel(N) is the same as previous queue(N)
- queue(N=None) is a compatibility function that returns an appropriate instance based on N
This commit is contained in:
Denis Bilenko
2009-06-22 15:55:38 +07:00
parent f9318e441a
commit a99f133868

View File

@@ -21,6 +21,7 @@
import collections import collections
import time import time
import traceback
from eventlet import api from eventlet import api
@@ -419,43 +420,11 @@ def CoroutinePool(*args, **kwargs):
return Pool(*args, **kwargs) return Pool(*args, **kwargs)
class queue(object): class Queue(object):
"""Cross-coroutine queue, using semaphore to synchronize.
The API is like a generalization of event to be able to hold more than one
item at a time (without reset() or cancel()).
>>> from eventlet import coros def __init__(self):
>>> q = coros.queue(max_size=2)
>>> def putter(q):
... q.send("first")
...
>>> _ = api.spawn(putter, q)
>>> q.ready()
False
>>> q.wait()
'first'
>>> q.ready()
False
>>> q.send("second")
>>> q.ready()
True
>>> q.send("third")
>>> def getter(q):
... print q.wait()
...
>>> _ = api.spawn(getter, q)
>>> q.send("fourth")
second
"""
def __init__(self, max_size=None):
"""If you omit max_size, the queue will attempt to store an unlimited
number of items.
Specifying max_size means that when the queue already contains
max_size items, an attempt to send() one more item will suspend the
calling coroutine until someone else retrieves one.
"""
self.items = collections.deque() self.items = collections.deque()
self.sem = semaphore(count=0, limit=max_size) self._waiters = set()
def __nonzero__(self): def __nonzero__(self):
return len(self.items)>0 return len(self.items)>0
@@ -463,41 +432,154 @@ class queue(object):
def __len__(self): def __len__(self):
return len(self.items) return len(self.items)
def __str__(self): def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), self.sem, len(self.items)) params = (self.__class__.__name__, hex(id(self)), len(self.items), len(self._waiters))
return '<%s at %s sem=%s items[%d]>' % params return '<%s at %s items[%d] _waiters[%s]>' % params
def send(self, result=None, exc=None): def send(self, result=None, exc=None):
"""If you send(exc=SomeExceptionClass), the corresponding wait() call
will raise that exception.
Otherwise, the corresponding wait() will return result (default None).
"""
if exc is not None and not isinstance(exc, tuple): if exc is not None and not isinstance(exc, tuple):
exc = (exc, ) exc = (exc, )
self.items.append((result, exc)) self.items.append((result, exc))
self.sem.release() if self._waiters:
api.get_hub().schedule_call_global(0, self._do_send)
def send_exception(self, *args): def send_exception(self, *args):
# the arguments are the same as for greenlet.throw # the arguments are the same as for greenlet.throw
return self.send(exc=args) return self.send(exc=args)
def wait(self): def _do_send(self):
"""Wait for an item sent by a send() call, in FIFO order. if self._waiters and self.items:
If the corresponding send() specifies exc=SomeExceptionClass, this waiter = self._waiters.pop()
wait() will raise that exception.
Otherwise, this wait() will return the corresponding send() call's
result= parameter.
"""
self.sem.acquire()
result, exc = self.items.popleft() result, exc = self.items.popleft()
if exc is not None: waiter.switch((result, exc))
api.getcurrent().throw(*exc)
def wait(self):
if self.items:
result, exc = self.items.popleft()
if exc is None:
return result return result
else:
api.getcurrent().throw(*exc)
else:
self._waiters.add(api.getcurrent())
try:
result, exc = api.get_hub().switch()
if exc is None:
return result
else:
api.getcurrent().throw(*exc)
finally:
self._waiters.discard(api.getcurrent())
def ready(self): def ready(self):
# could also base this on self.sem.counter...
return len(self.items) > 0 return len(self.items) > 0
def full(self):
# for consistency with Channel
return False
def waiting(self):
return len(self._waiters)
class Channel(object):
def __init__(self, max_size=0):
self.max_size = max_size
self.items = collections.deque()
self._waiters = set()
self._senders = set()
def __nonzero__(self):
return len(self.items)>0
def __len__(self):
return len(self.items)
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), self.max_size, len(self.items), len(self._waiters), len(self._senders))
return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params
def send(self, result=None, exc=None):
if exc is not None and not isinstance(exc, tuple):
exc = (exc, )
if api.getcurrent() is api.get_hub().greenlet:
self.items.append((result, exc))
else:
# if self._waiters and self._senders:
# api.sleep(0)
self.items.append((result, exc))
# note that send() does not work well with timeouts. if your timeout fires
# after this point, the item will remain in the queue
if self._waiters:
api.get_hub().schedule_call_global(0, self._do_switch)
if len(self.items) > self.max_size:
self._senders.add(api.getcurrent())
try:
api.get_hub().switch()
finally:
self._senders.discard(api.getcurrent())
def send_exception(self, *args):
# the arguments are the same as for greenlet.throw
return self.send(exc=args)
def _do_switch(self):
while True:
if self._waiters and self.items:
waiter = self._waiters.pop()
result, exc = self.items.popleft()
try:
waiter.switch((result, exc))
except:
traceback.print_exc()
elif self._senders and len(self.items) <= self.max_size:
sender = self._senders.pop()
try:
sender.switch()
except:
traceback.print_exc()
else:
break
def wait(self):
if self.items:
result, exc = self.items.popleft()
if len(self.items) <= self.max_size:
api.get_hub().schedule_call_global(0, self._do_switch)
if exc is None:
return result
else:
api.getcurrent().throw(*exc)
else:
if self._senders:
api.get_hub().schedule_call_global(0, self._do_switch)
self._waiters.add(api.getcurrent())
try:
result, exc = api.get_hub().switch()
if exc is None:
return result
else:
api.getcurrent().throw(*exc)
finally:
self._waiters.discard(api.getcurrent())
def ready(self):
return len(self.items) > 0
def full(self):
return len(self.items) >= self.max_size
def waiting(self):
return max(0, len(self._waiters) - len(self.items))
def queue(max_size=None):
if max_size is None:
return Queue()
else:
return Channel(max_size)
class Actor(object): class Actor(object):
""" A free-running coroutine that accepts and processes messages. """ A free-running coroutine that accepts and processes messages.