errback still outstanding requests when session is closed - fixes #800
This commit is contained in:
@@ -39,7 +39,7 @@ from autobahn.wamp import role
|
|||||||
from autobahn.wamp import exception
|
from autobahn.wamp import exception
|
||||||
from autobahn.wamp.exception import ApplicationError, ProtocolError, SessionNotReady, SerializationError
|
from autobahn.wamp.exception import ApplicationError, ProtocolError, SessionNotReady, SerializationError
|
||||||
from autobahn.wamp.interfaces import ISession # noqa
|
from autobahn.wamp.interfaces import ISession # noqa
|
||||||
from autobahn.wamp.types import SessionDetails
|
from autobahn.wamp.types import SessionDetails, CloseDetails
|
||||||
from autobahn.wamp.cryptobox import EncryptedPayload
|
from autobahn.wamp.cryptobox import EncryptedPayload
|
||||||
from autobahn.wamp.request import \
|
from autobahn.wamp.request import \
|
||||||
Publication, \
|
Publication, \
|
||||||
@@ -1104,17 +1104,38 @@ class ApplicationSession(BaseSession):
|
|||||||
Implements :func:`autobahn.wamp.interfaces.ISession.onJoin`
|
Implements :func:`autobahn.wamp.interfaces.ISession.onJoin`
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
def _errback_outstanding_requests(self, exc):
|
||||||
|
"""
|
||||||
|
Errback any still outstanding requests with exc.
|
||||||
|
"""
|
||||||
|
for requests in [self._publish_reqs,
|
||||||
|
self._subscribe_reqs,
|
||||||
|
self._unsubscribe_reqs,
|
||||||
|
self._call_reqs,
|
||||||
|
self._register_reqs,
|
||||||
|
self._unregister_reqs]:
|
||||||
|
for request in requests.values():
|
||||||
|
self.log.info('cleaning up outstanding {request_type} request {request_id}, firing errback on user handler {request_on_reply}',
|
||||||
|
request_on_reply=request.on_reply,
|
||||||
|
request_id=request.request_id,
|
||||||
|
request_type=request.__class__.__name__)
|
||||||
|
txaio.reject(request.on_reply, exc)
|
||||||
|
requests.clear()
|
||||||
|
|
||||||
@public
|
@public
|
||||||
def onLeave(self, details):
|
def onLeave(self, details):
|
||||||
"""
|
"""
|
||||||
Implements :func:`autobahn.wamp.interfaces.ISession.onLeave`
|
Implements :func:`autobahn.wamp.interfaces.ISession.onLeave`
|
||||||
"""
|
"""
|
||||||
if details.reason.startswith('wamp.error.'):
|
if details.reason != CloseDetails.REASON_DEFAULT:
|
||||||
self.log.error('{reason}: {wamp_message}', reason=details.reason, wamp_message=details.message)
|
self.log.warn('session closed with reason {reason} [{message}]', reason=details.reason, message=details.message)
|
||||||
|
|
||||||
|
# fire ApplicationError on any currently outstanding requests
|
||||||
|
exc = ApplicationError(details.reason, details.message)
|
||||||
|
self._errback_outstanding_requests(exc)
|
||||||
|
|
||||||
if self._transport:
|
if self._transport:
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
# do we ever call onLeave with a valid transport?
|
|
||||||
|
|
||||||
@public
|
@public
|
||||||
def leave(self, reason=None, message=None):
|
def leave(self, reason=None, message=None):
|
||||||
@@ -1141,7 +1162,11 @@ class ApplicationSession(BaseSession):
|
|||||||
"""
|
"""
|
||||||
Implements :func:`autobahn.wamp.interfaces.ISession.onDisconnect`
|
Implements :func:`autobahn.wamp.interfaces.ISession.onDisconnect`
|
||||||
"""
|
"""
|
||||||
pass
|
# fire TransportLost on any _still_ outstanding requests
|
||||||
|
# (these should have been already cleaned up in onLeave() - when
|
||||||
|
# this actually has fired)
|
||||||
|
exc = exception.TransportLost()
|
||||||
|
self._errback_outstanding_requests(exc)
|
||||||
|
|
||||||
@public
|
@public
|
||||||
def publish(self, topic, *args, **kwargs):
|
def publish(self, topic, *args, **kwargs):
|
||||||
|
|||||||
Reference in New Issue
Block a user