Moved semaphore classes into their own module.

This commit is contained in:
Ryan Williams
2010-01-17 17:55:21 -08:00
parent 977c985f75
commit 612460b9a5
6 changed files with 162 additions and 141 deletions

View File

@@ -6,6 +6,7 @@ import warnings
from eventlet import api from eventlet import api
from eventlet import hubs from eventlet import hubs
from eventlet import greenthread from eventlet import greenthread
from eventlet import semaphore as semaphoremod
class NOT_USED: class NOT_USED:
def __repr__(self): def __repr__(self):
@@ -27,137 +28,22 @@ def event(*a, **kw):
return greenthread.Event(*a, **kw) return greenthread.Event(*a, **kw)
class Semaphore(object): def Semaphore(count):
"""An unbounded semaphore. warnings.warn("The Semaphore class has moved! Please "
Optionally initialize with a resource *count*, then :meth:`acquire` and "use semaphore.Semaphore instead.",
:meth:`release` resources as needed. Attempting to :meth:`acquire` when DeprecationWarning, stacklevel=2)
*count* is zero suspends the calling coroutine until *count* becomes return semaphoremod.Semaphore(count)
nonzero again.
"""
def __init__(self, count=0):
self.counter = count
self._waiters = set()
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), self.counter, len(self._waiters))
return '<%s at %s c=%s _w[%s]>' % params
def __str__(self):
params = (self.__class__.__name__, self.counter, len(self._waiters))
return '<%s c=%s _w[%s]>' % params
def locked(self):
return self.counter <= 0
def bounded(self):
# for consistency with BoundedSemaphore
return False
def acquire(self, blocking=True):
if not blocking and self.locked():
return False
if self.counter <= 0:
self._waiters.add(api.getcurrent())
try:
while self.counter <= 0:
hubs.get_hub().switch()
finally:
self._waiters.discard(api.getcurrent())
self.counter -= 1
return True
def __enter__(self):
self.acquire()
def release(self, blocking=True):
# `blocking' parameter is for consistency with BoundedSemaphore and is ignored
self.counter += 1
if self._waiters:
hubs.get_hub().schedule_call_global(0, self._do_acquire)
return True
def _do_acquire(self):
if self._waiters and self.counter>0:
waiter = self._waiters.pop()
waiter.switch()
def __exit__(self, typ, val, tb):
self.release()
@property
def balance(self):
# positive means there are free items
# zero means there are no free items but nobody has requested one
# negative means there are requests for items, but no items
return self.counter - len(self._waiters)
class BoundedSemaphore(object):
"""A bounded semaphore.
Optionally initialize with a resource *count*, then :meth:`acquire` and
:meth:`release` resources as needed. Attempting to :meth:`acquire` when
*count* is zero suspends the calling coroutine until count becomes nonzero
again. Attempting to :meth:`release` after *count* has reached *limit*
suspends the calling coroutine until *count* becomes less than *limit*
again.
"""
def __init__(self, count, limit):
if count > limit:
# accidentally, this also catches the case when limit is None
raise ValueError("'count' cannot be more than 'limit'")
self.lower_bound = Semaphore(count)
self.upper_bound = Semaphore(limit-count)
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), self.balance, self.lower_bound, self.upper_bound)
return '<%s at %s b=%s l=%s u=%s>' % params
def __str__(self):
params = (self.__class__.__name__, self.balance, self.lower_bound, self.upper_bound)
return '<%s b=%s l=%s u=%s>' % params
def locked(self):
return self.lower_bound.locked()
def bounded(self):
return self.upper_bound.locked()
def acquire(self, blocking=True):
if not blocking and self.locked():
return False
self.upper_bound.release()
try:
return self.lower_bound.acquire()
except:
self.upper_bound.counter -= 1
# using counter directly means that it can be less than zero.
# however I certainly don't need to wait here and I don't seem to have
# a need to care about such inconsistency
raise
def __enter__(self):
self.acquire()
def release(self, blocking=True):
if not blocking and self.bounded():
return False
self.lower_bound.release()
try:
return self.upper_bound.acquire()
except:
self.lower_bound.counter -= 1
raise
def __exit__(self, typ, val, tb):
self.release()
@property
def balance(self):
return self.lower_bound.balance - self.upper_bound.balance
def BoundedSemaphore(count):
warnings.warn("The BoundedSemaphore class has moved! Please "
"use semaphore.BoundedSemaphore instead.",
DeprecationWarning, stacklevel=2)
return semaphoremod.BoundedSemaphore(count)
def semaphore(count=0, limit=None): def semaphore(count=0, limit=None):
warnings.warn("coros.semaphore is deprecated. Please use either "
"semaphore.Semaphore or semaphore.BoundedSemaphore instead.",
DeprecationWarning, stacklevel=2)
if limit is None: if limit is None:
return Semaphore(count) return Semaphore(count)
else: else:

