From d3fedf862400a295d3964e84baea71f56f47fe0f Mon Sep 17 00:00:00 2001 From: Ilya Tyaptin Date: Mon, 21 Mar 2016 11:59:12 +0300 Subject: [PATCH] [Kafka] Ensure a topics before consume messages Currently we trying ot fetch messages from the topics even they have bot been created yet. This behaviour causes a KafkaConfigurationError which are raised in the kafka driver. Change-Id: I78cfd5ac24fbf37be5649232d0bc825319cf6402 Closes-bug: #1557521 --- oslo_messaging/_drivers/impl_kafka.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 49e39d1a4..11de3903a 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -226,6 +226,9 @@ class Connection(object): self.kafka_client = None def declare_topic_consumer(self, topics, group=None): + self._ensure_connection() + for topic in topics: + self.kafka_client.ensure_topic_exists(topic) self.consumer = kafka.KafkaConsumer( *topics, group_id=group, bootstrap_servers=["%s:%s" % (self.host, str(self.port))],