cast() and RPC replies should not block waiting for endpoint to ack

This patch fixes cast() and RPC replies so they do not block the
caller once the messages have been written to the link.  In both of
these cases messages are sent "at most once" with best effort.

If a negative acknowledgment is received a warning is logged.

Change-Id: I84671c62544ec388421ecd0ccafc267c3c3d6087
Closes-Bug: #1630637
This commit is contained in:
Kenneth Giusti 2016-10-05 16:04:05 -04:00
parent 6357a45f82
commit 03ef584218
3 changed files with 41 additions and 59 deletions

View File

@ -120,21 +120,20 @@ class SendTask(Task):
def _prepare(self, sender):
"""Called immediately before the message is handed off to the i/o
system. This implies that the sender link is up and credit is
available for this send request.
system. This implies that the sender link is up.
"""
pass
if not self.wait_for_ack:
# sender is not concerned with waiting for acknowledgment
# "best effort at-most-once delivery"
self._cleanup()
self._wakeup.set()
def _on_ack(self, state, info):
"""Called by eventloop thread when the ack/nack is received from the
peer.
"""
if self._wakeup.is_set():
LOG.debug("Message ACKed after send completed: %s %s", state, info)
return
if state != pyngus.SenderLink.ACCEPTED:
# TODO(kgiusti): should retry if deadline not hit
# TODO(kgiusti): could retry if deadline not hit
msg = ("{name} message send to {target} failed: remote"
" disposition: {disp}, info:"
"{info}".format(name=self.name,
@ -142,6 +141,7 @@ class SendTask(Task):
disp=state,
info=info))
self._error = exceptions.MessageDeliveryFailure(msg)
LOG.warning("%s", msg)
self._cleanup()
self._wakeup.set()
@ -149,18 +149,15 @@ class SendTask(Task):
"""Invoked by the eventloop when the send fails to complete before the
timeout is reached.
"""
if self._wakeup.is_set():
LOG.debug("Message send timeout occurred after send completed")
return
self.timer = None
if self.message.ttl:
msg = ("{name} message sent to {target} failed: timed"
" out".format(name=self.name, target=self.target))
self._error = exceptions.MessagingTimeout(msg)
else:
msg = ("{name} message sent to {target} failed:"
" undeliverable".format(name=self.name, target=self.target))
self._error = exceptions.MessageDeliveryFailure(msg)
msg = ("{name} message sent to {target} failed: timed"
" out".format(name=self.name, target=self.target))
LOG.warning("%s", msg)
# Only raise a MessagingTimeout if the caller has explicitly specified
# a timeout.
self._error = exceptions.MessagingTimeout(msg) \
if self.message.ttl else \
exceptions.MessageDeliveryFailure(msg)
self._cleanup()
self._wakeup.set()
@ -168,15 +165,11 @@ class SendTask(Task):
"""Invoked by the eventloop if the send operation fails for reasons
other than timeout and nack.
"""
if self._wakeup.is_set():
LOG.debug("Message send error occurred after send completed: %s",
str(description))
return
msg = ("{name} message sent to {target} failed:"
" {reason}".format(name=self.name,
target=self.target,
reason=description))
LOG.warning("%s", msg)
self._error = exceptions.MessageDeliveryFailure(msg)
self._cleanup()
self._wakeup.set()
@ -228,7 +221,7 @@ class RPCCallTask(SendTask):
# must wait for reply if ACCEPTED
def _cleanup(self):
if self._reply_link:
if self._reply_link and self._msg_id:
self._reply_link.cancel_response(self._msg_id)
self._msg_id = None
super(RPCCallTask, self)._cleanup()
@ -334,9 +327,7 @@ class Sender(pyngus.SenderEventHandler):
send_task.timer = self._scheduler.alarm(timer_callback,
send_task.deadline)
if not self._can_send:
self._pending_sends.append(send_task)
elif self._pending_sends:
if not self._can_send or self._pending_sends:
self._pending_sends.append(send_task)
else:
self._send(send_task)
@ -348,7 +339,7 @@ class Sender(pyngus.SenderEventHandler):
self._send_pending()
def credit_granted(self, sender_link):
self._send_pending()
pass
def sender_remote_closed(self, sender_link, pn_condition):
# The remote has initiated a close. This could happen when the message
@ -403,36 +394,30 @@ class Sender(pyngus.SenderEventHandler):
@property
def _can_send(self):
return (self._link is not None and
self._link.active and
self._link.credit > 0)
return self._link and self._link.active
def _send(self, send_task):
send_task._prepare(self)
send_task.message.address = self._address
if send_task.wait_for_ack:
def pyngus_callback(link, handle, state, info):
# invoked when the message bus (n)acks this message
if state == pyngus.SenderLink.TIMED_OUT:
# ignore pyngus timeout - we maintain our own timer
return
self._unacked.discard(send_task)
send_task._on_ack(state, info)
self._unacked.add(send_task)
self._link.send(send_task.message,
delivery_callback=pyngus_callback,
handle=self,
deadline=send_task.deadline)
else:
self._link.send(send_task.message)
# simulate ack to wakeup sender
send_task._on_ack(pyngus.SenderLink.ACCEPTED, dict())
def pyngus_callback(link, handle, state, info):
# invoked when the message bus (n)acks this message
if state == pyngus.SenderLink.TIMED_OUT:
# ignore pyngus timeout - we maintain our own timer
return
self._unacked.discard(send_task)
send_task._on_ack(state, info)
self._unacked.add(send_task)
self._link.send(send_task.message,
delivery_callback=pyngus_callback,
handle=self,
deadline=send_task.deadline)
def _send_pending(self):
# send as many pending messages as there is credit available
# send all pending messages
if self._can_send:
while self._pending_sends and self._link.credit > 0:
while self._pending_sends:
self._send(self._pending_sends.popleft())
def _open_link(self):

View File

@ -226,7 +226,7 @@ amqp1_opts = [
# Settlement control
cfg.MultiStrOpt('pre_settled',
default=['rpc-cast'],
default=['rpc-cast', 'rpc-reply'],
help="Send messages of this type pre-settled.\n"
"Pre-settled messages will not receive acknowledgement\n"
"from the peer. Note well: pre-settled messages may be\n"

View File

@ -1085,16 +1085,16 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
target.fanout = True
target.server = None
# these threads will share the same link
th = []
for i in range(3):
t = threading.Thread(target=driver.send,
args=(target, {"context": "whatever"},
{"msg": "n=%d" % i}),
kwargs={'wait_for_reply': False})
t.start()
t.join(timeout=1)
self.assertTrue(t.isAlive())
th.append(t)
# casts return once message is put on active link
t.join(timeout=30)
time.sleep(1) # ensure messages are going nowhere
self.assertEqual(self._broker.fanout_sent_count, 0)
# this will trigger the release of credit for the previous links
target.fanout = False
@ -1106,9 +1106,6 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
listener.join(timeout=30)
self.assertTrue(self._broker.fanout_count == 3)
self.assertFalse(listener.isAlive())
for t in th:
t.join(timeout=30)
self.assertFalse(t.isAlive())
driver.cleanup()