Merge pull request #403 from meejah/shutdown-with-close-message-squash

Shutdown with Goodbye
This commit is contained in:
Tobias Oberstein
2015-05-25 18:26:46 +02:00
8 changed files with 49 additions and 14 deletions

View File

@@ -25,6 +25,7 @@
###############################################################################
from __future__ import absolute_import
import signal
from autobahn.wamp import protocol
from autobahn.wamp.types import ComponentConfig
@@ -158,8 +159,15 @@ class ApplicationRunner(object):
txaio.use_asyncio()
txaio.config.loop = loop
coro = loop.create_connection(transport_factory, host, port, ssl=ssl)
loop.run_until_complete(coro)
(transport, protocol) = loop.run_until_complete(coro)
loop.add_signal_handler(signal.SIGTERM, loop.stop)
# 4) now enter the asyncio event loop
loop.run_forever()
try:
loop.run_forever()
except KeyboardInterrupt:
# wait until we send Goodbye if user hit ctrl-c
# (done outside this except so SIGTERM gets the same handling)
pass
loop.run_until_complete(protocol._session.leave())
loop.close()

View File

@@ -88,9 +88,9 @@ if os.environ.get('USE_TWISTED', False):
# shouldn't have actually connected to anything
# successfully, and the run() call shouldn't have inserted
# any of its own call/errbacks.
# any of its own call/errbacks. (except the cleanup handler)
self.assertFalse(d.called)
self.assertEqual(0, len(d.callbacks))
self.assertEqual(1, len(d.callbacks))
# neither reactor.run() NOR reactor.stop() should have been called
# (just connectTCP() will have been called)

View File

@@ -208,6 +208,16 @@ class ApplicationRunner(object):
d = client.connect(transport_factory)
# as the reactor shuts down, we wish to wait until we've sent
# out our "Goodbye" message; leave() returns a Deferred that
# fires when the transport gets to STATE_CLOSED
def cleanup(proto):
if hasattr(proto, '_session') and proto._session is not None:
return proto._session.leave()
# if we connect successfully, the arg is a WampWebSocketClientProtocol
d.addCallback(lambda proto: reactor.addSystemEventTrigger(
'before', 'shutdown', cleanup, proto))
# if the user didn't ask us to start the reactor, then they
# get to deal with any connect errors themselves.
if start_reactor:

View File

@@ -303,6 +303,8 @@ class ISession(object):
:param message: An optional (human readable) closing message, intended for
logging purposes.
:type message: str
:return: may return a Future/Deferred that fires when we've disconnected
"""
@abc.abstractmethod

View File

@@ -978,6 +978,8 @@ class ApplicationSession(BaseSession):
msg = wamp.message.Goodbye(reason=reason, message=log_message)
self._transport.send(msg)
self._goodbye_sent = True
# deferred that fires when transport actually hits CLOSED
return self._transport.is_closed
else:
raise SessionNotReady(u"Already requested to close the session")

View File

@@ -117,7 +117,7 @@ else:
ApplicationRunner.
'''
loop = Mock()
loop.create_connection = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
ssl = {}
runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm',
@@ -132,7 +132,7 @@ else:
ApplicationRunner and the websocket URL starts with "ws:".
'''
loop = Mock()
loop.create_connection = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
runner = ApplicationRunner('ws://127.0.0.1:8080/ws', 'realm')
runner.run('_unused_')
@@ -145,7 +145,7 @@ else:
ApplicationRunner and the websocket URL starts with "wss:".
'''
loop = Mock()
loop.create_connection = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
runner = ApplicationRunner('wss://127.0.0.1:8080/wss', 'realm')
runner.run('_unused_')
@@ -157,7 +157,7 @@ else:
but only a "ws:" URL.
'''
loop = Mock()
loop.create_connection = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
runner = ApplicationRunner('ws://127.0.0.1:8080/wss', 'realm',
ssl=True)
@@ -190,7 +190,7 @@ else:
context = ssl.create_default_context()
loop = Mock()
loop.create_connection = Mock()
loop.run_until_complete = Mock(return_value=(Mock(), Mock()))
with patch.object(asyncio, 'get_event_loop', return_value=loop):
runner = ApplicationRunner('ws://127.0.0.1:8080/wss', 'realm',
ssl=context)

View File

@@ -24,7 +24,7 @@
#
###############################################################################
from __future__ import absolute_import
from __future__ import absolute_import, print_function
import traceback
@@ -79,9 +79,8 @@ class WampWebSocketProtocol(object):
print("WAMP-over-WebSocket transport lost: wasClean = {0}, code = {1}, reason = '{2}'".format(wasClean, code, reason))
self._session.onClose(wasClean)
except Exception:
# silently ignore exceptions raised here ..
if self.factory.debug_wamp:
traceback.print_exc()
print("Error invoking onClose():")
traceback.print_exc()
self._session = None
def onMessage(self, payload, isBinary):

View File

@@ -660,6 +660,10 @@ class WebSocketProtocol(object):
Configuration attributes specific to clients.
"""
def __init__(self):
#: a Future/Deferred that fires when we hit STATE_CLOSED
self.is_closed = txaio.create_future()
def onOpen(self):
"""
Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onOpen`
@@ -967,7 +971,12 @@ class WebSocketProtocol(object):
if self.debugCodePaths:
self.factory._log("dropping connection")
self.droppedByMe = True
# this code-path will be hit (*without* hitting
# _connectionLost) in some timeout scenarios (unit-tests
# cover these). However, sometimes we hit both.
self.state = WebSocketProtocol.STATE_CLOSED
txaio.resolve(self.is_closed, self)
self._closeConnection(abort)
else:
@@ -1218,7 +1227,12 @@ class WebSocketProtocol(object):
self.autoPingTimeoutCall.cancel()
self.autoPingTimeoutCall = None
self.state = WebSocketProtocol.STATE_CLOSED
# check required here because in some scenarios dropConnection
# will already have resolved the Future/Deferred.
if self.state != WebSocketProtocol.STATE_CLOSED:
self.state = WebSocketProtocol.STATE_CLOSED
txaio.resolve(self.is_closed, self)
if self.wasServingFlashSocketPolicyFile:
if self.debug:
self.factory._log("connection dropped after serving Flash Socket Policy File")