From ada0811e7a16223d05f8acb2b9f2c82e74413e0d Mon Sep 17 00:00:00 2001 From: Denis Bilenko Date: Sun, 29 Mar 2009 23:02:52 +0700 Subject: [PATCH] reimplement BoundedSemaphore using two Semaphores; this resulted in a much more readable implementation - queue(0) should work well as a channel now - add 'balance' property, similar to that of channel's --- eventlet/coros.py | 71 ++++++++++-------------------- greentest/test__coros_semaphore.py | 4 +- 2 files changed, 26 insertions(+), 49 deletions(-) diff --git a/eventlet/coros.py b/eventlet/coros.py index a92fc5d..d5533ca 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -335,74 +335,51 @@ class BoundedSemaphore(object): if count > limit: # accidentally, this also catches the case when limit is None raise ValueError("'count' cannot be more than 'limit'") - self.counter = count - self.limit = limit - self._acquire_waiters = {} - self._release_waiters = {} + self.lower_bound = Semaphore(count) + self.upper_bound = Semaphore(limit-count) def __str__(self): - params = (self.__class__.__name__, hex(id(self)), self.counter, self.limit) + params = (self.__class__.__name__, hex(id(self)), self.lower_bound.counter, self.upper_bound.counter) return '<%s at %s %r/%r>' % params def locked(self): - return self.counter <= 0 + return self.lower_bound.locked() def bounded(self): - return self.counter >= self.limit + return self.upper_bound.locked() def acquire(self, blocking=True): if not blocking and self.locked(): return False - if self.counter<=0: - if self._release_waiters: - api.get_hub().schedule_call_global(0, self._do_unlock) - self._acquire_waiters[api.getcurrent()] = None - try: - api.get_hub().switch() - finally: - self._acquire_waiters.pop(api.getcurrent(), None) - self.counter -= 1 - if self._release_waiters and self.counter < self.limit: - api.get_hub().schedule_call_global(0, self._do_release) - return True + self.upper_bound.release() + try: + return self.lower_bound.acquire() + except: + self.upper_bound.counter -= 1 + # using counter directly means that it can be less than zero. + # however I certainly don't need to wait here and I don't seem to have + # a need to care about such inconsistency + raise __enter__ = acquire - def _do_unlock(self): - if self._release_waiters and self._acquire_waiters: - waiter, _unused = self._release_waiters.popitem() - waiter.switch() - self._do_acquire() - - def _do_release(self): - if self._release_waiters and self.counter0: - waiter, _unused = self._acquire_waiters.popitem() - waiter.switch() - def release(self, blocking=True): if not blocking and self.bounded(): return False - if self.counter>=self.limit: - if self._acquire_waiters: - api.get_hub().schedule_call_global(0, self._do_unlock) - self._release_waiters[api.getcurrent()] = None - try: - api.get_hub().switch() - finally: - self._release_waiters.pop(api.getcurrent(), None) - self.counter += 1 - if self._acquire_waiters and self.counter > 0: - api.get_hub().schedule_call_global(0, self._do_acquire) - return True + self.lower_bound.release() + try: + return self.upper_bound.acquire() + except: + self.lower_bound.counter -= 1 + raise def __exit__(self, typ, val, tb): self.release() + @property + def balance(self): + return self.lower_bound.counter - self.upper_bound.counter + def semaphore(count=0, limit=None): if limit is None: diff --git a/greentest/test__coros_semaphore.py b/greentest/test__coros_semaphore.py index 62c07fb..cf33895 100644 --- a/greentest/test__coros_semaphore.py +++ b/greentest/test__coros_semaphore.py @@ -32,13 +32,13 @@ class TestSemaphore(LimitedTestCase): self.assertEqual(sem.acquire(), True) api.spawn(sem.release) self.assertEqual(sem.acquire(), True) - self.assertEqual(0, sem.counter) + self.assertEqual(-3, sem.balance) sem.release() sem.release() sem.release() api.spawn(sem.acquire) sem.release() - self.assertEqual(3, sem.counter) + self.assertEqual(3, sem.balance) def test_bounded_with_zero_limit(self): sem = coros.semaphore(0, 0)