diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 59d2ff71e..15f7c3d1a 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -129,11 +129,6 @@ class Connection(object): self.group_id = driver_conf.consumer_group self.url = url self._parse_url() - # TODO(Support for manual/auto_commit functionality) - # When auto_commit is False, consumer can manually notify - # the completion of the subscription. - # Currently we don't support for non auto commit option - self.auto_commit = True self._consume_loop_stopped = False def _parse_url(self): @@ -229,17 +224,6 @@ class Connection(object): self.consumer.close() self.consumer = None - def commit(self): - """Commit is used by subscribers belonging to the same group. - After subscribing messages, commit is called to prevent - the other subscribers which belong to the same group - from re-subscribing the same messages. - - Currently self.auto_commit option is always True, - so we don't need to call this function. - """ - self.consumer.commit() - def _close_producer(self): with self.producer_lock: if self.producer: @@ -260,6 +244,10 @@ class Connection(object): @with_reconnect() def declare_topic_consumer(self, topics, group=None): + # TODO(Support for manual/auto_commit functionality) + # When auto_commit is False, consumer can manually notify + # the completion of the subscription. + # Currently we don't support for non auto commit option self.consumer = kafka.KafkaConsumer( *topics, group_id=(group or self.group_id), bootstrap_servers=self.hostaddrs, @@ -308,14 +296,6 @@ class KafkaListener(base.PollStyleListener): def cleanup(self): self.conn.close() - def commit(self): - # TODO(Support for manually/auto commit functionality) - # It's better to allow users to commit manually and support for - # self.auto_commit = False option. For now, this commit function - # is meaningless since user couldn't call this function and - # auto_commit option is always True. - self.conn.commit() - class KafkaDriver(base.BaseDriver): """Note: Current implementation of this driver is experimental.