diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 49e39d1a4..11de3903a 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -226,6 +226,9 @@ class Connection(object): self.kafka_client = None def declare_topic_consumer(self, topics, group=None): + self._ensure_connection() + for topic in topics: + self.kafka_client.ensure_topic_exists(topic) self.consumer = kafka.KafkaConsumer( *topics, group_id=group, bootstrap_servers=["%s:%s" % (self.host, str(self.port))],