send acknowledge for event only after successful return of user subscription handler
This commit is contained in:
@@ -590,23 +590,28 @@ class ApplicationSession(BaseSession):
|
||||
if handler.details_arg:
|
||||
invoke_kwargs[handler.details_arg] = types.EventDetails(publication=msg.publication, publisher=msg.publisher, publisher_authid=msg.publisher_authid, publisher_authrole=msg.publisher_authrole, topic=topic, retained=msg.retained, enc_algo=msg.enc_algo)
|
||||
|
||||
# FIXME: https://github.com/crossbario/autobahn-python/issues/764
|
||||
def _success(_):
|
||||
# Acknowledged Events -- only if we got the details header and
|
||||
# the broker advertised it
|
||||
if msg.x_acknowledged_delivery and self._router_roles["broker"].x_acknowledged_event_delivery:
|
||||
if self._transport:
|
||||
response = message.EventReceived(msg.publication)
|
||||
self._transport.send(response)
|
||||
else:
|
||||
self.log.warn("successfully processed event with acknowledged delivery, but could not send ACK, since the transport was lost in the meantime")
|
||||
|
||||
def _error(e):
|
||||
errmsg = 'While firing {0} subscribed under {1}.'.format(
|
||||
handler.fn, msg.subscription)
|
||||
return self._swallow_error(e, errmsg)
|
||||
|
||||
future = txaio.as_future(handler.fn, *invoke_args, **invoke_kwargs)
|
||||
txaio.add_callbacks(future, None, _error)
|
||||
txaio.add_callbacks(future, _success, _error)
|
||||
|
||||
else:
|
||||
raise ProtocolError("EVENT received for non-subscribed subscription ID {0}".format(msg.subscription))
|
||||
|
||||
# Acknowledged Events -- only if we got the details header and
|
||||
# the broker advertised it
|
||||
if msg.x_acknowledged_delivery and self._router_roles["broker"].x_acknowledged_event_delivery:
|
||||
response = message.EventReceived(msg.publication)
|
||||
self._transport.send(response)
|
||||
|
||||
elif isinstance(msg, message.Published):
|
||||
|
||||
if msg.request in self._publish_reqs:
|
||||
|
||||
Reference in New Issue
Block a user