diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 0dacec488..6ab24dcf6 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -215,9 +215,9 @@ class ReplyWaiter(object): try: while True: reply, ending = self._poll_connection(msg_id, timeout) - if reply: + if not ending: final_reply = reply - elif ending: + else: return final_reply finally: self.conn_lock.release() @@ -232,9 +232,9 @@ class ReplyWaiter(object): # The first thread got its reply, let's try and take over # the responsibility for polling continue - if reply: + if not ending: final_reply = reply - elif ending: + else: return final_reply diff --git a/tests/test_rabbit.py b/tests/test_rabbit.py index 5cff175c0..142252ca9 100644 --- a/tests/test_rabbit.py +++ b/tests/test_rabbit.py @@ -114,6 +114,15 @@ class TestSendReceive(test_utils.BaseTestCase): ('with_context', dict(ctxt={'user': 'mark'})), ] + _reply = [ + ('rx_id', dict(rx_id=True, reply=None)), + ('none', dict(rx_id=False, reply=None)), + ('empty_list', dict(rx_id=False, reply=[])), + ('empty_dict', dict(rx_id=False, reply={})), + ('false', dict(rx_id=False, reply=False)), + ('zero', dict(rx_id=False, reply=0)), + ] + _failure = [ ('success', dict(failure=False)), ('failure', dict(failure=True, expected=False)), @@ -129,6 +138,7 @@ class TestSendReceive(test_utils.BaseTestCase): def generate_scenarios(cls): cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders, cls._context, + cls._reply, cls._failure, cls._timeout) @@ -163,7 +173,7 @@ class TestSendReceive(test_utils.BaseTestCase): try: replies.append(driver.send(target, self.ctxt, - {'foo': i}, + {'tx_id': i}, wait_for_reply=True, timeout=self.timeout)) self.assertFalse(self.failure) @@ -182,7 +192,7 @@ class TestSendReceive(test_utils.BaseTestCase): received = listener.poll() self.assertIsNotNone(received) self.assertEqual(received.ctxt, self.ctxt) - self.assertEqual(received.message, {'foo': i}) + self.assertEqual(received.message, {'tx_id': i}) msgs.append(received) # reply in reverse, except reply to the first guy second from last @@ -199,8 +209,10 @@ class TestSendReceive(test_utils.BaseTestCase): failure = sys.exc_info() msgs[i].reply(failure=failure, log_failure=not self.expected) + elif self.rx_id: + msgs[i].reply({'rx_id': i}) else: - msgs[i].reply({'bar': msgs[i].message['foo']}) + msgs[i].reply(self.reply) senders[i].join() self.assertEqual(len(replies), len(senders)) @@ -209,8 +221,10 @@ class TestSendReceive(test_utils.BaseTestCase): self.assertIsInstance(reply, messaging.MessagingTimeout) elif self.failure: self.assertIsInstance(reply, ZeroDivisionError) + elif self.rx_id: + self.assertEqual(reply, {'rx_id': order[i]}) else: - self.assertEqual(reply, {'bar': order[i]}) + self.assertEqual(reply, self.reply) if not self.timeout and self.failure and not self.expected: self.assertTrue(len(errors) > 0, errors)