Merge pull request #654 from meejah/issue642-batched-timers

batched timers
This commit is contained in:
Tobias Oberstein 2016-04-29 07:23:21 +02:00
commit 99f723f64b
5 changed files with 59 additions and 43 deletions

View File

@ -28,6 +28,7 @@ from __future__ import absolute_import, print_function
import os
import unittest2 as unittest
from txaio.testutil import replace_loop
if os.environ.get('USE_TWISTED', False):
from mock import patch
@ -121,14 +122,14 @@ else:
context object that is passed (as ssl) to the __init__ method of
ApplicationRunner.
'''
loop = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
ssl = {}
runner = ApplicationRunner(u'ws://127.0.0.1:8080/ws', u'realm',
ssl=ssl)
runner.run('_unused_')
self.assertIs(ssl, loop.create_connection.call_args[1]['ssl'])
with replace_loop(Mock()) as loop:
with patch.object(asyncio, 'get_event_loop', return_value=loop):
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
ssl = {}
runner = ApplicationRunner(u'ws://127.0.0.1:8080/ws', u'realm',
ssl=ssl)
runner.run('_unused_')
self.assertIs(ssl, loop.create_connection.call_args[1]['ssl'])
def test_omitted_SSLContext_insecure(self):
'''
@ -136,12 +137,12 @@ else:
if no ssl argument is passed to the __init__ method of
ApplicationRunner and the websocket URL starts with "ws:".
'''
loop = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
runner = ApplicationRunner(u'ws://127.0.0.1:8080/ws', u'realm')
runner.run('_unused_')
self.assertIs(False, loop.create_connection.call_args[1]['ssl'])
with replace_loop(Mock()) as loop:
with patch.object(asyncio, 'get_event_loop', return_value=loop):
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
runner = ApplicationRunner(u'ws://127.0.0.1:8080/ws', u'realm')
runner.run('_unused_')
self.assertIs(False, loop.create_connection.call_args[1]['ssl'])
def test_omitted_SSLContext_secure(self):
'''
@ -149,21 +150,20 @@ else:
if no ssl argument is passed to the __init__ method of
ApplicationRunner and the websocket URL starts with "wss:".
'''
loop = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
runner = ApplicationRunner(u'wss://127.0.0.1:8080/wss', u'realm')
runner.run('_unused_')
self.assertIs(True, loop.create_connection.call_args[1]['ssl'])
with replace_loop(Mock()) as loop:
with patch.object(asyncio, 'get_event_loop', return_value=loop):
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
runner = ApplicationRunner(u'wss://127.0.0.1:8080/wss', u'realm')
runner.run(self.fail)
self.assertIs(True, loop.create_connection.call_args[1]['ssl'])
def test_conflict_SSL_True_with_ws_url(self):
'''
ApplicationRunner must raise an exception if given an ssl value of True
but only a "ws:" URL.
'''
loop = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
with replace_loop(Mock()) as loop:
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
runner = ApplicationRunner(u'ws://127.0.0.1:8080/wss', u'realm',
ssl=True)
error = ('^ssl argument value passed to ApplicationRunner '
@ -194,9 +194,8 @@ else:
else:
context = ssl.create_default_context()
loop = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
with replace_loop(Mock()) as loop:
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
runner = ApplicationRunner(u'ws://127.0.0.1:8080/wss', u'realm',
ssl=context)
error = ('^ssl argument value passed to ApplicationRunner '

View File

@ -469,6 +469,14 @@ class WebSocketProtocol(object):
"""
log = txaio.make_logger()
ping_timer = txaio.make_batched_timer(
bucket_seconds=0.200,
chunk_size=1000,
)
handshake_timer = txaio.make_batched_timer(
bucket_seconds=0.200,
chunk_size=1000,
)
def __init__(self):
#: a Future/Deferred that fires when we hit STATE_CLOSED
@ -657,11 +665,10 @@ class WebSocketProtocol(object):
# When we are a client, the server should drop the TCP
# If that doesn't happen, we do. And that will set wasClean = False.
if self.serverConnectionDropTimeout > 0:
call = txaio.call_later(
self.serverConnectionDropTimeoutCall = txaio.call_later(
self.serverConnectionDropTimeout,
self.onServerConnectionDropTimeout,
)
self.serverConnectionDropTimeoutCall = call
elif self.state == WebSocketProtocol.STATE_OPEN:
# The peer initiates a closing handshake, so we reply
@ -969,14 +976,17 @@ class WebSocketProtocol(object):
self.openHandshakeTimeoutCall = None
self.closeHandshakeTimeoutCall = None
# set opening handshake timeout handler
if self.openHandshakeTimeout > 0:
self.openHandshakeTimeoutCall = txaio.call_later(self.openHandshakeTimeout, self.onOpenHandshakeTimeout)
self.autoPingTimeoutCall = None
self.autoPingPending = None
self.autoPingPendingCall = None
# set opening handshake timeout handler
if self.openHandshakeTimeout > 0:
self.openHandshakeTimeoutCall = self.handshake_timer.call_later(
self.openHandshakeTimeout,
self.onOpenHandshakeTimeout,
)
def _connectionLost(self, reason):
"""
This is called by network framework when a transport connection was
@ -1643,7 +1653,10 @@ class WebSocketProtocol(object):
self.autoPingTimeoutCall = None
if self.autoPingInterval:
self.autoPingPendingCall = txaio.call_later(self.autoPingInterval, self._sendAutoPing)
self.autoPingPendingCall = self.ping_timer.call_later(
self.autoPingInterval,
self._sendAutoPing,
)
else:
self.log.debug("Auto ping/pong: received non-pending pong")
except:
@ -1782,7 +1795,10 @@ class WebSocketProtocol(object):
"Expecting ping in {seconds} seconds for auto-ping/pong",
seconds=self.autoPingTimeout,
)
self.autoPingTimeoutCall = txaio.call_later(self.autoPingTimeout, self.onAutoPingTimeout)
self.autoPingTimeoutCall = self.ping_timer.call_later(
self.autoPingTimeout,
self.onAutoPingTimeout,
)
def sendPong(self, payload=None):
"""
@ -2820,7 +2836,10 @@ class WebSocketServerProtocol(WebSocketProtocol):
# automatic ping/pong
#
if self.autoPingInterval:
self.autoPingPendingCall = txaio.call_later(self.autoPingInterval, self._sendAutoPing)
self.autoPingPendingCall = self.ping_timer.call_later(
self.autoPingInterval,
self._sendAutoPing,
)
# fire handler on derived class
#
@ -3627,7 +3646,10 @@ class WebSocketClientProtocol(WebSocketProtocol):
# automatic ping/pong
#
if self.autoPingInterval:
self.autoPingPendingCall = txaio.call_later(self.autoPingInterval, self._sendAutoPing)
self.autoPingPendingCall = self.ping_timer.call_later(
self.autoPingInterval,
self._sendAutoPing,
)
# we handle this symmetrical to server-side .. that is, give the
# client a chance to bail out .. i.e. on no subprotocol selected

View File

@ -124,6 +124,7 @@ class WebSocketServerProtocolTests(unittest.TestCase):
def test_auto_ping(self):
proto = Mock()
proto._get_seconds = Mock(return_value=1)
self.protocol.autoPingInterval = 1
self.protocol.websocket_protocols = [proto]
self.protocol.websocket_extensions = []

View File

@ -213,8 +213,6 @@ if os.environ.get('USE_TWISTED', False):
# we should have scheduled an autoPing
self.assertEqual(1, len(reactor.calls))
self.assertEqual(self.proto._sendAutoPing, reactor.calls[0].func)
# ^^ un-unit-testy to assert on internal method?
# advance past first auto-ping timeout
reactor.advance(5)
@ -239,7 +237,6 @@ if os.environ.get('USE_TWISTED', False):
# Because we have autoPingTimeout there should be
# another delayed-called created now
self.assertEqual(1, len(reactor.calls))
self.assertEqual(self.proto.onAutoPingTimeout, reactor.calls[0].func)
self.assertNotEqual(self.proto.state, self.proto.STATE_CLOSED)
# ...which we'll now cause to trigger, aborting the connection
@ -262,8 +259,6 @@ if os.environ.get('USE_TWISTED', False):
# we should have scheduled an autoPing
self.assertEqual(1, len(reactor.calls))
self.assertEqual(self.proto._sendAutoPing, reactor.calls[0].func)
# ^^ un-unit-testy to assert on internal method?
# advance past first auto-ping timeout
reactor.advance(5)
@ -271,7 +266,6 @@ if os.environ.get('USE_TWISTED', False):
# should have an auto-ping timeout scheduled, and we
# save it for later (to check it got cancelled)
self.assertEqual(1, len(reactor.calls))
self.assertEqual(self.proto.onAutoPingTimeout, reactor.calls[0].func)
timeout_call = reactor.calls[0]
# elsewhere we check that we actually send an opcode-9

View File

@ -171,7 +171,7 @@ setup(
platforms='Any',
install_requires=[
'six>=1.10.0', # MIT license
'txaio>=2.3.1', # MIT license
'txaio>=2.5.1', # MIT license
],
extras_require={
'all': extras_require_all,