216 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			216 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| ###############################################################################
 | |
| #
 | |
| # The MIT License (MIT)
 | |
| #
 | |
| # Copyright (c) Crossbar.io Technologies GmbH
 | |
| #
 | |
| # Permission is hereby granted, free of charge, to any person obtaining a copy
 | |
| # of this software and associated documentation files (the "Software"), to deal
 | |
| # in the Software without restriction, including without limitation the rights
 | |
| # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 | |
| # copies of the Software, and to permit persons to whom the Software is
 | |
| # furnished to do so, subject to the following conditions:
 | |
| #
 | |
| # The above copyright notice and this permission notice shall be included in
 | |
| # all copies or substantial portions of the Software.
 | |
| #
 | |
| # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 | |
| # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 | |
| # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 | |
| # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 | |
| # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 | |
| # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 | |
| # THE SOFTWARE.
 | |
| #
 | |
| ###############################################################################
 | |
| 
 | |
| import txaio
 | |
| from txaio.testutil import replace_loop
 | |
| 
 | |
| 
 | |
| # XXX it would be nice to unify the _tx versus _aio versions of all
 | |
| # these tests ...
 | |
def test_batched_successful_call(framework_tx):
    '''
    calls scheduled through a batched timer fire bucket-by-bucket as
    the (fake) clock is advanced
    '''
    from twisted.internet.task import Clock

    clock = Clock()
    seen = []

    def record(*args, **kw):
        seen.append((args, kw))

    with replace_loop(clock):
        timer = txaio.make_batched_timer(5)

        # three calls: the first two are expected to share a bucket,
        # the third to land in the next one
        timer.call_later(5.1, record, "first call")
        timer.call_later(9.9, record, "second call")
        timer.call_later(10.1, record, "third call")

        # 4.9s in: no bucket should have expired yet
        clock.advance(4.9)
        assert not seen

        # cross the first bucket boundary (the "5s -> 10s" bucket);
        # exactly the first two calls should have run, in order
        clock.advance(0.2)
        assert seen == [
            (("first call",), {}),
            (("second call",), {}),
        ]

        # move on into the following bucket; the third call runs
        clock.advance(5)
        assert len(seen) == 3
        assert seen[2] == (("third call",), {})
 | |
| 
 | |
| 
 | |
def test_batched_cancel(framework_tx):
    '''
    a call that is cancelled before its time comes is never invoked
    '''
    from twisted.internet.task import Clock

    fired = []

    def record(*args, **kw):
        fired.append((args, kw))

    clock = Clock()

    with replace_loop(clock):
        timer = txaio.make_batched_timer(1)
        pending = timer.call_later(2, record, "a call")

        # 1.2s in: short of the 2s delay, nothing should have fired
        clock.advance(1.2)

        pending.cancel()

        # go well beyond the point where the call would have run had
        # it still been active
        clock.advance(4.0)
        assert not fired
 | |
| 
 | |
| 
 | |
def test_batched_cancel_too_late(framework_tx):
    '''
    cancel() on a call that has already run is harmless: no error,
    and the callback is not invoked a second time
    '''
    from twisted.internet.task import Clock

    fired = []

    def record(*args, **kw):
        fired.append((args, kw))

    clock = Clock()

    with replace_loop(clock):
        timer = txaio.make_batched_timer(1)
        handle = timer.call_later(2, record, "a call")

        # go past the 2s delay so the callback runs once
        clock.advance(2.1)
        assert len(fired) == 1

        # cancelling now is too late; it must not change anything
        handle.cancel()
        assert len(fired) == 1

        # ...and advancing further must not re-run the callback
        clock.advance(1)
        assert len(fired) == 1
 | |
| 
 | |
| 
 | |
