# Files
# deb-python-txaio/test/test_batched_timers_tx.py
#
# 195 lines
# 6.0 KiB
# Python

###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Crossbar.io Technologies GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
import txaio
from txaio.testutil import replace_loop
# XXX it would be nice to unify the _tx versus _aio versions of all
# these tests ...
def test_batched_successful_call(framework_tx):
    '''
    calls scheduled on a batched timer fire bucket-by-bucket

    Three calls are scheduled on a timer built with
    ``make_batched_timer(5)`` and the replaced loop's clock is stepped
    forward; after each step the recorded invocations are checked.
    '''
    from twisted.internet.task import Clock

    clock = Clock()
    seen = []

    def record(*args, **kw):
        # remember exactly how we were invoked
        seen.append((args, kw))

    with replace_loop(clock):
        timer = txaio.make_batched_timer(5)

        # the first two delays are expected to share a bucket, the
        # third one lands in the following bucket
        schedule = (
            (5.1, "first call"),
            (9.9, "second call"),
            (10.1, "third call"),
        )
        for delay, label in schedule:
            timer.call_later(delay, record, label)

        # 4.9 seconds in: no bucket should have expired yet
        clock.advance(4.9)
        assert not seen

        # step just past the first bucket (the "5s -> 10s" one); both
        # of its calls should now have run, in scheduling order
        clock.advance(0.2)
        assert seen == [
            (("first call", ), {}),
            (("second call", ), {}),
        ]

        # move on into the next bucket; the last call runs as well
        clock.advance(5)
        assert seen == [
            (("first call", ), {}),
            (("second call", ), {}),
            (("third call", ), {}),
        ]
def test_batched_cancel(framework_tx):
    '''
    a call cancelled before its bucket expires is never invoked
    '''
    from twisted.internet.task import Clock

    fired = []
    clock = Clock()

    with replace_loop(clock):
        timer = txaio.make_batched_timer(1)
        handle = timer.call_later(
            2,
            lambda *args, **kw: fired.append((args, kw)),
            "a call",
        )

        # move the clock forward a little (not far enough for the
        # call to be due), then withdraw the call
        clock.advance(1.2)
        handle.cancel()

        # go well beyond the point where the call would have run had
        # it still been active; nothing may have been recorded
        clock.advance(4.0)
        assert fired == []
def test_batched_cancel_too_late(framework_tx):
    '''
    cancel() after the callback already ran is harmless

    The call is allowed to fire first; cancelling it afterwards, and
    advancing the clock again, must leave the invocation count at one.
    '''
    from twisted.internet.task import Clock

    fired = []

    def record(*args, **kw):
        fired.append((args, kw))

    clock = Clock()
    with replace_loop(clock):
        timer = txaio.make_batched_timer(1)
        handle = timer.call_later(2, record, "a call")

        # let the call fire
        clock.advance(2.1)
        assert len(fired) == 1

        # a late cancel neither raises nor changes what was recorded
        handle.cancel()
        assert len(fired) == 1

        # ...and the call is not delivered a second time later on
        clock.advance(1)
        assert len(fired) == 1
def test_batched_chunks(framework_tx):
    '''
    should yield to reactor every chunk

    Three calls are scheduled into one bucket of a batched timer with
    ``chunk_size=2``. Every ``callLater`` made on the replaced loop is
    recorded, so the test can check how many "real" delayed calls were
    created and with which delays.
    '''
    from twisted.internet.task import Clock
    laters = []

    class FakeClock(Clock):
        # Clock that records the arguments of every callLater() made on it.
        def callLater(self, *args, **kw):
            laters.append((args, kw))
            # hand the delayed-call object back to the caller, exactly
            # like the real Clock does, so code that wants to cancel or
            # inspect it keeps working through the fake
            return Clock.callLater(self, *args, **kw)

    new_loop = FakeClock()
    calls = []

    def foo(*args, **kw):
        calls.append((args, kw))

    with replace_loop(new_loop):
        batched = txaio.make_batched_timer(1, chunk_size=2)

        batched.call_later(2, foo, "call0")
        batched.call_later(2, foo, "call1")
        batched.call_later(2, foo, "call2")

        # we have 3 calls in one bucket, so there should be just a
        # single "real" delayed call outstanding
        assert len(laters) == 1
        # ...and this call-later should be 2 seconds from now
        assert laters[0][0][0] == 2

        # the chunk-size is 2, so the 3 calls cannot all be notified
        # in one go: a second call-later is added for the remainder.
        # These are spread out over the bucket-size, so the second
        # one is 0.5 seconds after the first. Advance to the bucket
        # time, then far enough for the follow-up chunk to run too.
        new_loop.advance(2)
        new_loop.advance(1)
        assert len(calls) == 3
        assert len(laters) == 2
        # second call-later half the interval in the future (i.e. 0.5s)
        assert laters[1][0][0] == 0.5
def test_batched_chunks_with_errors(framework_tx):
    '''
    errors from batched calls are reported

    Four calls share one bucket of a batched timer with
    ``chunk_size=2``; the last one raises. Advancing the clock past the
    bucket must surface a RuntimeError whose message mentions
    "processing call_later".
    '''
    from twisted.internet.task import Clock
    laters = []

    class FakeClock(Clock):
        # Clock that records the arguments of every callLater() made on it.
        def callLater(self, *args, **kw):
            laters.append((args, kw))
            # hand the delayed-call object back to the caller, exactly
            # like the real Clock does
            return Clock.callLater(self, *args, **kw)

    new_loop = FakeClock()
    calls = []

    def foo(*args, **kw):
        calls.append((args, kw))

    def error(*args, **kw):
        raise RuntimeError("sadness")

    with replace_loop(new_loop):
        batched = txaio.make_batched_timer(1, chunk_size=2)

        batched.call_later(2, foo, "call0")
        batched.call_later(2, foo, "call1")
        batched.call_later(2, foo, "call2")
        batched.call_later(2, error)

        # notify everything, causing an error from the second batch
        try:
            new_loop.advance(2)
            new_loop.advance(1)
        except RuntimeError as e:
            assert "processing call_later" in str(e)
        else:
            # raised explicitly (rather than via ``assert False``) so a
            # missing exception still fails the test when asserts are
            # compiled out
            raise AssertionError("Should get exception")