Move kafka._msg_iter initialization from __init__() to next()
This commit is contained in:
@@ -122,8 +122,8 @@ class KafkaConsumer(object):
|
||||
|
||||
def __init__(self, *topics, **configs):
|
||||
self.configure(**configs)
|
||||
|
||||
self.set_topic_partitions(*topics)
|
||||
self._msg_iter = None
|
||||
|
||||
# Setup offsets
|
||||
self._offsets = OffsetsStruct(fetch=defaultdict(dict),
|
||||
@@ -143,9 +143,6 @@ class KafkaConsumer(object):
|
||||
self._reset_highwater_offsets()
|
||||
self._reset_task_done_offsets()
|
||||
|
||||
# Start the message fetch generator
|
||||
self._msg_iter = self.fetch_messages()
|
||||
|
||||
def _fetch_stored_offsets(self):
|
||||
logger.info("Consumer fetching stored offsets")
|
||||
for topic, partition in self._topics:
|
||||
@@ -198,6 +195,10 @@ class KafkaConsumer(object):
|
||||
self._set_consumer_timeout_start()
|
||||
while True:
|
||||
|
||||
# Fetch a new batch if needed
|
||||
if self._msg_iter is None:
|
||||
self._msg_iter = self.fetch_messages()
|
||||
|
||||
# Check for auto-commit
|
||||
if self.should_auto_commit():
|
||||
self.commit()
|
||||
@@ -205,9 +206,9 @@ class KafkaConsumer(object):
|
||||
try:
|
||||
return self._msg_iter.next()
|
||||
|
||||
# If the previous batch finishes, start get new batch
|
||||
# Handle batch completion
|
||||
except StopIteration:
|
||||
self._msg_iter = self.fetch_messages()
|
||||
self._msg_iter = None
|
||||
|
||||
self._check_consumer_timeout()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user