View File

@@ -2,7 +2,7 @@
__thread = __import__('thread') __thread = __import__('thread')
from eventlet.support import greenlets as greenlet from eventlet.support import greenlets as greenlet
from eventlet.api import spawn from eventlet.api import spawn
from eventlet.coros import Semaphore as LockType from eventlet.semaphore import Semaphore as LockType
error = __thread.error error = __thread.error

View File

@@ -1,7 +1,8 @@
import itertools import itertools
from eventlet import greenthread
from eventlet import coros from eventlet import coros
from eventlet import greenthread
from eventlet import semaphore
__all__ = ['GreenPool', 'GreenPile'] __all__ = ['GreenPool', 'GreenPile']
@@ -20,7 +21,7 @@ class GreenPool(object):
def __init__(self, size): def __init__(self, size):
self.size = size self.size = size
self.coroutines_running = set() self.coroutines_running = set()
self.sem = coros.Semaphore(size) self.sem = semaphore.Semaphore(size)
self.no_coros_running = greenthread.Event() self.no_coros_running = greenthread.Event()
def resize(self, new_size): def resize(self, new_size):

View File

@@ -13,8 +13,9 @@ def kill(g, *throw_args):
By default, this exception is GreenletExit, but a specific exception By default, this exception is GreenletExit, but a specific exception
may be specified in the *throw_args*. may be specified in the *throw_args*.
""" """
get_hub_().schedule_call_global(0, g.throw, *throw_args) hub = hubs.get_hub()
if getcurrent() is not get_hub_().greenlet: hub.schedule_call_global(0, g.throw, *throw_args)
if getcurrent() is not hub.greenlet:
sleep(0) sleep(0)
def sleep(seconds=0): def sleep(seconds=0):

131
eventlet/semaphore.py Normal file
View File

