From ed81db742f75ddc8be2d726ee2e1373d4aacf4ad Mon Sep 17 00:00:00 2001 From: meejah Date: Thu, 6 Apr 2017 01:25:00 -0600 Subject: [PATCH] batched_timer support for loop= --- test/test_batched_timers_aio.py | 27 +++++++++++++++++++++++++-- txaio/_common.py | 3 ++- txaio/aio.py | 19 ++++++++++++++++--- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/test/test_batched_timers_aio.py b/test/test_batched_timers_aio.py index 8c94783..d9dab7b 100644 --- a/test/test_batched_timers_aio.py +++ b/test/test_batched_timers_aio.py @@ -77,6 +77,31 @@ def test_batched_successful_call(framework_aio): assert calls[2] == (("third call", ), dict()) +def test_batched_successful_call_explicit_loop(framework_aio): + ''' + batched calls really happen in batches + ''' + # Trollius doesn't come with this, so won't work on py2 + pytest.importorskip('asyncio.test_utils') + from asyncio.test_utils import TestLoop + + def time_gen(): + yield + yield + new_loop = TestLoop(time_gen) + calls = [] + + def foo(*args, **kw): + calls.append((args, kw)) + + batched = txaio.make_batched_timer(5, loop=new_loop) + + batched.call_later(1, foo, "first call") + new_loop.advance_time(2.0) + new_loop._run_once() + assert len(calls) == 1 + + def test_batched_cancel(framework_aio): ''' we can cancel uncalled call_laters @@ -88,7 +113,6 @@ def test_batched_cancel(framework_aio): def time_gen(): yield yield - yield new_loop = TestLoop(time_gen) calls = [] @@ -123,7 +147,6 @@ def test_batched_cancel_too_late(framework_aio): def time_gen(): yield yield - yield new_loop = TestLoop(time_gen) calls = [] diff --git a/txaio/_common.py b/txaio/_common.py index 7a61d1f..5a942a4 100644 --- a/txaio/_common.py +++ b/txaio/_common.py @@ -38,7 +38,7 @@ class _BatchedTimer(IBatchedTimer): """ def __init__(self, bucket_milliseconds, chunk_size, - seconds_provider, delayed_call_creator): + seconds_provider, delayed_call_creator, loop=None): if bucket_milliseconds <= 0.0: raise ValueError( "bucket_milliseconds must be > 0.0" @@ -48,6 +48,7 @@ class _BatchedTimer(IBatchedTimer): self._get_seconds = seconds_provider self._create_delayed_call = delayed_call_creator self._buckets = dict() # real seconds -> (IDelayedCall, list) + self._loop = loop def call_later(self, delay, func, *args, **kwargs): """ diff --git a/txaio/aio.py b/txaio/aio.py index 000de05..9111e80 100644 --- a/txaio/aio.py +++ b/txaio/aio.py @@ -317,7 +317,7 @@ def call_later(delay, fun, *args, **kwargs): return config.loop.call_later(delay, real_call) -def make_batched_timer(bucket_seconds, chunk_size=100): +def make_batched_timer(bucket_seconds, chunk_size=100, loop=None): """ Creates and returns an object implementing :class:`txaio.IBatchedTimer`. @@ -332,13 +332,26 @@ def make_batched_timer(bucket_seconds, chunk_size=100): the reactor. """ + # XXX this duplicates code from 'call_later', but I don't see an + # alternative + if loop is not None: + + def _create_call_later(delay, fun, *args, **kwargs): + real_call = functools.partial(fun, *args, **kwargs) + return loop.call_later(delay, real_call) + the_loop = loop + + else: + _create_call_later = call_later + the_loop = config.loop + def get_seconds(): - return config.loop.time() + return the_loop.time() return _BatchedTimer( bucket_seconds * 1000.0, chunk_size, seconds_provider=get_seconds, - delayed_call_creator=call_later, + delayed_call_creator=_create_call_later, )