Merge "Should not send replies for cast messages"
This commit is contained in:
@@ -64,6 +64,10 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
|||||||
conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
|
conn.direct_send(self.msg_id, rpc_common.serialize_msg(msg))
|
||||||
|
|
||||||
def reply(self, reply=None, failure=None, log_failure=True):
|
def reply(self, reply=None, failure=None, log_failure=True):
|
||||||
|
if not self.msg_id:
|
||||||
|
# NOTE(Alexei_987) not sending reply, if msg_id is empty
|
||||||
|
# because reply should not be expected by caller side
|
||||||
|
return
|
||||||
with self.listener.driver._get_connection() as conn:
|
with self.listener.driver._get_connection() as conn:
|
||||||
self._send_reply(conn, reply, failure, log_failure=log_failure)
|
self._send_reply(conn, reply, failure, log_failure=log_failure)
|
||||||
self._send_reply(conn, ending=True)
|
self._send_reply(conn, ending=True)
|
||||||
|
|||||||
@@ -317,19 +317,25 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
|
self.stubs.Set(amqpdriver.ReplyWaiter, 'wait', reply_waiter)
|
||||||
|
|
||||||
def send_and_wait_for_reply(i):
|
def send_and_wait_for_reply(i, wait_for_reply):
|
||||||
replies.append(driver.send(target,
|
replies.append(driver.send(target,
|
||||||
{},
|
{},
|
||||||
{'tx_id': i},
|
{'tx_id': i},
|
||||||
wait_for_reply=True,
|
wait_for_reply=wait_for_reply,
|
||||||
timeout=None))
|
timeout=None))
|
||||||
|
|
||||||
while len(senders) < 2:
|
while len(senders) < 2:
|
||||||
t = threading.Thread(target=send_and_wait_for_reply,
|
t = threading.Thread(target=send_and_wait_for_reply,
|
||||||
args=(len(senders), ))
|
args=(len(senders), True))
|
||||||
t.daemon = True
|
t.daemon = True
|
||||||
senders.append(t)
|
senders.append(t)
|
||||||
|
|
||||||
|
# test the case then msg_id is not set
|
||||||
|
t = threading.Thread(target=send_and_wait_for_reply,
|
||||||
|
args=(len(senders), False))
|
||||||
|
t.daemon = True
|
||||||
|
senders.append(t)
|
||||||
|
|
||||||
# Start the first guy, receive his message, but delay his polling
|
# Start the first guy, receive his message, but delay his polling
|
||||||
notify_condition = threading.Condition()
|
notify_condition = threading.Condition()
|
||||||
wait_conditions.append(notify_condition)
|
wait_conditions.append(notify_condition)
|
||||||
@@ -354,6 +360,20 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
|
|||||||
# Wait for the second thread to finish
|
# Wait for the second thread to finish
|
||||||
senders[1].join()
|
senders[1].join()
|
||||||
|
|
||||||
|
# Start the 3rd guy, receive his message
|
||||||
|
senders[2].start()
|
||||||
|
|
||||||
|
msgs.append(listener.poll())
|
||||||
|
self.assertEqual({'tx_id': 2}, msgs[-1].message)
|
||||||
|
|
||||||
|
# Verify the _send_reply was not invoked by driver:
|
||||||
|
with mock.patch.object(msgs[2], '_send_reply') as method:
|
||||||
|
msgs[2].reply({'rx_id': 2})
|
||||||
|
self.assertEqual(method.call_count, 0)
|
||||||
|
|
||||||
|
# Wait for the 3rd thread to finish
|
||||||
|
senders[2].join()
|
||||||
|
|
||||||
# Let the first thread continue
|
# Let the first thread continue
|
||||||
with notify_condition:
|
with notify_condition:
|
||||||
notify_condition.notify()
|
notify_condition.notify()
|
||||||
@@ -364,7 +384,8 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
|
|||||||
# Verify replies were received out of order
|
# Verify replies were received out of order
|
||||||
self.assertEqual(len(senders), len(replies))
|
self.assertEqual(len(senders), len(replies))
|
||||||
self.assertEqual({'rx_id': 1}, replies[0])
|
self.assertEqual({'rx_id': 1}, replies[0])
|
||||||
self.assertEqual({'rx_id': 0}, replies[1])
|
self.assertIsNone(replies[1])
|
||||||
|
self.assertEqual({'rx_id': 0}, replies[2])
|
||||||
|
|
||||||
|
|
||||||
def _declare_queue(target):
|
def _declare_queue(target):
|
||||||
|
|||||||
Reference in New Issue
Block a user