Merge "Use only unique topics for the Kafka driver"
This commit is contained in:
commit
0dbeba1c6a
@ -354,9 +354,9 @@ class KafkaDriver(base.BaseDriver):
|
|||||||
:type pool: string
|
:type pool: string
|
||||||
"""
|
"""
|
||||||
conn = self._get_connection(purpose=PURPOSE_LISTEN)
|
conn = self._get_connection(purpose=PURPOSE_LISTEN)
|
||||||
topics = []
|
topics = set()
|
||||||
for target, priority in targets_and_priorities:
|
for target, priority in targets_and_priorities:
|
||||||
topics.append(target_to_topic(target, priority))
|
topics.add(target_to_topic(target, priority))
|
||||||
|
|
||||||
conn.declare_topic_consumer(topics, pool)
|
conn.declare_topic_consumer(topics, pool)
|
||||||
|
|
||||||
|
@ -206,6 +206,26 @@ class TestKafkaListener(test_utils.BaseTestCase):
|
|||||||
self.driver.listen_for_notifications(fake_targets_and_priorities)
|
self.driver.listen_for_notifications(fake_targets_and_priorities)
|
||||||
self.assertEqual(1, len(fake_consumer.mock_calls))
|
self.assertEqual(1, len(fake_consumer.mock_calls))
|
||||||
|
|
||||||
|
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
||||||
|
@mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
|
||||||
|
def test_converting_targets_to_topics(self, fake_consumer,
|
||||||
|
fake_ensure_connection):
|
||||||
|
fake_targets_and_priorities = [
|
||||||
|
(oslo_messaging.Target(topic="fake_topic",
|
||||||
|
exchange="test1"), 'info'),
|
||||||
|
(oslo_messaging.Target(topic="fake_topic",
|
||||||
|
exchange="test2"), 'info'),
|
||||||
|
(oslo_messaging.Target(topic="fake_topic",
|
||||||
|
exchange="test1"), 'error'),
|
||||||
|
(oslo_messaging.Target(topic="fake_topic",
|
||||||
|
exchange="test3"), 'error'),
|
||||||
|
]
|
||||||
|
self.driver.listen_for_notifications(fake_targets_and_priorities)
|
||||||
|
self.assertEqual(1, len(fake_consumer.mock_calls))
|
||||||
|
fake_consumer.assert_called_once_with(set(['fake_topic.error',
|
||||||
|
'fake_topic.info']),
|
||||||
|
None)
|
||||||
|
|
||||||
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
@mock.patch.object(kafka_driver.Connection, '_ensure_connection')
|
||||||
@mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
|
@mock.patch.object(kafka_driver.Connection, 'declare_topic_consumer')
|
||||||
def test_stop_listener(self, fake_consumer, fake_client):
|
def test_stop_listener(self, fake_consumer, fake_client):
|
||||||
|
Loading…
Reference in New Issue
Block a user