diff --git a/eventlet/coros.py b/eventlet/coros.py index b0db19e..f4242bc 100644 --- a/eventlet/coros.py +++ b/eventlet/coros.py @@ -6,6 +6,7 @@ import warnings from eventlet import api from eventlet import hubs from eventlet import greenthread +from eventlet import semaphore as semaphoremod class NOT_USED: def __repr__(self): @@ -27,137 +28,22 @@ def event(*a, **kw): return greenthread.Event(*a, **kw) -class Semaphore(object): - """An unbounded semaphore. - Optionally initialize with a resource *count*, then :meth:`acquire` and - :meth:`release` resources as needed. Attempting to :meth:`acquire` when - *count* is zero suspends the calling coroutine until *count* becomes - nonzero again. - """ - - def __init__(self, count=0): - self.counter = count - self._waiters = set() - - def __repr__(self): - params = (self.__class__.__name__, hex(id(self)), self.counter, len(self._waiters)) - return '<%s at %s c=%s _w[%s]>' % params - - def __str__(self): - params = (self.__class__.__name__, self.counter, len(self._waiters)) - return '<%s c=%s _w[%s]>' % params - - def locked(self): - return self.counter <= 0 - - def bounded(self): - # for consistency with BoundedSemaphore - return False - - def acquire(self, blocking=True): - if not blocking and self.locked(): - return False - if self.counter <= 0: - self._waiters.add(api.getcurrent()) - try: - while self.counter <= 0: - hubs.get_hub().switch() - finally: - self._waiters.discard(api.getcurrent()) - self.counter -= 1 - return True - - def __enter__(self): - self.acquire() - - def release(self, blocking=True): - # `blocking' parameter is for consistency with BoundedSemaphore and is ignored - self.counter += 1 - if self._waiters: - hubs.get_hub().schedule_call_global(0, self._do_acquire) - return True - - def _do_acquire(self): - if self._waiters and self.counter>0: - waiter = self._waiters.pop() - waiter.switch() - - def __exit__(self, typ, val, tb): - self.release() - - @property - def balance(self): - # positive means there are free items - # zero means there are no free items but nobody has requested one - # negative means there are requests for items, but no items - return self.counter - len(self._waiters) - - -class BoundedSemaphore(object): - """A bounded semaphore. - Optionally initialize with a resource *count*, then :meth:`acquire` and - :meth:`release` resources as needed. Attempting to :meth:`acquire` when - *count* is zero suspends the calling coroutine until count becomes nonzero - again. Attempting to :meth:`release` after *count* has reached *limit* - suspends the calling coroutine until *count* becomes less than *limit* - again. - """ - def __init__(self, count, limit): - if count > limit: - # accidentally, this also catches the case when limit is None - raise ValueError("'count' cannot be more than 'limit'") - self.lower_bound = Semaphore(count) - self.upper_bound = Semaphore(limit-count) - - def __repr__(self): - params = (self.__class__.__name__, hex(id(self)), self.balance, self.lower_bound, self.upper_bound) - return '<%s at %s b=%s l=%s u=%s>' % params - - def __str__(self): - params = (self.__class__.__name__, self.balance, self.lower_bound, self.upper_bound) - return '<%s b=%s l=%s u=%s>' % params - - def locked(self): - return self.lower_bound.locked() - - def bounded(self): - return self.upper_bound.locked() - - def acquire(self, blocking=True): - if not blocking and self.locked(): - return False - self.upper_bound.release() - try: - return self.lower_bound.acquire() - except: - self.upper_bound.counter -= 1 - # using counter directly means that it can be less than zero. - # however I certainly don't need to wait here and I don't seem to have - # a need to care about such inconsistency - raise - - def __enter__(self): - self.acquire() - - def release(self, blocking=True): - if not blocking and self.bounded(): - return False - self.lower_bound.release() - try: - return self.upper_bound.acquire() - except: - self.lower_bound.counter -= 1 - raise - - def __exit__(self, typ, val, tb): - self.release() - - @property - def balance(self): - return self.lower_bound.balance - self.upper_bound.balance +def Semaphore(count): + warnings.warn("The Semaphore class has moved! Please " + "use semaphore.Semaphore instead.", + DeprecationWarning, stacklevel=2) + return semaphoremod.Semaphore(count) +def BoundedSemaphore(count): + warnings.warn("The BoundedSemaphore class has moved! Please " + "use semaphore.BoundedSemaphore instead.", + DeprecationWarning, stacklevel=2) + return semaphoremod.BoundedSemaphore(count) def semaphore(count=0, limit=None): + warnings.warn("coros.semaphore is deprecated. Please use either " + "semaphore.Semaphore or semaphore.BoundedSemaphore instead.", + DeprecationWarning, stacklevel=2) if limit is None: return Semaphore(count) else: diff --git a/eventlet/green/thread.py b/eventlet/green/thread.py index 449aaf3..b5fe8fb 100644 --- a/eventlet/green/thread.py +++ b/eventlet/green/thread.py @@ -2,7 +2,7 @@ __thread = __import__('thread') from eventlet.support import greenlets as greenlet from eventlet.api import spawn -from eventlet.coros import Semaphore as LockType +from eventlet.semaphore import Semaphore as LockType error = __thread.error diff --git a/eventlet/greenpool.py b/eventlet/greenpool.py index c424cba..223dcbc 100644 --- a/eventlet/greenpool.py +++ b/eventlet/greenpool.py @@ -1,7 +1,8 @@ import itertools -from eventlet import greenthread from eventlet import coros +from eventlet import greenthread +from eventlet import semaphore __all__ = ['GreenPool', 'GreenPile'] @@ -20,7 +21,7 @@ class GreenPool(object): def __init__(self, size): self.size = size self.coroutines_running = set() - self.sem = coros.Semaphore(size) + self.sem = semaphore.Semaphore(size) self.no_coros_running = greenthread.Event() def resize(self, new_size): diff --git a/eventlet/greenthread.py b/eventlet/greenthread.py index 146115c..04d6930 100644 --- a/eventlet/greenthread.py +++ b/eventlet/greenthread.py @@ -13,8 +13,9 @@ def kill(g, *throw_args): By default, this exception is GreenletExit, but a specific exception may be specified in the *throw_args*. """ - get_hub_().schedule_call_global(0, g.throw, *throw_args) - if getcurrent() is not get_hub_().greenlet: + hub = hubs.get_hub() + hub.schedule_call_global(0, g.throw, *throw_args) + if getcurrent() is not hub.greenlet: sleep(0) def sleep(seconds=0): diff --git a/eventlet/semaphore.py b/eventlet/semaphore.py new file mode 100644 index 0000000..3072a97 --- /dev/null +++ b/eventlet/semaphore.py @@ -0,0 +1,131 @@ +from eventlet import greenthread +from eventlet import hubs + +class Semaphore(object): + """An unbounded semaphore. + Optionally initialize with a resource *count*, then :meth:`acquire` and + :meth:`release` resources as needed. Attempting to :meth:`acquire` when + *count* is zero suspends the calling coroutine until *count* becomes + nonzero again. + """ + + def __init__(self, count=0): + self.counter = count + self._waiters = set() + + def __repr__(self): + params = (self.__class__.__name__, hex(id(self)), self.counter, len(self._waiters)) + return '<%s at %s c=%s _w[%s]>' % params + + def __str__(self): + params = (self.__class__.__name__, self.counter, len(self._waiters)) + return '<%s c=%s _w[%s]>' % params + + def locked(self): + return self.counter <= 0 + + def bounded(self): + # for consistency with BoundedSemaphore + return False + + def acquire(self, blocking=True): + if not blocking and self.locked(): + return False + if self.counter <= 0: + self._waiters.add(greenthread.getcurrent()) + try: + while self.counter <= 0: + hubs.get_hub().switch() + finally: + self._waiters.discard(greenthread.getcurrent()) + self.counter -= 1 + return True + + def __enter__(self): + self.acquire() + + def release(self, blocking=True): + # `blocking' parameter is for consistency with BoundedSemaphore and is ignored + self.counter += 1 + if self._waiters: + hubs.get_hub().schedule_call_global(0, self._do_acquire) + return True + + def _do_acquire(self): + if self._waiters and self.counter>0: + waiter = self._waiters.pop() + waiter.switch() + + def __exit__(self, typ, val, tb): + self.release() + + @property + def balance(self): + # positive means there are free items + # zero means there are no free items but nobody has requested one + # negative means there are requests for items, but no items + return self.counter - len(self._waiters) + + +class BoundedSemaphore(object): + """A bounded semaphore. + Optionally initialize with a resource *count*, then :meth:`acquire` and + :meth:`release` resources as needed. Attempting to :meth:`acquire` when + *count* is zero suspends the calling coroutine until count becomes nonzero + again. Attempting to :meth:`release` after *count* has reached *limit* + suspends the calling coroutine until *count* becomes less than *limit* + again. + """ + def __init__(self, count, limit): + if count > limit: + # accidentally, this also catches the case when limit is None + raise ValueError("'count' cannot be more than 'limit'") + self.lower_bound = Semaphore(count) + self.upper_bound = Semaphore(limit-count) + + def __repr__(self): + params = (self.__class__.__name__, hex(id(self)), self.balance, self.lower_bound, self.upper_bound) + return '<%s at %s b=%s l=%s u=%s>' % params + + def __str__(self): + params = (self.__class__.__name__, self.balance, self.lower_bound, self.upper_bound) + return '<%s b=%s l=%s u=%s>' % params + + def locked(self): + return self.lower_bound.locked() + + def bounded(self): + return self.upper_bound.locked() + + def acquire(self, blocking=True): + if not blocking and self.locked(): + return False + self.upper_bound.release() + try: + return self.lower_bound.acquire() + except: + self.upper_bound.counter -= 1 + # using counter directly means that it can be less than zero. + # however I certainly don't need to wait here and I don't seem to have + # a need to care about such inconsistency + raise + + def __enter__(self): + self.acquire() + + def release(self, blocking=True): + if not blocking and self.bounded(): + return False + self.lower_bound.release() + try: + return self.upper_bound.acquire() + except: + self.lower_bound.counter -= 1 + raise + + def __exit__(self, typ, val, tb): + self.release() + + @property + def balance(self): + return self.lower_bound.balance - self.upper_bound.balance \ No newline at end of file diff --git a/tests/test__coros_semaphore.py b/tests/semaphore_test.py similarity index 62% rename from tests/test__coros_semaphore.py rename to tests/semaphore_test.py index 81eb869..131c0be 100644 --- a/tests/test__coros_semaphore.py +++ b/tests/semaphore_test.py @@ -1,28 +1,30 @@ import unittest -from eventlet import api, coros +import eventlet +from eventlet import semaphore from tests import LimitedTestCase class TestSemaphore(LimitedTestCase): - def test_bounded(self): - # this was originally semaphore's doctest - sem = coros.BoundedSemaphore(2, limit=3) + sem = semaphore.BoundedSemaphore(2, limit=3) self.assertEqual(sem.acquire(), True) self.assertEqual(sem.acquire(), True) - api.spawn(sem.release) + gt1 = eventlet.spawn(sem.release) self.assertEqual(sem.acquire(), True) self.assertEqual(-3, sem.balance) sem.release() sem.release() sem.release() - api.spawn(sem.acquire) + gt2 = eventlet.spawn(sem.acquire) sem.release() self.assertEqual(3, sem.balance) + gt1.wait() + gt2.wait() def test_bounded_with_zero_limit(self): - sem = coros.semaphore(0, 0) - api.spawn(sem.acquire) + sem = semaphore.BoundedSemaphore(0, 0) + gt = eventlet.spawn(sem.acquire) sem.release() + gt.wait() if __name__=='__main__':