From 961419045b357378af67f700b41d9403d322b26b Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Wed, 24 Dec 2008 13:38:00 +0600 Subject: [PATCH] added coros.Semaphore and coros.BoundedSemaphore; made coros.semaphore a factory function; this fixes semaphore(0, 0) deadlock --- eventlet/coros.py | 193 ++++++++++++++++++++++++++++------------------ 1 file changed, 118 insertions(+), 75 deletions(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index e3a1cb1..b15f8fd 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -530,104 +530,147 @@ class multievent(object): return len(self.items) > 0 -class semaphore(object): - """Classic semaphore implemented with a counter and an event. +class Semaphore(object): + """An unbounded semaphore. Optionally initialize with a resource count, then acquire() and release() resources as needed. Attempting to acquire() when count is zero suspends the calling coroutine until count becomes nonzero again. - - >>> from eventlet import coros, api - >>> sem = coros.semaphore(2, limit=3) - >>> sem.acquire() - True - >>> sem.acquire() - True - >>> def releaser(sem): - ... print "releasing one" - ... sem.release() - ... - >>> _ = api.spawn(releaser, sem) - >>> sem.acquire() - releasing one - True - >>> sem.counter - 0 - >>> for x in xrange(3): - ... sem.release() - ... - >>> def acquirer(sem): - ... print "acquiring one" - ... sem.acquire() - ... - >>> _ = api.spawn(acquirer, sem) - >>> sem.release() - acquiring one - >>> sem.counter - 3 """ - def __init__(self, count=0, limit=None): - if limit is not None and count > limit: - # Prevent initializing with inconsistent values - count = limit + + def __init__(self, count=0): self.counter = count - self.limit = limit - self.acqevent = event() - self.relevent = event() - if self.counter > 0: - # If we initially have items, then don't block acquire()s. - self.acqevent.send() - if self.limit is None or self.counter < self.limit: - # If either there's no limit or we're below it, don't block on - # release()s. - self.relevent.send() + self._waiters = {} def __str__(self): - params = (self.__class__.__name__, hex(id(self)), self.counter, self.limit, self.acqevent, self.relevent) - return '<%s at %s %r/%r acq=%s rel=%s>' % params + params = (self.__class__.__name__, hex(id(self)), self.counter) + return '<%s at %s counter=%r>' % params def locked(self): return self.counter <= 0 - def acquire(self, blocking=1): - if blocking==0 and self.locked(): + def bounded(self): + # for consistency with BoundedSemaphore + return False + + def acquire(self, blocking=True): + if not blocking and self.locked(): return False - # This logic handles the self.limit is None case because None != any integer. - while self.counter == 0: - # Loop until there are resources to acquire. We loop because we - # could be one of several coroutines waiting for a single item. If - # we all get notified, only one is going to claim it, and the rest - # of us must continue waiting. - self.acqevent.wait() - # claim the resource + if self.counter<=0: + self._waiters[api.getcurrent()] = None + try: + api.get_hub().switch() + finally: + self._waiters.pop(api.getcurrent(), None) self.counter -= 1 - if self.counter == 0: - # If we just transitioned from having a resource to having none, - # make anyone else's wait() actually wait. - self.acqevent.reset() - if self.counter + 1 == self.limit: - # If we just transitioned from being full to having room for one - # more resource, notify whoever was waiting to release one. - self.relevent.send() return True __enter__ = acquire - def release(self): - # This logic handles the self.limit is None case because None != any integer. - while self.counter == self.limit: - self.relevent.wait() + def release(self, blocking=True): + # `blocking' parameter is for consistency with BoundedSemaphore and is ignored self.counter += 1 - if self.counter == self.limit: - self.relevent.reset() - if self.counter == 1: - # If self.counter was 0 before we incremented it, then wake up - # anybody who was waiting - self.acqevent.send() + if self._waiters: + api.get_hub().schedule_call_global(0, self._do_acquire) + return True + + def _do_acquire(self): + if self._waiters and self.counter>0: + waiter, _unused = self._waiters.popitem() + waiter.switch() def __exit__(self, typ, val, tb): self.release() +class BoundedSemaphore(object): + """A bounded semaphore. + Optionally initialize with a resource count, then acquire() and release() + resources as needed. Attempting to acquire() when count is zero suspends + the calling coroutine until count becomes nonzero again. Attempting to + release() after count has reached limit suspends the calling coroutine until + count becomes less than limit again. + + """ + def __init__(self, count, limit): + if count > limit: + # accidentally, this also catches the case when limit is None + raise ValueError("'count' cannot be more than 'limit'") + self.counter = count + self.limit = limit + self._acquire_waiters = {} + self._release_waiters = {} + + def __str__(self): + params = (self.__class__.__name__, hex(id(self)), self.counter, self.limit) + return '<%s at %s %r/%r>' % params + + def locked(self): + return self.counter <= 0 + + def bounded(self): + return self.counter >= self.limit + + def acquire(self, blocking=True): + if not blocking and self.locked(): + return False + if self.counter<=0: + if self._release_waiters: + api.get_hub().schedule_call(0, self._do_unlock) + self._acquire_waiters[api.getcurrent()] = None + try: + api.get_hub().switch() + finally: + self._acquire_waiters.pop(api.getcurrent(), None) + self.counter -= 1 + if self._release_waiters and self.counter < self.limit: + api.get_hub().schedule_call(0, self._do_release) + return True + + __enter__ = acquire + + def _do_unlock(self): + if self._release_waiters and self._acquire_waiters: + api.get_hub().schedule_call(0, self._do_acquire) + waiter, _unused = self._release_waiters.popitem() + waiter.switch() + + def _do_release(self): + if self._release_waiters and self.counter0: + waiter, _unused = self._acquire_waiters.popitem() + waiter.switch() + + def release(self, blocking=True): + if not blocking and self.bounded(): + return False + if self.counter>=self.limit: + if self._acquire_waiters: + api.get_hub().schedule_call(0, self._do_unlock) + self._release_waiters[api.getcurrent()] = None + try: + api.get_hub().switch() + finally: + self._release_waiters.pop(api.getcurrent(), None) + self.counter += 1 + if self._acquire_waiters and self.counter > 0: + api.get_hub().schedule_call(0, self._do_acquire) + return True + + def __exit__(self, typ, val, tb): + self.release() + + +def semaphore(count=0, limit=None): + if limit is None: + return Semaphore(count) + else: + return BoundedSemaphore(count, limit) + + class metaphore(object): """This is sort of an inverse semaphore: a counter that starts at 0 and waits only if nonzero. It's used to implement a "wait for all" scenario.