diff --git a/autobahn/wamp/protocol.py b/autobahn/wamp/protocol.py index 1fc12bb3..8ab43b45 100644 --- a/autobahn/wamp/protocol.py +++ b/autobahn/wamp/protocol.py @@ -39,7 +39,7 @@ from autobahn.wamp import role from autobahn.wamp import exception from autobahn.wamp.exception import ApplicationError, ProtocolError, SessionNotReady, SerializationError from autobahn.wamp.interfaces import ISession # noqa -from autobahn.wamp.types import SessionDetails +from autobahn.wamp.types import SessionDetails, CloseDetails from autobahn.wamp.cryptobox import EncryptedPayload from autobahn.wamp.request import \ Publication, \ @@ -1104,17 +1104,38 @@ class ApplicationSession(BaseSession): Implements :func:`autobahn.wamp.interfaces.ISession.onJoin` """ + def _errback_outstanding_requests(self, exc): + """ + Errback any still outstanding requests with exc. + """ + for requests in [self._publish_reqs, + self._subscribe_reqs, + self._unsubscribe_reqs, + self._call_reqs, + self._register_reqs, + self._unregister_reqs]: + for request in requests.values(): + self.log.info('cleaning up outstanding {request_type} request {request_id}, firing errback on user handler {request_on_reply}', + request_on_reply=request.on_reply, + request_id=request.request_id, + request_type=request.__class__.__name__) + txaio.reject(request.on_reply, exc) + requests.clear() + @public def onLeave(self, details): """ Implements :func:`autobahn.wamp.interfaces.ISession.onLeave` """ - if details.reason.startswith('wamp.error.'): - self.log.error('{reason}: {wamp_message}', reason=details.reason, wamp_message=details.message) + if details.reason != CloseDetails.REASON_DEFAULT: + self.log.warn('session closed with reason {reason} [{message}]', reason=details.reason, message=details.message) + + # fire ApplicationError on any currently outstanding requests + exc = ApplicationError(details.reason, details.message) + self._errback_outstanding_requests(exc) if self._transport: self.disconnect() - # do we ever call onLeave with a valid transport? @public def leave(self, reason=None, message=None): @@ -1141,7 +1162,11 @@ class ApplicationSession(BaseSession): """ Implements :func:`autobahn.wamp.interfaces.ISession.onDisconnect` """ - pass + # fire TransportLost on any _still_ outstanding requests + # (these should have been already cleaned up in onLeave() - when + # this actually has fired) + exc = exception.TransportLost() + self._errback_outstanding_requests(exc) @public def publish(self, topic, *args, **kwargs):