Fix for the bug in Channel that Gregory Holt discovered, two new tests added to check for it. Also, changed the default exception-squelching behavior of RunningProcSet.waitall because it was squelching assertions and timeouts too which made it impossible to test.
This commit is contained in:
@@ -514,8 +514,6 @@ class Channel(object):
|
||||
if self._waiters:
|
||||
api.get_hub().schedule_call_global(0, self._do_switch)
|
||||
else:
|
||||
if self._waiters and self._senders:
|
||||
api.sleep(0)
|
||||
self.items.append((result, exc))
|
||||
# note that send() does not work well with timeouts. if your timeout fires
|
||||
# after this point, the item will remain in the queue
|
||||
|
@@ -726,7 +726,7 @@ class RunningProcSet(object):
|
||||
self.add(p)
|
||||
return p
|
||||
|
||||
def waitall(self, trap_errors=True):
|
||||
def waitall(self, trap_errors=False):
|
||||
while self.procs:
|
||||
waitall(self.procs, trap_errors=trap_errors)
|
||||
|
||||
|
@@ -232,6 +232,26 @@ class TestChannel(LimitedTestCase):
|
||||
api.sleep(0)
|
||||
self.assertEqual(['waiting', 'sending hello', 'hello', 'sending world', 'world', 'sent world'], events)
|
||||
|
||||
def test_waiters(self):
|
||||
c = coros.Channel()
|
||||
w1 = coros.execute(c.wait)
|
||||
w2 = coros.execute(c.wait)
|
||||
w3 = coros.execute(c.wait)
|
||||
api.sleep(0)
|
||||
self.assertEquals(c.waiting(), 3)
|
||||
s1 = coros.execute(c.send, 1)
|
||||
s2 = coros.execute(c.send, 2)
|
||||
s3 = coros.execute(c.send, 3)
|
||||
api.sleep(0) # this gets all the sends into a waiting state
|
||||
self.assertEquals(c.waiting(), 0)
|
||||
|
||||
s1.wait()
|
||||
s2.wait()
|
||||
s3.wait()
|
||||
self.assertEquals(w1.wait(), 1)
|
||||
self.assertEquals(w2.wait(), 2)
|
||||
self.assertEquals(w3.wait(), 3)
|
||||
|
||||
|
||||
if __name__=='__main__':
|
||||
main()
|
||||
|
@@ -267,6 +267,32 @@ class PoolBasicTests(LimitedTestCase):
|
||||
evt = p.execute(lambda a: ('foo', a), 1)
|
||||
self.assertEqual(evt.wait(), ('foo', 1))
|
||||
|
||||
def test_with_intpool(self):
|
||||
from eventlet import pools
|
||||
class IntPool(pools.Pool):
|
||||
def create(self):
|
||||
self.current_integer = getattr(self, 'current_integer', 0) + 1
|
||||
return self.current_integer
|
||||
|
||||
def subtest(intpool_size, pool_size, num_executes):
|
||||
def run(int_pool):
|
||||
token = int_pool.get()
|
||||
api.sleep(0.0001)
|
||||
int_pool.put(token)
|
||||
return token
|
||||
|
||||
int_pool = IntPool(max_size=intpool_size)
|
||||
pool = self.klass(max_size=pool_size)
|
||||
for ix in xrange(num_executes):
|
||||
pool.execute(run, int_pool)
|
||||
pool.waitall()
|
||||
|
||||
subtest(4, 7, 7)
|
||||
subtest(50, 75, 100)
|
||||
for isize in (20, 30, 40, 50):
|
||||
for psize in (25, 35, 50):
|
||||
subtest(isize, psize, psize)
|
||||
|
||||
|
||||
if __name__=='__main__':
|
||||
main()
|
||||
|
Reference in New Issue
Block a user