diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 589baf5e5..24fdbc737 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object): class AMQPListener(base.PollStyleListener): + use_cache = False def __init__(self, driver, conn): super(AMQPListener, self).__init__(driver.prefetch_size) @@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener): def __call__(self, message): ctxt = rpc_amqp.unpack_context(message) - unique_id = self.msg_id_cache.check_duplicate_message(message) + try: + unique_id = self.msg_id_cache.check_duplicate_message(message) + except rpc_common.DuplicateMessageError: + LOG.exception("ignoring duplicate message %s", ctxt.msg_id) + return + if self.use_cache: + self.msg_id_cache.add(unique_id) if ctxt.msg_id: LOG.debug("received message msg_id: %(msg_id)s reply to " "%(queue)s", {'queue': ctxt.reply_q, @@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener): class RpcAMQPListener(AMQPListener): message_cls = AMQPIncomingMessage + use_cache = True def __call__(self, message): # NOTE(kgiusti): In the original RPC implementation the RPC server diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index f8882f27e..895566127 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -1107,3 +1107,55 @@ class TestPollTimeoutLimit(test_utils.BaseTestCase): {}, {'tx_id': 'test'}) thread.join() + + +class TestMsgIdCache(test_utils.BaseTestCase): + @mock.patch('kombu.message.Message.reject') + def test_reply_wire_format(self, reject_mock): + self.conf.oslo_messaging_rabbit.kombu_compression = None + + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + + driver = transport._driver + + target = oslo_messaging.Target(topic='testtopic', + server=None, + fanout=False) + + listener = driver.listen(target, None, None)._poll_style_listener + + connection, producer = _create_producer(target) + self.addCleanup(connection.release) + + msg = { + 'oslo.version': '2.0', + 'oslo.message': {} + } + + msg['oslo.message'].update({ + '_msg_id': uuid.uuid4().hex, + '_unique_id': uuid.uuid4().hex, + '_reply_q': 'reply_' + uuid.uuid4().hex, + '_timeout': None, + }) + + msg['oslo.message'] = jsonutils.dumps(msg['oslo.message']) + + producer.publish(msg) + + received = listener.poll()[0] + self.assertIsNotNone(received) + self.assertEqual({}, received.message) + + # publish the same message a second time + producer.publish(msg) + + received = listener.poll(timeout=1) + + # duplicate message is ignored + self.assertEqual(len(received), 0) + + # we should not reject duplicate message + reject_mock.assert_not_called()