def test_batched_chunks(framework_tx):
    '''
    should yield to reactor every chunk

    Three calls share one bucket with chunk_size=2, so the timer is
    expected to notify two of them, schedule a follow-up delayed call
    and notify the remaining one from that follow-up.
    '''

    from twisted.internet.task import Clock
    laters = []

    class FakeClock(Clock):
        def callLater(self, *args, **kw):  # noqa
            # record every "real" delayed call requested of the
            # reactor, and hand the resulting delayed-call object back
            # to the caller as Clock.callLater itself does
            laters.append((args, kw))
            return Clock.callLater(self, *args, **kw)
    new_loop = FakeClock()
    calls = []

    def foo(*args, **kw):
        calls.append((args, kw))

    with replace_loop(new_loop):
        batched = txaio.make_batched_timer(1, chunk_size=2)
        batched.call_later(2, foo, "call0")
        batched.call_later(2, foo, "call1")
        batched.call_later(2, foo, "call2")

        # we have 3 calls in one bucket, so there should be just a
        # single "real" delayed call outstanding
        assert len(laters) == 1
        # ...and this call-later should be 2 seconds from now
        assert laters[0][0][0] == 2

        # the chunk-size is 2, so after advancing to 2 seconds from
        # now, we should have notified 2 of the callers and added
        # another call-later. We're spreading these out over the
        # bucket-size, so it should be at 0.5 seconds from now.
        new_loop.advance(2)
        assert len(calls) == 2
        assert len(laters) == 2

        # advancing past the follow-up call-later notifies the last
        # caller; no further call-later is needed after that
        new_loop.advance(1)
        assert len(calls) == 3
        assert len(laters) == 2
        # second call-later half the interval in the future (i.e. 0.5s)
        assert laters[1][0][0] == 0.5
 | |
| 
 | |
| 
 | |
def test_batched_chunks_with_errors(framework_tx):
    '''
    errors from batched calls are reported

    One of four calls in the same bucket raises; advancing the clock
    through the bucket must surface a RuntimeError whose message
    mentions "processing call_later".
    '''

    from twisted.internet.task import Clock
    laters = []

    class FakeClock(Clock):
        def callLater(self, *args, **kw):  # noqa
            # record every "real" delayed call requested of the
            # reactor, and hand the resulting delayed-call object back
            # to the caller as Clock.callLater itself does
            laters.append((args, kw))
            return Clock.callLater(self, *args, **kw)
    new_loop = FakeClock()
    calls = []

    def foo(*args, **kw):
        calls.append((args, kw))

    def error(*args, **kw):
        raise RuntimeError("sadness")

    with replace_loop(new_loop):
        batched = txaio.make_batched_timer(1, chunk_size=2)
        batched.call_later(2, foo, "call0")
        batched.call_later(2, foo, "call1")
        batched.call_later(2, foo, "call2")
        batched.call_later(2, error)

        # notify everything, causing an error from the second batch
        try:
            new_loop.advance(2)
            new_loop.advance(1)
        except RuntimeError as e:
            assert "processing call_later" in str(e)
        else:
            # raised explicitly (rather than "assert False") so that
            # the failure is still reported when asserts are stripped
            raise AssertionError("Should get exception")
 | |
| 
 | |
| 
 | |
def test_batched_close_to_now(framework_tx):
    '''
    regression test for issue #81: when the current time is fractional
    and call_later is given a tiny delay that still falls within the
    same second, adding the bucket would produce a negative delay for
    the underlying call-later
    '''
    from twisted.internet.task import Clock

    class FakeClock(Clock):
        def callLater(self, delay, *args, **kw):  # noqa
            # Clock does not assert on this itself, although 'real'
            # reactors do -- so enforce it here.
            assert delay >= 0
            return Clock.callLater(self, delay, *args, **kw)

    def do_nothing():
        return None

    with replace_loop(FakeClock()) as clock:
        # start from a fractional "now"
        clock.advance(0.5)
        timer = txaio.make_batched_timer(1, chunk_size=2)
        # 0.5 + 0.1 stays inside the current second
        timer.call_later(0.1, do_nothing)
 | 
