From c54d9c35a8fc6ac92709d13798bf92b66d78f1d9 Mon Sep 17 00:00:00 2001 From: meejah Date: Wed, 20 May 2015 15:57:33 -0600 Subject: [PATCH] ApplicationRunner support to wait until Goodbye has gone out We add asyncio and Twisted ApplicationRunner support to properly wait until the Goodbye message has gone out. This makes the leave() method return a Deferred that can be waited on, if you want, for the underlying session to actually hit STATE_CLOSED --- autobahn/asyncio/wamp.py | 12 ++++++++++-- autobahn/twisted/test/test_application_runner.py | 4 ++-- autobahn/twisted/wamp.py | 10 ++++++++++ autobahn/wamp/interfaces.py | 2 ++ autobahn/wamp/protocol.py | 2 ++ autobahn/wamp/test/test_runner.py | 10 +++++----- autobahn/websocket/protocol.py | 16 +++++++++++++++- 7 files changed, 46 insertions(+), 10 deletions(-) diff --git a/autobahn/asyncio/wamp.py b/autobahn/asyncio/wamp.py index 87693af1..09e10efa 100644 --- a/autobahn/asyncio/wamp.py +++ b/autobahn/asyncio/wamp.py @@ -25,6 +25,7 @@ ############################################################################### from __future__ import absolute_import +import signal from autobahn.wamp import protocol from autobahn.wamp.types import ComponentConfig @@ -158,8 +159,15 @@ class ApplicationRunner(object): txaio.use_asyncio() txaio.config.loop = loop coro = loop.create_connection(transport_factory, host, port, ssl=ssl) - loop.run_until_complete(coro) + (transport, protocol) = loop.run_until_complete(coro) + loop.add_signal_handler(signal.SIGTERM, loop.stop) # 4) now enter the asyncio event loop - loop.run_forever() + try: + loop.run_forever() + except KeyboardInterrupt: + # wait until we send Goodbye if user hit ctrl-c + # (done outside this except so SIGTERM gets the same handling) + pass + loop.run_until_complete(protocol._session.leave()) loop.close() diff --git a/autobahn/twisted/test/test_application_runner.py b/autobahn/twisted/test/test_application_runner.py index e5393765..de75a5e0 100644 --- a/autobahn/twisted/test/test_application_runner.py +++ b/autobahn/twisted/test/test_application_runner.py @@ -88,9 +88,9 @@ if os.environ.get('USE_TWISTED', False): # shouldn't have actually connected to anything # successfully, and the run() call shouldn't have inserted - # any of its own call/errbacks. + # any of its own call/errbacks. (except the cleanup handler) self.assertFalse(d.called) - self.assertEqual(0, len(d.callbacks)) + self.assertEqual(1, len(d.callbacks)) # neither reactor.run() NOR reactor.stop() should have been called # (just connectTCP() will have been called) diff --git a/autobahn/twisted/wamp.py b/autobahn/twisted/wamp.py index 45969e98..f741baf9 100644 --- a/autobahn/twisted/wamp.py +++ b/autobahn/twisted/wamp.py @@ -207,6 +207,16 @@ class ApplicationRunner(object): d = client.connect(transport_factory) + # as the reactor shuts down, we wish to wait until we've sent + # out our "Goodbye" message; leave() returns a Deferred that + # fires when the transport gets to STATE_CLOSED + def cleanup(proto): + if hasattr(proto, '_session') and proto._session is not None: + return proto._session.leave() + # if we connect successfully, the arg is a WampWebSocketClientProtocol + d.addCallback(lambda proto: reactor.addSystemEventTrigger( + 'before', 'shutdown', cleanup, proto)) + # if the user didn't ask us to start the reactor, then they # get to deal with any connect errors themselves. if start_reactor: diff --git a/autobahn/wamp/interfaces.py b/autobahn/wamp/interfaces.py index ccfceadf..bef343c2 100644 --- a/autobahn/wamp/interfaces.py +++ b/autobahn/wamp/interfaces.py @@ -303,6 +303,8 @@ class ISession(object): :param message: An optional (human readable) closing message, intended for logging purposes. :type message: str + + :return: may return a Future/Deferred that fires when we've disconnected """ @abc.abstractmethod diff --git a/autobahn/wamp/protocol.py b/autobahn/wamp/protocol.py index 5a97d2ec..0b9c0f03 100644 --- a/autobahn/wamp/protocol.py +++ b/autobahn/wamp/protocol.py @@ -978,6 +978,8 @@ class ApplicationSession(BaseSession): msg = wamp.message.Goodbye(reason=reason, message=log_message) self._transport.send(msg) self._goodbye_sent = True + # deferred that fires when transport actually hits CLOSED + return self._transport.is_closed else: raise SessionNotReady(u"Already requested to close the session") diff --git a/autobahn/wamp/test/test_runner.py b/autobahn/wamp/test/test_runner.py index c9a4980b..adf2ba96 100644 --- a/autobahn/wamp/test/test_runner.py +++ b/autobahn/wamp/test/test_runner.py @@ -117,7 +117,7 @@ else: ApplicationRunner. ''' loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): ssl = {} runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm', @@ -132,7 +132,7 @@ else: ApplicationRunner and the websocket URL starts with "ws:". ''' loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm') runner.run('_unused_') @@ -145,7 +145,7 @@ else: ApplicationRunner and the websocket URL starts with "wss:". ''' loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): runner = ApplicationRunner('wss://127.0.0.1:8080/wss', 'realm') runner.run('_unused_') @@ -157,7 +157,7 @@ else: but only a "ws:" URL. ''' loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): runner = ApplicationRunner('ws://127.0.0.1:8080/wss', 'realm', ssl=True) @@ -190,7 +190,7 @@ else: context = ssl.create_default_context() loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): runner = ApplicationRunner('ws://127.0.0.1:8080/wss', 'realm', ssl=context) diff --git a/autobahn/websocket/protocol.py b/autobahn/websocket/protocol.py index 327c3924..27c469a9 100755 --- a/autobahn/websocket/protocol.py +++ b/autobahn/websocket/protocol.py @@ -660,6 +660,10 @@ class WebSocketProtocol(object): Configuration attributes specific to clients. """ + def __init__(self): + #: a Future/Deferred that fires when we hit STATE_CLOSED + self.is_closed = txaio.create_future() + def onOpen(self): """ Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onOpen` @@ -967,7 +971,12 @@ class WebSocketProtocol(object): if self.debugCodePaths: self.factory._log("dropping connection") self.droppedByMe = True + + # this code-path will be hit (*without* hitting + # _connectionLost) in some timeout scenarios (unit-tests + # cover these). However, sometimes we hit both. self.state = WebSocketProtocol.STATE_CLOSED + txaio.resolve(self.is_closed, self) self._closeConnection(abort) else: @@ -1218,7 +1227,12 @@ class WebSocketProtocol(object): self.autoPingTimeoutCall.cancel() self.autoPingTimeoutCall = None - self.state = WebSocketProtocol.STATE_CLOSED + # check required here because in some scenarios dropConnection + # will already have resolved the Future/Deferred. + if self.state != WebSocketProtocol.STATE_CLOSED: + self.state = WebSocketProtocol.STATE_CLOSED + txaio.resolve(self.is_closed, self) + if self.wasServingFlashSocketPolicyFile: if self.debug: self.factory._log("connection dropped after serving Flash Socket Policy File")