Merge pull request #177 from zever/fetch-last-known-offsets
Move fetching last known offset logic to a stand alone function.
This commit is contained in:
@@ -100,6 +100,16 @@ class Consumer(object):
|
||||
self.commit)
|
||||
self.commit_timer.start()
|
||||
|
||||
if auto_commit:
|
||||
self.fetch_last_known_offsets(partitions)
|
||||
else:
|
||||
for partition in partitions:
|
||||
self.offsets[partition] = 0
|
||||
|
||||
def fetch_last_known_offsets(self, partitions=None):
|
||||
if not partitions:
|
||||
partitions = self.client.topic_partitions[self.topic]
|
||||
|
||||
def get_or_init_offset_callback(resp):
|
||||
try:
|
||||
kafka.common.check_error(resp)
|
||||
@@ -107,16 +117,13 @@ class Consumer(object):
|
||||
except kafka.common.UnknownTopicOrPartitionError:
|
||||
return 0
|
||||
|
||||
if auto_commit:
|
||||
for partition in partitions:
|
||||
req = OffsetFetchRequest(topic, partition)
|
||||
(offset,) = self.client.send_offset_fetch_request(group, [req],
|
||||
callback=get_or_init_offset_callback,
|
||||
fail_on_error=False)
|
||||
self.offsets[partition] = offset
|
||||
else:
|
||||
for partition in partitions:
|
||||
self.offsets[partition] = 0
|
||||
for partition in partitions:
|
||||
req = OffsetFetchRequest(self.topic, partition)
|
||||
(offset,) = self.client.send_offset_fetch_request(self.group, [req],
|
||||
callback=get_or_init_offset_callback,
|
||||
fail_on_error=False)
|
||||
self.offsets[partition] = offset
|
||||
self.fetch_offsets = self.offsets.copy()
|
||||
|
||||
def commit(self, partitions=None):
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user