diff --git a/eventlet/semaphore.py b/eventlet/semaphore.py index 73dbbc1..962b304 100644 --- a/eventlet/semaphore.py +++ b/eventlet/semaphore.py @@ -1,4 +1,7 @@ from __future__ import with_statement + +import collections + from eventlet import greenthread from eventlet import hubs from eventlet.timeout import Timeout @@ -35,7 +38,7 @@ class Semaphore(object): if value < 0: raise ValueError("Semaphore must be initialized with a positive " "number, got %s" % value) - self._waiters = set() + self._waiters = collections.deque() def __repr__(self): params = (self.__class__.__name__, hex(id(self)), @@ -80,8 +83,12 @@ class Semaphore(object): raise ValueError("can't specify timeout for non-blocking acquire") if not blocking and self.locked(): return False - if self.counter <= 0: - self._waiters.add(greenthread.getcurrent()) + + current_thread = greenthread.getcurrent() + + if self.counter <= 0 or self._waiters: + if current_thread not in self._waiters: + self._waiters.append(current_thread) try: if timeout is not None: ok = False @@ -92,10 +99,19 @@ class Semaphore(object): if not ok: return False else: - while self.counter <= 0: + # If someone else is already in this wait loop, give them + # a chance to get out. + while True: hubs.get_hub().switch() + if self.counter > 0: + break finally: - self._waiters.discard(greenthread.getcurrent()) + try: + self._waiters.remove(current_thread) + except ValueError: + # Fine if its already been dropped. + pass + self.counter -= 1 return True @@ -117,7 +133,7 @@ class Semaphore(object): def _do_acquire(self): if self._waiters and self.counter > 0: - waiter = self._waiters.pop() + waiter = self._waiters.popleft() waiter.switch() def __exit__(self, typ, val, tb): diff --git a/tests/semaphore_test.py b/tests/semaphore_test.py index 1316330..ced9136 100644 --- a/tests/semaphore_test.py +++ b/tests/semaphore_test.py @@ -45,5 +45,24 @@ class TestSemaphore(LimitedTestCase): self.assertRaises(ValueError, sem.acquire, blocking=False, timeout=1) +def test_semaphore_contention(): + g_mutex = semaphore.Semaphore() + counts = [0, 0] + + def worker(no): + while min(counts) < 200: + with g_mutex: + counts[no - 1] += 1 + eventlet.sleep(0.001) + + t1 = eventlet.spawn(worker, no=1) + t2 = eventlet.spawn(worker, no=2) + eventlet.sleep(0.5) + t1.kill() + t2.kill() + + assert abs(counts[0] - counts[1]) < int(min(counts) * 0.1), counts + + if __name__ == '__main__': - unittest.main() + unittest.main() \ No newline at end of file