Fix Confluent Kafka consumer topics subscription

The change fixes:

`TypeError: on_revoke expects a callable`

when `repartition_callback` is not set.

Change-Id: I488d6c99e2f2ad857d44b471c5b62e549062a46e
Story: 2003705
Task: 36093
This commit is contained in:
Witek Bedyk 2019-08-06 14:53:03 +02:00
parent ce053d1094
commit 875c233fd7
1 changed files with 4 additions and 1 deletions

View File

@ -56,7 +56,10 @@ class KafkaConsumer(object):
self._commit_callback = commit_callback
self._max_commit_interval = max_commit_interval
self._consumer = confluent_kafka.Consumer(consumer_config)
self._consumer.subscribe([topic], on_revoke=repartition_callback)
if repartition_callback:
self._consumer.subscribe([topic], on_revoke=repartition_callback)
else:
self._consumer.subscribe([topic])
self._last_commit = None
def __iter__(self):