diff --git a/autobahn/asyncio/wamp.py b/autobahn/asyncio/wamp.py index 87693af1..09e10efa 100644 --- a/autobahn/asyncio/wamp.py +++ b/autobahn/asyncio/wamp.py @@ -25,6 +25,7 @@ ############################################################################### from __future__ import absolute_import +import signal from autobahn.wamp import protocol from autobahn.wamp.types import ComponentConfig @@ -158,8 +159,15 @@ class ApplicationRunner(object): txaio.use_asyncio() txaio.config.loop = loop coro = loop.create_connection(transport_factory, host, port, ssl=ssl) - loop.run_until_complete(coro) + (transport, protocol) = loop.run_until_complete(coro) + loop.add_signal_handler(signal.SIGTERM, loop.stop) # 4) now enter the asyncio event loop - loop.run_forever() + try: + loop.run_forever() + except KeyboardInterrupt: + # wait until we send Goodbye if user hit ctrl-c + # (done outside this except so SIGTERM gets the same handling) + pass + loop.run_until_complete(protocol._session.leave()) loop.close() diff --git a/autobahn/twisted/test/test_application_runner.py b/autobahn/twisted/test/test_application_runner.py index e5393765..de75a5e0 100644 --- a/autobahn/twisted/test/test_application_runner.py +++ b/autobahn/twisted/test/test_application_runner.py @@ -88,9 +88,9 @@ if os.environ.get('USE_TWISTED', False): # shouldn't have actually connected to anything # successfully, and the run() call shouldn't have inserted - # any of its own call/errbacks. + # any of its own call/errbacks. (except the cleanup handler) self.assertFalse(d.called) - self.assertEqual(0, len(d.callbacks)) + self.assertEqual(1, len(d.callbacks)) # neither reactor.run() NOR reactor.stop() should have been called # (just connectTCP() will have been called) diff --git a/autobahn/twisted/wamp.py b/autobahn/twisted/wamp.py index 2bfc04d7..7a242328 100644 --- a/autobahn/twisted/wamp.py +++ b/autobahn/twisted/wamp.py @@ -208,6 +208,16 @@ class ApplicationRunner(object): d = client.connect(transport_factory) + # as the reactor shuts down, we wish to wait until we've sent + # out our "Goodbye" message; leave() returns a Deferred that + # fires when the transport gets to STATE_CLOSED + def cleanup(proto): + if hasattr(proto, '_session') and proto._session is not None: + return proto._session.leave() + # if we connect successfully, the arg is a WampWebSocketClientProtocol + d.addCallback(lambda proto: reactor.addSystemEventTrigger( + 'before', 'shutdown', cleanup, proto)) + # if the user didn't ask us to start the reactor, then they # get to deal with any connect errors themselves. if start_reactor: diff --git a/autobahn/wamp/interfaces.py b/autobahn/wamp/interfaces.py index ccfceadf..bef343c2 100644 --- a/autobahn/wamp/interfaces.py +++ b/autobahn/wamp/interfaces.py @@ -303,6 +303,8 @@ class ISession(object): :param message: An optional (human readable) closing message, intended for logging purposes. :type message: str + + :return: may return a Future/Deferred that fires when we've disconnected """ @abc.abstractmethod diff --git a/autobahn/wamp/protocol.py b/autobahn/wamp/protocol.py index 5a97d2ec..0b9c0f03 100644 --- a/autobahn/wamp/protocol.py +++ b/autobahn/wamp/protocol.py @@ -978,6 +978,8 @@ class ApplicationSession(BaseSession): msg = wamp.message.Goodbye(reason=reason, message=log_message) self._transport.send(msg) self._goodbye_sent = True + # deferred that fires when transport actually hits CLOSED + return self._transport.is_closed else: raise SessionNotReady(u"Already requested to close the session") diff --git a/autobahn/wamp/test/test_runner.py b/autobahn/wamp/test/test_runner.py index c9a4980b..adf2ba96 100644 --- a/autobahn/wamp/test/test_runner.py +++ b/autobahn/wamp/test/test_runner.py @@ -117,7 +117,7 @@ else: ApplicationRunner. ''' loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): ssl = {} runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm', @@ -132,7 +132,7 @@ else: ApplicationRunner and the websocket URL starts with "ws:". ''' loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm') runner.run('_unused_') @@ -145,7 +145,7 @@ else: ApplicationRunner and the websocket URL starts with "wss:". ''' loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): runner = ApplicationRunner('wss://127.0.0.1:8080/wss', 'realm') runner.run('_unused_') @@ -157,7 +157,7 @@ else: but only a "ws:" URL. ''' loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): runner = ApplicationRunner('ws://127.0.0.1:8080/wss', 'realm', ssl=True) @@ -190,7 +190,7 @@ else: context = ssl.create_default_context() loop = Mock() - loop.create_connection = Mock() + loop.run_until_complete = Mock(return_value=(Mock(), Mock())) with patch.object(asyncio, 'get_event_loop', return_value=loop): runner = ApplicationRunner('ws://127.0.0.1:8080/wss', 'realm', ssl=context) diff --git a/autobahn/wamp/websocket.py b/autobahn/wamp/websocket.py index ed069d6e..39fefd5b 100644 --- a/autobahn/wamp/websocket.py +++ b/autobahn/wamp/websocket.py @@ -24,7 +24,7 @@ # ############################################################################### -from __future__ import absolute_import +from __future__ import absolute_import, print_function import traceback @@ -79,9 +79,8 @@ class WampWebSocketProtocol(object): print("WAMP-over-WebSocket transport lost: wasClean = {0}, code = {1}, reason = '{2}'".format(wasClean, code, reason)) self._session.onClose(wasClean) except Exception: - # silently ignore exceptions raised here .. - if self.factory.debug_wamp: - traceback.print_exc() + print("Error invoking onClose():") + traceback.print_exc() self._session = None def onMessage(self, payload, isBinary): diff --git a/autobahn/websocket/protocol.py b/autobahn/websocket/protocol.py index 327c3924..27c469a9 100755 --- a/autobahn/websocket/protocol.py +++ b/autobahn/websocket/protocol.py @@ -660,6 +660,10 @@ class WebSocketProtocol(object): Configuration attributes specific to clients. """ + def __init__(self): + #: a Future/Deferred that fires when we hit STATE_CLOSED + self.is_closed = txaio.create_future() + def onOpen(self): """ Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onOpen` @@ -967,7 +971,12 @@ class WebSocketProtocol(object): if self.debugCodePaths: self.factory._log("dropping connection") self.droppedByMe = True + + # this code-path will be hit (*without* hitting + # _connectionLost) in some timeout scenarios (unit-tests + # cover these). However, sometimes we hit both. self.state = WebSocketProtocol.STATE_CLOSED + txaio.resolve(self.is_closed, self) self._closeConnection(abort) else: @@ -1218,7 +1227,12 @@ class WebSocketProtocol(object): self.autoPingTimeoutCall.cancel() self.autoPingTimeoutCall = None - self.state = WebSocketProtocol.STATE_CLOSED + # check required here because in some scenarios dropConnection + # will already have resolved the Future/Deferred. + if self.state != WebSocketProtocol.STATE_CLOSED: + self.state = WebSocketProtocol.STATE_CLOSED + txaio.resolve(self.is_closed, self) + if self.wasServingFlashSocketPolicyFile: if self.debug: self.factory._log("connection dropped after serving Flash Socket Policy File")