change resolution to milliseconds, spread out pings over bucket-size

meejah
2016-04-22 18:27:37 -06:00
parent c806f8a734
commit e40c05d6ce
5 changed files with 29 additions and 24 deletions

View File

@@ -202,7 +202,8 @@ txaio module
 such that any ``.call_later`` calls done through it (instead of
 via :meth:`txaio.call_later`) will be "quantized" into buckets and
 processed in ``chunk_size`` batches "near" the time they are
-supposed to fire.
+supposed to fire. ``seconds_per_bucket`` is only accurate to
+"milliseconds".
 
 When there are "tens of thousands" of outstanding timers, CPU
 usage can become a problem -- if the accuracy of the timers isn't
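For orientation, a minimal usage sketch of the API this documentation change describes. Only ``make_batched_timer`` and the batched timer's ``.call_later`` method come from the text above; the framework choice, callback name, and delays are made up:

    import txaio
    txaio.use_twisted()   # or txaio.use_asyncio()

    # 5-second buckets; callbacks in a bucket are processed in batches
    # of (up to) 100 before yielding back to the event loop.
    timers = txaio.make_batched_timer(bucket_seconds=5, chunk_size=100)

    def on_ping_timeout(session_id):          # made-up callback
        print("ping timeout for", session_id)

    # Timeouts that fall inside the same 5-second window share a bucket
    # and are notified together, "near" their requested time (and only
    # to millisecond accuracy).
    timers.call_later(27.2, on_ping_timeout, "session-1")
    timers.call_later(29.9, on_ping_timeout, "session-2")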

View File

@@ -149,10 +149,12 @@ def test_batched_chunks(framework_tx):
     # have put one more delayed call (at "0 seconds in the
     # future") into the reactor after the first batch of 2 calls
     # was notified.
-    new_loop.advance(2.1)
+    new_loop.advance(2)
+    new_loop.advance(1)
     assert len(calls) == 3
     assert len(laters) == 2
-    assert laters[1][0][0] == 0 # second call-later 0 seconds in future
+    # second call-later half the interval in the future (i.e. 1s)
+    assert laters[1][0][0] == 1.0
 
 
 def test_batched_chunks_with_errors(framework_tx):
@@ -185,7 +187,8 @@ def test_batched_chunks_with_errors(framework_tx):
     # notify everything, causing an error from the second batch
     try:
-        new_loop.advance(2.1)
+        new_loop.advance(2)
+        new_loop.advance(1)
         assert False, "Should get exception"
     except RuntimeError as e:
         assert "processing call_later" in str(e)

View File

@@ -32,11 +32,13 @@ class _BatchedTimer(IBatchedTimer):
     :meth:`txaio.make_batched_timer` and that is the only way they
     should be instantiated. You may depend on methods from the
     interface class only (:class:`txaio.IBatchedTimer`)
+
+    **NOTE** that the times are in milliseconds in this class!
     """
 
-    def __init__(self, bucket_seconds, chunk_size,
+    def __init__(self, bucket_milliseconds, chunk_size,
                  seconds_provider, delayed_call_creator):
-        self._bucket_seconds = bucket_seconds
+        self._bucket_milliseconds = float(bucket_milliseconds)
         self._chunk_size = chunk_size
         self._get_seconds = seconds_provider
         self._create_delayed_call = delayed_call_creator
@@ -46,20 +48,16 @@ class _BatchedTimer(IBatchedTimer):
"""
IBatchedTimer API
"""
# "quantize" the delay to the nearest bucket (note: always
# doing floor essentially, so e.g. 29.9 with 5s buckets still
# gets you "25s from now") -- arguably would be better to
# "properly round"? Minimizes average error, but some delays
# get longer ... hmm.
real_time = int(self._get_seconds() + delay)
real_time -= (real_time % self._bucket_seconds)
# "quantize" the delay to the nearest bucket
real_time = int(self._get_seconds() + delay) * 1000
real_time -= int(real_time % self._bucket_milliseconds)
call = _BatchedCall(self, real_time, lambda: func(*args, **kwargs))
try:
self._buckets[real_time][1].append(call)
except KeyError:
# new bucket; need to add "actual" underlying IDelayedCall
delayed_call = self._create_delayed_call(
real_time,
(real_time / 1000.0) - self._get_seconds(),
self._notify_bucket, real_time,
)
self._buckets[real_time] = (delayed_call, [call])
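To make the millisecond arithmetic concrete, a small worked example with made-up numbers (a 5000 ms bucket, the clock at 100.0 s, a 29.9 s delay), mirroring the two quantization lines and the delayed-call delay above:

    # Illustrative numbers only; mirrors the quantization in call_later().
    bucket_milliseconds = 5000.0   # make_batched_timer(5) after the * 1000.0
    now_seconds = 100.0            # what seconds_provider() returns
    delay = 29.9

    real_time = int(now_seconds + delay) * 1000         # 129 * 1000 = 129000
    real_time -= int(real_time % bucket_milliseconds)   # 129000 - 4000 = 125000

    # Every call quantized to 125000 ms shares one bucket; its underlying
    # delayed call fires (125000 / 1000.0) - 100.0 = 25.0 seconds from now.
    print(real_time, (real_time / 1000.0) - now_seconds)   # 125000 25.0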
@@ -75,7 +73,7 @@ class _BatchedTimer(IBatchedTimer):
         del self._buckets[real_time]
         errors = []
 
-        def notify_one_chunk(calls, chunk_size):
+        def notify_one_chunk(calls, chunk_size, chunk_delay_ms):
             for call in calls[:chunk_size]:
                 try:
                     call()
@@ -84,8 +82,8 @@ class _BatchedTimer(IBatchedTimer):
             calls = calls[chunk_size:]
             if calls:
                 self._create_delayed_call(
-                    0,
-                    lambda: notify_one_chunk(calls, chunk_size),
+                    chunk_delay_ms / 1000.0,
+                    notify_one_chunk, calls, chunk_size, chunk_delay_ms,
                 )
             else:
                 # done all calls; make sure there were no errors
@@ -94,7 +92,8 @@ class _BatchedTimer(IBatchedTimer):
                 for e in errors:
                     msg += u"{}\n".format(e)
                 raise RuntimeError(msg)
-        notify_one_chunk(calls, self._chunk_size)
+        delay_ms = self._bucket_milliseconds / (float(len(calls)) / self._chunk_size)
+        notify_one_chunk(calls, self._chunk_size, delay_ms)
 
     def _remove_call(self, real_time, call):
         """

View File

@@ -321,8 +321,9 @@ def make_batched_timer(bucket_seconds, chunk_size=100):
         :class:`txaio.IBatchedTimer`.
 
     :param bucket_seconds: the number of seconds in each bucket. That
-        is, a value of 5 means that any timeout within a 5 second window
-        will be in the same bucket, and get notified at the same time.
+        is, a value of 5 means that any timeout within a 5 second
+        window will be in the same bucket, and get notified at the
+        same time. This is only accurate to "milliseconds".
 
     :param chunk_size: when "doing" the callbacks in a particular
         bucket, this controls how many we do at once before yielding to
@@ -330,7 +331,7 @@ def make_batched_timer(bucket_seconds, chunk_size=100):
"""
get_seconds = config.loop.time
return _BatchedTimer(
bucket_seconds, chunk_size,
bucket_seconds * 1000.0, chunk_size,
seconds_provider=get_seconds,
delayed_call_creator=call_later,
)
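Nothing changes for callers on the asyncio side: the public factory keeps its seconds-based signature, and the ``* 1000.0`` above is the only place the unit switches to milliseconds. A minimal sketch, assuming ``txaio.config.loop`` is set explicitly; the callback and delay are made up:

    import asyncio
    import txaio

    txaio.use_asyncio()
    txaio.config.loop = asyncio.new_event_loop()   # the loop whose .time() is used

    # Still seconds here; internally this becomes _BatchedTimer(5000.0, 100, ...)
    timers = txaio.make_batched_timer(bucket_seconds=5, chunk_size=100)

    def on_timeout(label):            # made-up callback
        print("timed out:", label)

    timers.call_later(12.3, on_timeout, "subscription-7")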

View File

@@ -422,8 +422,9 @@ def make_batched_timer(bucket_seconds, chunk_size=100):
         :class:`txaio.IBatchedTimer`.
 
     :param bucket_seconds: the number of seconds in each bucket. That
-        is, a value of 5 means that any timeout within a 5 second window
-        will be in the same bucket, and get notified at the same time.
+        is, a value of 5 means that any timeout within a 5 second
+        window will be in the same bucket, and get notified at the
+        same time. This is only accurate to "milliseconds".
 
     :param chunk_size: when "doing" the callbacks in a particular
         bucket, this controls how many we do at once before yielding to
@@ -436,7 +437,7 @@ def make_batched_timer(bucket_seconds, chunk_size=100):
         return clock.callLater(delay, fun, *args, **kwargs)
 
     return _BatchedTimer(
-        bucket_seconds, chunk_size,
+        bucket_seconds * 1000.0, chunk_size,
         seconds_provider=get_seconds,
         delayed_call_creator=create_delayed_call,
     )
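On the Twisted side, the nested ``create_delayed_call`` above simply forwards to ``IReactorTime.callLater``, and the new chunk rescheduling (third file above) relies on ``callLater`` accepting the callable plus positional arguments instead of a zero-argument lambda. A small standalone illustration using ``twisted.internet.task.Clock``; ``notify_chunk`` and the data are made up, not txaio code:

    from twisted.internet.task import Clock

    clock = Clock()
    seen = []

    def notify_chunk(calls, chunk_size, chunk_delay_s):
        # Process one chunk, then reschedule ourselves chunk_delay_s later,
        # passing the arguments positionally, as the new code does.
        seen.extend(calls[:chunk_size])
        rest = calls[chunk_size:]
        if rest:
            clock.callLater(chunk_delay_s, notify_chunk, rest, chunk_size, chunk_delay_s)

    notify_chunk(["a", "b", "c", "d", "e"], 2, 1.0)
    assert seen == ["a", "b"]                    # first chunk runs right away
    clock.advance(1.0)
    assert seen == ["a", "b", "c", "d"]          # second chunk, 1s later
    clock.advance(1.0)
    assert seen == ["a", "b", "c", "d", "e"]     # last (partial) chunk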