From 03ef5842185cbec519e6c54067c3cc16f0839310 Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Wed, 5 Oct 2016 16:04:05 -0400 Subject: [PATCH] cast() and RPC replies should not block waiting for endpoint to ack This patch fixes cast() and RPC replies so they do not block the caller once the messages have been written to the link. In both of these cases messages are sent "at most once" with best effort. If a negative acknowledgment is received a warning is logged. Change-Id: I84671c62544ec388421ecd0ccafc267c3c3d6087 Closes-Bug: #1630637 --- .../_drivers/amqp1_driver/controller.py | 87 ++++++++----------- oslo_messaging/_drivers/amqp1_driver/opts.py | 2 +- .../tests/drivers/test_amqp_driver.py | 11 +-- 3 files changed, 41 insertions(+), 59 deletions(-) diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index ad045013e..d2d59b642 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -120,21 +120,20 @@ class SendTask(Task): def _prepare(self, sender): """Called immediately before the message is handed off to the i/o - system. This implies that the sender link is up and credit is - available for this send request. + system. This implies that the sender link is up. """ - pass + if not self.wait_for_ack: + # sender is not concerned with waiting for acknowledgment + # "best effort at-most-once delivery" + self._cleanup() + self._wakeup.set() def _on_ack(self, state, info): """Called by eventloop thread when the ack/nack is received from the peer. """ - if self._wakeup.is_set(): - LOG.debug("Message ACKed after send completed: %s %s", state, info) - return - if state != pyngus.SenderLink.ACCEPTED: - # TODO(kgiusti): should retry if deadline not hit + # TODO(kgiusti): could retry if deadline not hit msg = ("{name} message send to {target} failed: remote" " disposition: {disp}, info:" "{info}".format(name=self.name, @@ -142,6 +141,7 @@ class SendTask(Task): disp=state, info=info)) self._error = exceptions.MessageDeliveryFailure(msg) + LOG.warning("%s", msg) self._cleanup() self._wakeup.set() @@ -149,18 +149,15 @@ class SendTask(Task): """Invoked by the eventloop when the send fails to complete before the timeout is reached. """ - if self._wakeup.is_set(): - LOG.debug("Message send timeout occurred after send completed") - return self.timer = None - if self.message.ttl: - msg = ("{name} message sent to {target} failed: timed" - " out".format(name=self.name, target=self.target)) - self._error = exceptions.MessagingTimeout(msg) - else: - msg = ("{name} message sent to {target} failed:" - " undeliverable".format(name=self.name, target=self.target)) - self._error = exceptions.MessageDeliveryFailure(msg) + msg = ("{name} message sent to {target} failed: timed" + " out".format(name=self.name, target=self.target)) + LOG.warning("%s", msg) + # Only raise a MessagingTimeout if the caller has explicitly specified + # a timeout. + self._error = exceptions.MessagingTimeout(msg) \ + if self.message.ttl else \ + exceptions.MessageDeliveryFailure(msg) self._cleanup() self._wakeup.set() @@ -168,15 +165,11 @@ class SendTask(Task): """Invoked by the eventloop if the send operation fails for reasons other than timeout and nack. """ - if self._wakeup.is_set(): - LOG.debug("Message send error occurred after send completed: %s", - str(description)) - return - msg = ("{name} message sent to {target} failed:" " {reason}".format(name=self.name, target=self.target, reason=description)) + LOG.warning("%s", msg) self._error = exceptions.MessageDeliveryFailure(msg) self._cleanup() self._wakeup.set() @@ -228,7 +221,7 @@ class RPCCallTask(SendTask): # must wait for reply if ACCEPTED def _cleanup(self): - if self._reply_link: + if self._reply_link and self._msg_id: self._reply_link.cancel_response(self._msg_id) self._msg_id = None super(RPCCallTask, self)._cleanup() @@ -334,9 +327,7 @@ class Sender(pyngus.SenderEventHandler): send_task.timer = self._scheduler.alarm(timer_callback, send_task.deadline) - if not self._can_send: - self._pending_sends.append(send_task) - elif self._pending_sends: + if not self._can_send or self._pending_sends: self._pending_sends.append(send_task) else: self._send(send_task) @@ -348,7 +339,7 @@ class Sender(pyngus.SenderEventHandler): self._send_pending() def credit_granted(self, sender_link): - self._send_pending() + pass def sender_remote_closed(self, sender_link, pn_condition): # The remote has initiated a close. This could happen when the message @@ -403,36 +394,30 @@ class Sender(pyngus.SenderEventHandler): @property def _can_send(self): - return (self._link is not None and - self._link.active and - self._link.credit > 0) + return self._link and self._link.active def _send(self, send_task): send_task._prepare(self) send_task.message.address = self._address - if send_task.wait_for_ack: - def pyngus_callback(link, handle, state, info): - # invoked when the message bus (n)acks this message - if state == pyngus.SenderLink.TIMED_OUT: - # ignore pyngus timeout - we maintain our own timer - return - self._unacked.discard(send_task) - send_task._on_ack(state, info) - self._unacked.add(send_task) - self._link.send(send_task.message, - delivery_callback=pyngus_callback, - handle=self, - deadline=send_task.deadline) - else: - self._link.send(send_task.message) - # simulate ack to wakeup sender - send_task._on_ack(pyngus.SenderLink.ACCEPTED, dict()) + def pyngus_callback(link, handle, state, info): + # invoked when the message bus (n)acks this message + if state == pyngus.SenderLink.TIMED_OUT: + # ignore pyngus timeout - we maintain our own timer + return + self._unacked.discard(send_task) + send_task._on_ack(state, info) + + self._unacked.add(send_task) + self._link.send(send_task.message, + delivery_callback=pyngus_callback, + handle=self, + deadline=send_task.deadline) def _send_pending(self): - # send as many pending messages as there is credit available + # send all pending messages if self._can_send: - while self._pending_sends and self._link.credit > 0: + while self._pending_sends: self._send(self._pending_sends.popleft()) def _open_link(self): diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py index e7cc38494..c407ea58c 100644 --- a/oslo_messaging/_drivers/amqp1_driver/opts.py +++ b/oslo_messaging/_drivers/amqp1_driver/opts.py @@ -226,7 +226,7 @@ amqp1_opts = [ # Settlement control cfg.MultiStrOpt('pre_settled', - default=['rpc-cast'], + default=['rpc-cast', 'rpc-reply'], help="Send messages of this type pre-settled.\n" "Pre-settled messages will not receive acknowledgement\n" "from the peer. Note well: pre-settled messages may be\n" diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index cc8f9c370..d1ca36ce7 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -1085,16 +1085,16 @@ class TestLinkRecovery(_AmqpBrokerTestCase): target.fanout = True target.server = None # these threads will share the same link - th = [] for i in range(3): t = threading.Thread(target=driver.send, args=(target, {"context": "whatever"}, {"msg": "n=%d" % i}), kwargs={'wait_for_reply': False}) t.start() - t.join(timeout=1) - self.assertTrue(t.isAlive()) - th.append(t) + # casts return once message is put on active link + t.join(timeout=30) + + time.sleep(1) # ensure messages are going nowhere self.assertEqual(self._broker.fanout_sent_count, 0) # this will trigger the release of credit for the previous links target.fanout = False @@ -1106,9 +1106,6 @@ class TestLinkRecovery(_AmqpBrokerTestCase): listener.join(timeout=30) self.assertTrue(self._broker.fanout_count == 3) self.assertFalse(listener.isAlive()) - for t in th: - t.join(timeout=30) - self.assertFalse(t.isAlive()) driver.cleanup()