test__coros_queue: use queue's waiting() method
This commit is contained in:
@@ -1,12 +1,7 @@
|
||||
from greentest import LimitedTestCase
|
||||
from unittest import main
|
||||
from eventlet import api, coros
|
||||
from eventlet import api, coros, proc
|
||||
|
||||
def waiting(queue):
|
||||
try:
|
||||
return len(queue.sem._waiters)
|
||||
except AttributeError:
|
||||
return len(queue.sem.lower_bound._waiters)
|
||||
|
||||
class TestQueue(LimitedTestCase):
|
||||
|
||||
@@ -184,10 +179,10 @@ class TestQueue(LimitedTestCase):
|
||||
e1 = coros.event()
|
||||
api.spawn(do_wait, q, e1)
|
||||
api.sleep(0)
|
||||
self.assertEquals(1, waiting(q))
|
||||
self.assertEquals(1, q.waiting())
|
||||
q.send('hi')
|
||||
api.sleep(0)
|
||||
self.assertEquals(0, waiting(q))
|
||||
self.assertEquals(0, q.waiting())
|
||||
self.assertEquals('hi', e1.wait())
|
||||
self.assertEquals(0, waiting(q))
|
||||
|
||||
|
Reference in New Issue
Block a user