added coros.Semaphore and coros.BoundedSemaphore; made coros.semaphore a factory function; this fixes semaphore(0, 0) deadlock

This commit is contained in:
Denis Bilenko
2008-12-24 13:38:00 +06:00
parent 63bcdc5668
commit 961419045b

View File

@@ -530,104 +530,147 @@ class multievent(object):
return len(self.items) > 0 return len(self.items) > 0
class semaphore(object): class Semaphore(object):
"""Classic semaphore implemented with a counter and an event. """An unbounded semaphore.
Optionally initialize with a resource count, then acquire() and release() Optionally initialize with a resource count, then acquire() and release()
resources as needed. Attempting to acquire() when count is zero suspends resources as needed. Attempting to acquire() when count is zero suspends
the calling coroutine until count becomes nonzero again. the calling coroutine until count becomes nonzero again.
>>> from eventlet import coros, api
>>> sem = coros.semaphore(2, limit=3)
>>> sem.acquire()
True
>>> sem.acquire()
True
>>> def releaser(sem):
... print "releasing one"
... sem.release()
...
>>> _ = api.spawn(releaser, sem)
>>> sem.acquire()
releasing one
True
>>> sem.counter
0
>>> for x in xrange(3):
... sem.release()
...
>>> def acquirer(sem):
... print "acquiring one"
... sem.acquire()
...
>>> _ = api.spawn(acquirer, sem)
>>> sem.release()
acquiring one
>>> sem.counter
3
""" """
def __init__(self, count=0, limit=None):
if limit is not None and count > limit: def __init__(self, count=0):
# Prevent initializing with inconsistent values
count = limit
self.counter = count self.counter = count
self.limit = limit self._waiters = {}
self.acqevent = event()
self.relevent = event()
if self.counter > 0:
# If we initially have items, then don't block acquire()s.
self.acqevent.send()
if self.limit is None or self.counter < self.limit:
# If either there's no limit or we're below it, don't block on
# release()s.
self.relevent.send()
def __str__(self): def __str__(self):
params = (self.__class__.__name__, hex(id(self)), self.counter, self.limit, self.acqevent, self.relevent) params = (self.__class__.__name__, hex(id(self)), self.counter)
return '<%s at %s %r/%r acq=%s rel=%s>' % params return '<%s at %s counter=%r>' % params
def locked(self): def locked(self):
return self.counter <= 0 return self.counter <= 0
def acquire(self, blocking=1): def bounded(self):
if blocking==0 and self.locked(): # for consistency with BoundedSemaphore
return False
def acquire(self, blocking=True):
if not blocking and self.locked():
return False return False
# This logic handles the self.limit is None case because None != any integer. if self.counter<=0:
while self.counter == 0: self._waiters[api.getcurrent()] = None
# Loop until there are resources to acquire. We loop because we try:
# could be one of several coroutines waiting for a single item. If api.get_hub().switch()
# we all get notified, only one is going to claim it, and the rest finally:
# of us must continue waiting. self._waiters.pop(api.getcurrent(), None)
self.acqevent.wait()
# claim the resource
self.counter -= 1 self.counter -= 1
if self.counter == 0:
# If we just transitioned from having a resource to having none,
# make anyone else's wait() actually wait.
self.acqevent.reset()
if self.counter + 1 == self.limit:
# If we just transitioned from being full to having room for one
# more resource, notify whoever was waiting to release one.
self.relevent.send()
return True return True
__enter__ = acquire __enter__ = acquire
def release(self): def release(self, blocking=True):
# This logic handles the self.limit is None case because None != any integer. # `blocking' parameter is for consistency with BoundedSemaphore and is ignored
while self.counter == self.limit:
self.relevent.wait()
self.counter += 1 self.counter += 1
if self.counter == self.limit: if self._waiters:
self.relevent.reset() api.get_hub().schedule_call_global(0, self._do_acquire)
if self.counter == 1: return True
# If self.counter was 0 before we incremented it, then wake up
# anybody who was waiting def _do_acquire(self):
self.acqevent.send() if self._waiters and self.counter>0:
waiter, _unused = self._waiters.popitem()
waiter.switch()
def __exit__(self, typ, val, tb): def __exit__(self, typ, val, tb):
self.release() self.release()
class BoundedSemaphore(object):
"""A bounded semaphore.
Optionally initialize with a resource count, then acquire() and release()
resources as needed. Attempting to acquire() when count is zero suspends
the calling coroutine until count becomes nonzero again. Attempting to
release() after count has reached limit suspends the calling coroutine until
count becomes less than limit again.
"""
def __init__(self, count, limit):
if count > limit:
# accidentally, this also catches the case when limit is None
raise ValueError("'count' cannot be more than 'limit'")
self.counter = count
self.limit = limit
self._acquire_waiters = {}
self._release_waiters = {}
def __str__(self):
params = (self.__class__.__name__, hex(id(self)), self.counter, self.limit)
return '<%s at %s %r/%r>' % params
def locked(self):
return self.counter <= 0
def bounded(self):
return self.counter >= self.limit
def acquire(self, blocking=True):
if not blocking and self.locked():
return False
if self.counter<=0:
if self._release_waiters:
api.get_hub().schedule_call(0, self._do_unlock)
self._acquire_waiters[api.getcurrent()] = None
try:
api.get_hub().switch()
finally:
self._acquire_waiters.pop(api.getcurrent(), None)
self.counter -= 1
if self._release_waiters and self.counter < self.limit:
api.get_hub().schedule_call(0, self._do_release)
return True
__enter__ = acquire
def _do_unlock(self):
if self._release_waiters and self._acquire_waiters:
api.get_hub().schedule_call(0, self._do_acquire)
waiter, _unused = self._release_waiters.popitem()
waiter.switch()
def _do_release(self):
if self._release_waiters and self.counter<self.limit:
waiter, _unused = self._release_waiters.popitem()
waiter.switch()
def _do_acquire(self):
if self._acquire_waiters and self.counter>0:
waiter, _unused = self._acquire_waiters.popitem()
waiter.switch()
def release(self, blocking=True):
if not blocking and self.bounded():
return False
if self.counter>=self.limit:
if self._acquire_waiters:
api.get_hub().schedule_call(0, self._do_unlock)
self._release_waiters[api.getcurrent()] = None
try:
api.get_hub().switch()
finally:
self._release_waiters.pop(api.getcurrent(), None)
self.counter += 1
if self._acquire_waiters and self.counter > 0:
api.get_hub().schedule_call(0, self._do_acquire)
return True
def __exit__(self, typ, val, tb):
self.release()
def semaphore(count=0, limit=None):
if limit is None:
return Semaphore(count)
else:
return BoundedSemaphore(count, limit)
class metaphore(object): class metaphore(object):
"""This is sort of an inverse semaphore: a counter that starts at 0 and """This is sort of an inverse semaphore: a counter that starts at 0 and
waits only if nonzero. It's used to implement a "wait for all" scenario. waits only if nonzero. It's used to implement a "wait for all" scenario.