From bec4ef4d126bdcc31fa6aff5f459d490ae3bdd58 Mon Sep 17 00:00:00 2001 From: Ilya Tyaptin Date: Thu, 10 Mar 2016 16:28:40 +0300 Subject: [PATCH] Use only unique topics for the Kafka driver Consumer in Kafka driver should use only unique topic, otherwise a FetchDuplicate exception will be raised. Change-Id: I569ce446eaf05dbc3a7fd0b41a2307e940ab87fb Closes-bug: #1555081 --- oslo_messaging/_drivers/impl_kafka.py | 4 ++-- .../tests/drivers/test_impl_kafka.py | 20 +++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 49e39d1a4..a9aa58c62 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -351,9 +351,9 @@ class KafkaDriver(base.BaseDriver): :type pool: string """ conn = self._get_connection(purpose=PURPOSE_LISTEN) - topics = [] + topics = set() for target, priority in targets_and_priorities: - topics.append(target_to_topic(target, priority)) + topics.add(target_to_topic(target, priority)) conn.declare_topic_consumer(topics, pool) diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index 7f1d5e377..057ec1eda 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -206,6 +206,26 @@ class TestKafkaListener(test_utils.BaseTestCase): self.driver.listen_for_notifications(fake_targets_and_priorities) self.assertEqual(1, len(fake_consumer.mock_calls)) + @mock.patch.object(kafka_driver.Connection, '_ensure_connection') + @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer') + def test_converting_targets_to_topics(self, fake_consumer, + fake_ensure_connection): + fake_targets_and_priorities = [ + (oslo_messaging.Target(topic="fake_topic", + exchange="test1"), 'info'), + (oslo_messaging.Target(topic="fake_topic", + exchange="test2"), 'info'), + (oslo_messaging.Target(topic="fake_topic", + exchange="test1"), 'error'), + (oslo_messaging.Target(topic="fake_topic", + exchange="test3"), 'error'), + ] + self.driver.listen_for_notifications(fake_targets_and_priorities) + self.assertEqual(1, len(fake_consumer.mock_calls)) + fake_consumer.assert_called_once_with(set(['fake_topic.error', + 'fake_topic.info']), + None) + @mock.patch.object(kafka_driver.Connection, '_ensure_connection') @mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer') def test_stop_listener(self, fake_consumer, fake_client):