@@ -0,0 +1,131 @@
from eventlet import greenthread
from eventlet import hubs
class Semaphore(object):
"""An unbounded semaphore.
Optionally initialize with a resource *count*, then :meth:`acquire` and
:meth:`release` resources as needed. Attempting to :meth:`acquire` when
*count* is zero suspends the calling coroutine until *count* becomes
nonzero again.
"""
def __init__(self, count=0):
self.counter = count
self._waiters = set()
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), self.counter, len(self._waiters))
return '<%s at %s c=%s _w[%s]>' % params
def __str__(self):
params = (self.__class__.__name__, self.counter, len(self._waiters))
return '<%s c=%s _w[%s]>' % params
def locked(self):
return self.counter <= 0
def bounded(self):
# for consistency with BoundedSemaphore
return False
def acquire(self, blocking=True):
if not blocking and self.locked():
return False
if self.counter <= 0:
self._waiters.add(greenthread.getcurrent())
try:
while self.counter <= 0:
hubs.get_hub().switch()
finally:
self._waiters.discard(greenthread.getcurrent())
self.counter -= 1
return True
def __enter__(self):
self.acquire()
def release(self, blocking=True):
# `blocking' parameter is for consistency with BoundedSemaphore and is ignored
self.counter += 1
if self._waiters:
hubs.get_hub().schedule_call_global(0, self._do_acquire)
return True
def _do_acquire(self):
if self._waiters and self.counter>0:
waiter = self._waiters.pop()
waiter.switch()
def __exit__(self, typ, val, tb):
self.release()
@property
def balance(self):
# positive means there are free items
# zero means there are no free items but nobody has requested one
# negative means there are requests for items, but no items
return self.counter - len(self._waiters)
class BoundedSemaphore(object):
"""A bounded semaphore.
Optionally initialize with a resource *count*, then :meth:`acquire` and
:meth:`release` resources as needed. Attempting to :meth:`acquire` when
*count* is zero suspends the calling coroutine until count becomes nonzero
again. Attempting to :meth:`release` after *count* has reached *limit*
suspends the calling coroutine until *count* becomes less than *limit*
again.
"""
def __init__(self, count, limit):
if count > limit:
# accidentally, this also catches the case when limit is None
raise ValueError("'count' cannot be more than 'limit'")
self.lower_bound = Semaphore(count)
self.upper_bound = Semaphore(limit-count)
def __repr__(self):
params = (self.__class__.__name__, hex(id(self)), self.balance, self.lower_bound, self.upper_bound)
return '<%s at %s b=%s l=%s u=%s>' % params
def __str__(self):
params = (self.__class__.__name__, self.balance, self.lower_bound, self.upper_bound)
return '<%s b=%s l=%s u=%s>' % params
def locked(self):
return self.lower_bound.locked()
def bounded(self):
return self.upper_bound.locked()
def acquire(self, blocking=True):
if not blocking and self.locked():
return False
self.upper_bound.release()
try:
return self.lower_bound.acquire()
except:
self.upper_bound.counter -= 1
# using counter directly means that it can be less than zero.
# however I certainly don't need to wait here and I don't seem to have
# a need to care about such inconsistency
raise
def __enter__(self):
self.acquire()
def release(self, blocking=True):
if not blocking and self.bounded():
return False
self.lower_bound.release()
try:
return self.upper_bound.acquire()
except:
self.lower_bound.counter -= 1
raise
def __exit__(self, typ, val, tb):
self.release()
@property
def balance(self):
return self.lower_bound.balance - self.upper_bound.balance

View File

@@ -1,28 +1,30 @@
import unittest import unittest
from eventlet import api, coros import eventlet
from eventlet import semaphore
from tests import LimitedTestCase from tests import LimitedTestCase
class TestSemaphore(LimitedTestCase): class TestSemaphore(LimitedTestCase):
def test_bounded(self): def test_bounded(self):
# this was originally semaphore's doctest sem = semaphore.BoundedSemaphore(2, limit=3)
sem = coros.BoundedSemaphore(2, limit=3)
self.assertEqual(sem.acquire(), True) self.assertEqual(sem.acquire(), True)
self.assertEqual(sem.acquire(), True) self.assertEqual(sem.acquire(), True)
api.spawn(sem.release) gt1 = eventlet.spawn(sem.release)
self.assertEqual(sem.acquire(), True) self.assertEqual(sem.acquire(), True)
self.assertEqual(-3, sem.balance) self.assertEqual(-3, sem.balance)
sem.release() sem.release()
sem.release() sem.release()
sem.release() sem.release()
api.spawn(sem.acquire) gt2 = eventlet.spawn(sem.acquire)
sem.release() sem.release()
self.assertEqual(3, sem.balance) self.assertEqual(3, sem.balance)
gt1.wait()
gt2.wait()
def test_bounded_with_zero_limit(self): def test_bounded_with_zero_limit(self):
sem = coros.semaphore(0, 0) sem = semaphore.BoundedSemaphore(0, 0)
api.spawn(sem.acquire) gt = eventlet.spawn(sem.acquire)
sem.release() sem.release()
gt.wait()
if __name__=='__main__': if __name__=='__main__':