Reenabled the queue tests. The semaphore now releases to waiters in the order that they started waiting. There's a FIX note in there because the queue should not need a sleep before the waiter count is correct.
This commit is contained in:
@@ -262,6 +262,7 @@ class Semaphore(object):
|
||||
|
||||
def __init__(self, count=0):
|
||||
self.counter = count
|
||||
self._order = collections.deque()
|
||||
self._waiters = {}
|
||||
|
||||
def __str__(self):
|
||||
@@ -280,6 +281,7 @@ class Semaphore(object):
|
||||
return False
|
||||
while self.counter<=0:
|
||||
self._waiters[api.getcurrent()] = None
|
||||
self._order.append(api.getcurrent())
|
||||
try:
|
||||
api.get_hub().switch()
|
||||
finally:
|
||||
@@ -293,14 +295,21 @@ class Semaphore(object):
|
||||
def release(self, blocking=True):
|
||||
# `blocking' parameter is for consistency with BoundedSemaphore and is ignored
|
||||
self.counter += 1
|
||||
if self._waiters:
|
||||
if self._waiters or self._order:
|
||||
api.get_hub().schedule_call_global(0, self._do_acquire)
|
||||
return True
|
||||
|
||||
def _do_acquire(self):
|
||||
if self._waiters and self.counter>0:
|
||||
waiter, _unused = self._waiters.popitem()
|
||||
waiter.switch()
|
||||
if (self._waiters or self._order) and self.counter>0:
|
||||
waiter = None
|
||||
while not waiter and self._order:
|
||||
waiter = self._order.popleft()
|
||||
if waiter not in self._waiters:
|
||||
waiter = None
|
||||
|
||||
if waiter is not None:
|
||||
self._waiters.pop(waiter)
|
||||
waiter.switch()
|
||||
|
||||
def __exit__(self, typ, val, tb):
|
||||
self.release()
|
||||
|
@@ -73,7 +73,7 @@ class TestQueue(LimitedTestCase):
|
||||
self.assertEquals(e2.wait(),'hi')
|
||||
self.assertEquals(e1.wait(),'done')
|
||||
|
||||
def skip_test_multiple_waiters(self):
|
||||
def test_multiple_waiters(self):
|
||||
q = coros.queue()
|
||||
|
||||
def waiter(q, evt):
|
||||
@@ -164,7 +164,7 @@ class TestQueue(LimitedTestCase):
|
||||
self.assertEquals(e2.wait(), 'timed out')
|
||||
self.assertEquals(q.wait(), 'sent')
|
||||
|
||||
def disable_test_waiting(self):
|
||||
def test_waiting(self):
|
||||
def do_wait(q, evt):
|
||||
result = q.wait()
|
||||
evt.send(result)
|
||||
@@ -175,6 +175,7 @@ class TestQueue(LimitedTestCase):
|
||||
api.sleep(0)
|
||||
self.assertEquals(1, waiting(q))
|
||||
q.send('hi')
|
||||
api.sleep(0) # *FIX this should not be necessary
|
||||
self.assertEquals(0, waiting(q))
|
||||
self.assertEquals('hi', e1.wait())
|
||||
self.assertEquals(0, waiting(q))
|
||||
|
Reference in New Issue
Block a user