From e40c05d6ce555d4753a12b275d9af95d13df8823 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 22 Apr 2016 18:27:37 -0600 Subject: [PATCH] change resolution to milliseconds, spread out pings over bucket-size --- docs/api.rst | 3 ++- test/test_batched_timers_tx.py | 9 ++++++--- txaio/_common.py | 27 +++++++++++++-------------- txaio/aio.py | 7 ++++--- txaio/tx.py | 7 ++++--- 5 files changed, 29 insertions(+), 24 deletions(-) diff --git a/docs/api.rst b/docs/api.rst index 826c0d8..61db304 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -202,7 +202,8 @@ txaio module such that any ``.call_later`` calls done through it (instead of via :meth:`txaio.call_later`) will be "quantized" into buckets and processed in ``chunk_size`` batches "near" the time they are - supposed to fire. + supposed to fire. ``seconds_per_bucket`` is only accurate to + "milliseconds". When there are "tens of thousands" of outstanding timers, CPU usage can become a problem -- if the accuracy of the timers isn't diff --git a/test/test_batched_timers_tx.py b/test/test_batched_timers_tx.py index dad5493..368c528 100644 --- a/test/test_batched_timers_tx.py +++ b/test/test_batched_timers_tx.py @@ -149,10 +149,12 @@ def test_batched_chunks(framework_tx): # have put one more delayed call (at "0 seconds in the # future") into the reactor after the first batch of 2 calls # was notified. - new_loop.advance(2.1) + new_loop.advance(2) + new_loop.advance(1) assert len(calls) == 3 assert len(laters) == 2 - assert laters[1][0][0] == 0 # second call-later 0 seconds in future + # second call-later half the interval in the future (i.e. 1s) + assert laters[1][0][0] == 1.0 def test_batched_chunks_with_errors(framework_tx): @@ -185,7 +187,8 @@ def test_batched_chunks_with_errors(framework_tx): # notify everything, causing an error from the second batch try: - new_loop.advance(2.1) + new_loop.advance(2) + new_loop.advance(1) assert False, "Should get exception" except RuntimeError as e: assert "processing call_later" in str(e) diff --git a/txaio/_common.py b/txaio/_common.py index 9e4aca5..21b5099 100644 --- a/txaio/_common.py +++ b/txaio/_common.py @@ -32,11 +32,13 @@ class _BatchedTimer(IBatchedTimer): :meth:`txaio.make_batched_timer` and that is the only way they should be instantiated. You may depend on methods from the interface class only (:class:`txaio.IBatchedTimer`) + + **NOTE** that the times are in milliseconds in this class! """ - def __init__(self, bucket_seconds, chunk_size, + def __init__(self, bucket_milliseconds, chunk_size, seconds_provider, delayed_call_creator): - self._bucket_seconds = bucket_seconds + self._bucket_milliseconds = float(bucket_milliseconds) self._chunk_size = chunk_size self._get_seconds = seconds_provider self._create_delayed_call = delayed_call_creator @@ -46,20 +48,16 @@ class _BatchedTimer(IBatchedTimer): """ IBatchedTimer API """ - # "quantize" the delay to the nearest bucket (note: always - # doing floor essentially, so e.g. 29.9 with 5s buckets still - # gets you "25s from now") -- arguably would be better to - # "properly round"? Minimizes average error, but some delays - # get longer ... hmm. - real_time = int(self._get_seconds() + delay) - real_time -= (real_time % self._bucket_seconds) + # "quantize" the delay to the nearest bucket + real_time = int(self._get_seconds() + delay) * 1000 + real_time -= int(real_time % self._bucket_milliseconds) call = _BatchedCall(self, real_time, lambda: func(*args, **kwargs)) try: self._buckets[real_time][1].append(call) except KeyError: # new bucket; need to add "actual" underlying IDelayedCall delayed_call = self._create_delayed_call( - real_time, + (real_time / 1000.0) - self._get_seconds(), self._notify_bucket, real_time, ) self._buckets[real_time] = (delayed_call, [call]) @@ -75,7 +73,7 @@ class _BatchedTimer(IBatchedTimer): del self._buckets[real_time] errors = [] - def notify_one_chunk(calls, chunk_size): + def notify_one_chunk(calls, chunk_size, chunk_delay_ms): for call in calls[:chunk_size]: try: call() @@ -84,8 +82,8 @@ class _BatchedTimer(IBatchedTimer): calls = calls[chunk_size:] if calls: self._create_delayed_call( - 0, - lambda: notify_one_chunk(calls, chunk_size), + chunk_delay_ms / 1000.0, + notify_one_chunk, calls, chunk_size, chunk_delay_ms, ) else: # done all calls; make sure there were no errors @@ -94,7 +92,8 @@ class _BatchedTimer(IBatchedTimer): for e in errors: msg += u"{}\n".format(e) raise RuntimeError(msg) - notify_one_chunk(calls, self._chunk_size) + delay_ms = self._bucket_milliseconds / (float(len(calls)) / self._chunk_size) + notify_one_chunk(calls, self._chunk_size, delay_ms) def _remove_call(self, real_time, call): """ diff --git a/txaio/aio.py b/txaio/aio.py index ba92da9..e9d1dba 100644 --- a/txaio/aio.py +++ b/txaio/aio.py @@ -321,8 +321,9 @@ def make_batched_timer(bucket_seconds, chunk_size=100): :class:`txaio.IBatchedTimer`. :param bucket_seconds: the number of seconds in each bucket. That - is, a value of 5 means that any timeout within a 5 second window - will be in the same bucket, and get notified at the same time. + is, a value of 5 means that any timeout within a 5 second + window will be in the same bucket, and get notified at the + same time. This is only accurate to "milliseconds". :param chunk_size: when "doing" the callbacks in a particular bucket, this controls how many we do at once before yielding to @@ -330,7 +331,7 @@ def make_batched_timer(bucket_seconds, chunk_size=100): """ get_seconds = config.loop.time return _BatchedTimer( - bucket_seconds, chunk_size, + bucket_seconds * 1000.0, chunk_size, seconds_provider=get_seconds, delayed_call_creator=call_later, ) diff --git a/txaio/tx.py b/txaio/tx.py index 5b8d15d..e3a1476 100644 --- a/txaio/tx.py +++ b/txaio/tx.py @@ -422,8 +422,9 @@ def make_batched_timer(bucket_seconds, chunk_size=100): :class:`txaio.IBatchedTimer`. :param bucket_seconds: the number of seconds in each bucket. That - is, a value of 5 means that any timeout within a 5 second window - will be in the same bucket, and get notified at the same time. + is, a value of 5 means that any timeout within a 5 second + window will be in the same bucket, and get notified at the + same time. This is only accurate to "milliseconds". :param chunk_size: when "doing" the callbacks in a particular bucket, this controls how many we do at once before yielding to @@ -436,7 +437,7 @@ def make_batched_timer(bucket_seconds, chunk_size=100): return clock.callLater(delay, fun, *args, **kwargs) return _BatchedTimer( - bucket_seconds, chunk_size, + bucket_seconds * 1000.0, chunk_size, seconds_provider=get_seconds, delayed_call_creator=create_delayed_call, )