Added some comments about message state
This commit is contained in:
@@ -365,6 +365,13 @@ class SimpleConsumer(Consumer):
|
||||
next_offset = None
|
||||
for message in resp.messages:
|
||||
next_offset = message.offset
|
||||
|
||||
# update the offset before the message is yielded. This is
|
||||
# so that the consumer state is not lost in certain cases.
|
||||
# For eg: the message is yielded and consumed by the caller,
|
||||
# but the caller does not come back into the generator again.
|
||||
# The message will be consumed but the status will not be
|
||||
# updated in the consumer
|
||||
self.offsets[partition] = message.offset
|
||||
yield message
|
||||
if next_offset is None:
|
||||
|
||||
Reference in New Issue
Block a user