Merge "use message id cache for RPC listener"
This commit is contained in:
commit
d4f7ea21fc
oslo_messaging
@ -292,6 +292,7 @@ class ObsoleteReplyQueuesCache(object):
|
|||||||
|
|
||||||
|
|
||||||
class AMQPListener(base.PollStyleListener):
|
class AMQPListener(base.PollStyleListener):
|
||||||
|
use_cache = False
|
||||||
|
|
||||||
def __init__(self, driver, conn):
|
def __init__(self, driver, conn):
|
||||||
super(AMQPListener, self).__init__(driver.prefetch_size)
|
super(AMQPListener, self).__init__(driver.prefetch_size)
|
||||||
@ -308,7 +309,13 @@ class AMQPListener(base.PollStyleListener):
|
|||||||
|
|
||||||
def __call__(self, message):
|
def __call__(self, message):
|
||||||
ctxt = rpc_amqp.unpack_context(message)
|
ctxt = rpc_amqp.unpack_context(message)
|
||||||
unique_id = self.msg_id_cache.check_duplicate_message(message)
|
try:
|
||||||
|
unique_id = self.msg_id_cache.check_duplicate_message(message)
|
||||||
|
except rpc_common.DuplicateMessageError:
|
||||||
|
LOG.exception("ignoring duplicate message %s", ctxt.msg_id)
|
||||||
|
return
|
||||||
|
if self.use_cache:
|
||||||
|
self.msg_id_cache.add(unique_id)
|
||||||
if ctxt.msg_id:
|
if ctxt.msg_id:
|
||||||
LOG.debug("received message msg_id: %(msg_id)s reply to "
|
LOG.debug("received message msg_id: %(msg_id)s reply to "
|
||||||
"%(queue)s", {'queue': ctxt.reply_q,
|
"%(queue)s", {'queue': ctxt.reply_q,
|
||||||
@ -389,6 +396,7 @@ class AMQPListener(base.PollStyleListener):
|
|||||||
|
|
||||||
class RpcAMQPListener(AMQPListener):
|
class RpcAMQPListener(AMQPListener):
|
||||||
message_cls = AMQPIncomingMessage
|
message_cls = AMQPIncomingMessage
|
||||||
|
use_cache = True
|
||||||
|
|
||||||
def __call__(self, message):
|
def __call__(self, message):
|
||||||
# NOTE(kgiusti): In the original RPC implementation the RPC server
|
# NOTE(kgiusti): In the original RPC implementation the RPC server
|
||||||
|
@ -1107,3 +1107,55 @@ class TestPollTimeoutLimit(test_utils.BaseTestCase):
|
|||||||
{},
|
{},
|
||||||
{'tx_id': 'test'})
|
{'tx_id': 'test'})
|
||||||
thread.join()
|
thread.join()
|
||||||
|
|
||||||
|
|
||||||
|
class TestMsgIdCache(test_utils.BaseTestCase):
|
||||||
|
@mock.patch('kombu.message.Message.reject')
|
||||||
|
def test_reply_wire_format(self, reject_mock):
|
||||||
|
self.conf.oslo_messaging_rabbit.kombu_compression = None
|
||||||
|
|
||||||
|
transport = oslo_messaging.get_transport(self.conf,
|
||||||
|
'kombu+memory:////')
|
||||||
|
self.addCleanup(transport.cleanup)
|
||||||
|
|
||||||
|
driver = transport._driver
|
||||||
|
|
||||||
|
target = oslo_messaging.Target(topic='testtopic',
|
||||||
|
server=None,
|
||||||
|
fanout=False)
|
||||||
|
|
||||||
|
listener = driver.listen(target, None, None)._poll_style_listener
|
||||||
|
|
||||||
|
connection, producer = _create_producer(target)
|
||||||
|
self.addCleanup(connection.release)
|
||||||
|
|
||||||
|
msg = {
|
||||||
|
'oslo.version': '2.0',
|
||||||
|
'oslo.message': {}
|
||||||
|
}
|
||||||
|
|
||||||
|
msg['oslo.message'].update({
|
||||||
|
'_msg_id': uuid.uuid4().hex,
|
||||||
|
'_unique_id': uuid.uuid4().hex,
|
||||||
|
'_reply_q': 'reply_' + uuid.uuid4().hex,
|
||||||
|
'_timeout': None,
|
||||||
|
})
|
||||||
|
|
||||||
|
msg['oslo.message'] = jsonutils.dumps(msg['oslo.message'])
|
||||||
|
|
||||||
|
producer.publish(msg)
|
||||||
|
|
||||||
|
received = listener.poll()[0]
|
||||||
|
self.assertIsNotNone(received)
|
||||||
|
self.assertEqual({}, received.message)
|
||||||
|
|
||||||
|
# publish the same message a second time
|
||||||
|
producer.publish(msg)
|
||||||
|
|
||||||
|
received = listener.poll(timeout=1)
|
||||||
|
|
||||||
|
# duplicate message is ignored
|
||||||
|
self.assertEqual(len(received), 0)
|
||||||
|
|
||||||
|
# we should not reject duplicate message
|
||||||
|
reject_mock.assert_not_called()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user