diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index c5b2378e7..f9f1b06e8 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -64,6 +64,10 @@ class AMQPIncomingMessage(base.IncomingMessage): conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg)) def reply(self, reply=None, failure=None, log_failure=True): + if not self.msg_id: + # NOTE(Alexei_987) not sending reply, if msg_id is empty + # because reply should not be expected by caller side + return with self.listener.driver._get_connection() as conn: self._send_reply(conn, reply, failure, log_failure=log_failure) self._send_reply(conn, ending=True) diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index 05b2ad937..198252cb2 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -317,19 +317,25 @@ class TestRacyWaitForReply(test_utils.BaseTestCase): self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter) - def send_and_wait_for_reply(i): + def send_and_wait_for_reply(i, wait_for_reply): replies.append(driver.send(target, {}, {'tx_id': i}, - wait_for_reply=True, + wait_for_reply=wait_for_reply, timeout=None)) while len(senders) < 2: t = threading.Thread(target=send_and_wait_for_reply, - args=(len(senders), )) + args=(len(senders), True)) t.daemon = True senders.append(t) + # test the case then msg_id is not set + t = threading.Thread(target=send_and_wait_for_reply, + args=(len(senders), False)) + t.daemon = True + senders.append(t) + # Start the first guy, receive his message, but delay his polling notify_condition = threading.Condition() wait_conditions.append(notify_condition) @@ -354,6 +360,20 @@ class TestRacyWaitForReply(test_utils.BaseTestCase): # Wait for the second thread to finish senders[1].join() + # Start the 3rd guy, receive his message + senders[2].start() + + msgs.append(listener.poll()) + self.assertEqual({'tx_id': 2}, msgs[-1].message) + + # Verify the _send_reply was not invoked by driver: + with mock.patch.object(msgs[2], '_send_reply') as method: + msgs[2].reply({'rx_id': 2}) + self.assertEqual(method.call_count, 0) + + # Wait for the 3rd thread to finish + senders[2].join() + # Let the first thread continue with notify_condition: notify_condition.notify() @@ -364,7 +384,8 @@ class TestRacyWaitForReply(test_utils.BaseTestCase): # Verify replies were received out of order self.assertEqual(len(senders), len(replies)) self.assertEqual({'rx_id': 1}, replies[0]) - self.assertEqual({'rx_id': 0}, replies[1]) + self.assertIsNone(replies[1]) + self.assertEqual({'rx_id': 0}, replies[2]) def _declare_queue(target):