From a99f1338682932cc6e18c948a965642477041396 Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Mon, 22 Jun 2009 15:55:38 +0700 Subject: [PATCH] coros: add 2 new class: Queue and Channel to replace the existing queue - Queue() is the same as previous queue() - Channel(N) is the same as previous queue(N) - queue(N=None) is a compatibility function that returns an appropriate instance based on N --- eventlet/coros.py | 198 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 140 insertions(+), 58 deletions(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index f5fabc6..7c0d82d 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -1,5 +1,5 @@ # @author Donovan Preston -# +# # Copyright (c) 2007, Linden Research, Inc. # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -7,10 +7,10 @@ # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell # copies of the Software, and to permit persons to whom the Software is # furnished to do so, subject to the following conditions: -# +# # The above copyright notice and this permission notice shall be included in # all copies or substantial portions of the Software. -# +# # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -21,6 +21,7 @@ import collections import time +import traceback from eventlet import api @@ -419,43 +420,11 @@ def CoroutinePool(*args, **kwargs): return Pool(*args, **kwargs) -class queue(object): - """Cross-coroutine queue, using semaphore to synchronize. - The API is like a generalization of event to be able to hold more than one - item at a time (without reset() or cancel()). +class Queue(object): - >>> from eventlet import coros - >>> q = coros.queue(max_size=2) - >>> def putter(q): - ... q.send("first") - ... - >>> _ = api.spawn(putter, q) - >>> q.ready() - False - >>> q.wait() - 'first' - >>> q.ready() - False - >>> q.send("second") - >>> q.ready() - True - >>> q.send("third") - >>> def getter(q): - ... print q.wait() - ... - >>> _ = api.spawn(getter, q) - >>> q.send("fourth") - second - """ - def __init__(self, max_size=None): - """If you omit max_size, the queue will attempt to store an unlimited - number of items. - Specifying max_size means that when the queue already contains - max_size items, an attempt to send() one more item will suspend the - calling coroutine until someone else retrieves one. - """ + def __init__(self): self.items = collections.deque() - self.sem = semaphore(count=0, limit=max_size) + self._waiters = set() def __nonzero__(self): return len(self.items)>0 @@ -463,41 +432,154 @@ class queue(object): def __len__(self): return len(self.items) - def __str__(self): - params = (self.__class__.__name__, hex(id(self)), self.sem, len(self.items)) - return '<%s at %s sem=%s items[%d]>' % params + def __repr__(self): + params = (self.__class__.__name__, hex(id(self)), len(self.items), len(self._waiters)) + return '<%s at %s items[%d] _waiters[%s]>' % params def send(self, result=None, exc=None): - """If you send(exc=SomeExceptionClass), the corresponding wait() call - will raise that exception. - Otherwise, the corresponding wait() will return result (default None). - """ if exc is not None and not isinstance(exc, tuple): exc = (exc, ) self.items.append((result, exc)) - self.sem.release() + if self._waiters: + api.get_hub().schedule_call_global(0, self._do_send) def send_exception(self, *args): # the arguments are the same as for greenlet.throw return self.send(exc=args) + def _do_send(self): + if self._waiters and self.items: + waiter = self._waiters.pop() + result, exc = self.items.popleft() + waiter.switch((result, exc)) + def wait(self): - """Wait for an item sent by a send() call, in FIFO order. - If the corresponding send() specifies exc=SomeExceptionClass, this - wait() will raise that exception. - Otherwise, this wait() will return the corresponding send() call's - result= parameter. - """ - self.sem.acquire() - result, exc = self.items.popleft() - if exc is not None: - api.getcurrent().throw(*exc) - return result + if self.items: + result, exc = self.items.popleft() + if exc is None: + return result + else: + api.getcurrent().throw(*exc) + else: + self._waiters.add(api.getcurrent()) + try: + result, exc = api.get_hub().switch() + if exc is None: + return result + else: + api.getcurrent().throw(*exc) + finally: + self._waiters.discard(api.getcurrent()) def ready(self): - # could also base this on self.sem.counter... return len(self.items) > 0 + def full(self): + # for consistency with Channel + return False + + def waiting(self): + return len(self._waiters) + + +class Channel(object): + + def __init__(self, max_size=0): + self.max_size = max_size + self.items = collections.deque() + self._waiters = set() + self._senders = set() + + def __nonzero__(self): + return len(self.items)>0 + + def __len__(self): + return len(self.items) + + def __repr__(self): + params = (self.__class__.__name__, hex(id(self)), self.max_size, len(self.items), len(self._waiters), len(self._senders)) + return '<%s at %s max=%s items[%d] _w[%s] _s[%s]>' % params + + def send(self, result=None, exc=None): + if exc is not None and not isinstance(exc, tuple): + exc = (exc, ) + if api.getcurrent() is api.get_hub().greenlet: + self.items.append((result, exc)) + else: +# if self._waiters and self._senders: +# api.sleep(0) + self.items.append((result, exc)) + # note that send() does not work well with timeouts. if your timeout fires + # after this point, the item will remain in the queue + if self._waiters: + api.get_hub().schedule_call_global(0, self._do_switch) + if len(self.items) > self.max_size: + self._senders.add(api.getcurrent()) + try: + api.get_hub().switch() + finally: + self._senders.discard(api.getcurrent()) + + def send_exception(self, *args): + # the arguments are the same as for greenlet.throw + return self.send(exc=args) + + def _do_switch(self): + while True: + if self._waiters and self.items: + waiter = self._waiters.pop() + result, exc = self.items.popleft() + try: + waiter.switch((result, exc)) + except: + traceback.print_exc() + elif self._senders and len(self.items) <= self.max_size: + sender = self._senders.pop() + try: + sender.switch() + except: + traceback.print_exc() + else: + break + + def wait(self): + if self.items: + result, exc = self.items.popleft() + if len(self.items) <= self.max_size: + api.get_hub().schedule_call_global(0, self._do_switch) + if exc is None: + return result + else: + api.getcurrent().throw(*exc) + else: + if self._senders: + api.get_hub().schedule_call_global(0, self._do_switch) + self._waiters.add(api.getcurrent()) + try: + result, exc = api.get_hub().switch() + if exc is None: + return result + else: + api.getcurrent().throw(*exc) + finally: + self._waiters.discard(api.getcurrent()) + + def ready(self): + return len(self.items) > 0 + + def full(self): + return len(self.items) >= self.max_size + + def waiting(self): + return max(0, len(self._waiters) - len(self.items)) + + +def queue(max_size=None): + if max_size is None: + return Queue() + else: + return Channel(max_size) + class Actor(object): """ A free-running coroutine that accepts and processes messages.