semaphore: Don't hog a semaphore if someone else is waiting for it
https://github.com/eventlet/eventlet/issues/136 https://github.com/eventlet/eventlet/pull/163 (comment by Sergey Shepelev) _waiters is now deque, watch out for O(N) acquire()
This commit is contained in:

committed by
Sergey Shepelev

parent
a6ce444265
commit
bab1116809
@@ -1,4 +1,7 @@
|
|||||||
from __future__ import with_statement
|
from __future__ import with_statement
|
||||||
|
|
||||||
|
import collections
|
||||||
|
|
||||||
from eventlet import greenthread
|
from eventlet import greenthread
|
||||||
from eventlet import hubs
|
from eventlet import hubs
|
||||||
from eventlet.timeout import Timeout
|
from eventlet.timeout import Timeout
|
||||||
@@ -35,7 +38,7 @@ class Semaphore(object):
|
|||||||
if value < 0:
|
if value < 0:
|
||||||
raise ValueError("Semaphore must be initialized with a positive "
|
raise ValueError("Semaphore must be initialized with a positive "
|
||||||
"number, got %s" % value)
|
"number, got %s" % value)
|
||||||
self._waiters = set()
|
self._waiters = collections.deque()
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
params = (self.__class__.__name__, hex(id(self)),
|
params = (self.__class__.__name__, hex(id(self)),
|
||||||
@@ -80,8 +83,12 @@ class Semaphore(object):
|
|||||||
raise ValueError("can't specify timeout for non-blocking acquire")
|
raise ValueError("can't specify timeout for non-blocking acquire")
|
||||||
if not blocking and self.locked():
|
if not blocking and self.locked():
|
||||||
return False
|
return False
|
||||||
if self.counter <= 0:
|
|
||||||
self._waiters.add(greenthread.getcurrent())
|
current_thread = greenthread.getcurrent()
|
||||||
|
|
||||||
|
if self.counter <= 0 or self._waiters:
|
||||||
|
if current_thread not in self._waiters:
|
||||||
|
self._waiters.append(current_thread)
|
||||||
try:
|
try:
|
||||||
if timeout is not None:
|
if timeout is not None:
|
||||||
ok = False
|
ok = False
|
||||||
@@ -92,10 +99,19 @@ class Semaphore(object):
|
|||||||
if not ok:
|
if not ok:
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
while self.counter <= 0:
|
# If someone else is already in this wait loop, give them
|
||||||
|
# a chance to get out.
|
||||||
|
while True:
|
||||||
hubs.get_hub().switch()
|
hubs.get_hub().switch()
|
||||||
|
if self.counter > 0:
|
||||||
|
break
|
||||||
finally:
|
finally:
|
||||||
self._waiters.discard(greenthread.getcurrent())
|
try:
|
||||||
|
self._waiters.remove(current_thread)
|
||||||
|
except ValueError:
|
||||||
|
# Fine if its already been dropped.
|
||||||
|
pass
|
||||||
|
|
||||||
self.counter -= 1
|
self.counter -= 1
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -117,7 +133,7 @@ class Semaphore(object):
|
|||||||
|
|
||||||
def _do_acquire(self):
|
def _do_acquire(self):
|
||||||
if self._waiters and self.counter > 0:
|
if self._waiters and self.counter > 0:
|
||||||
waiter = self._waiters.pop()
|
waiter = self._waiters.popleft()
|
||||||
waiter.switch()
|
waiter.switch()
|
||||||
|
|
||||||
def __exit__(self, typ, val, tb):
|
def __exit__(self, typ, val, tb):
|
||||||
|
@@ -45,5 +45,24 @@ class TestSemaphore(LimitedTestCase):
|
|||||||
self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1)
|
self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1)
|
||||||
|
|
||||||
|
|
||||||
|
def test_semaphore_contention():
|
||||||
|
g_mutex = semaphore.Semaphore()
|
||||||
|
counts = [0, 0]
|
||||||
|
|
||||||
|
def worker(no):
|
||||||
|
while min(counts) < 200:
|
||||||
|
with g_mutex:
|
||||||
|
counts[no - 1] += 1
|
||||||
|
eventlet.sleep(0.001)
|
||||||
|
|
||||||
|
t1 = eventlet.spawn(worker, no=1)
|
||||||
|
t2 = eventlet.spawn(worker, no=2)
|
||||||
|
eventlet.sleep(0.5)
|
||||||
|
t1.kill()
|
||||||
|
t2.kill()
|
||||||
|
|
||||||
|
assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
Reference in New Issue
Block a user