Use explicit subscription state flag to handle seek() during message iteration
This commit is contained in:
@@ -424,6 +424,12 @@ class Fetcher(six.Iterator):
|
||||
elif fetch_offset == position:
|
||||
log.log(0, "Returning fetched records at offset %d for assigned"
|
||||
" partition %s", position, tp)
|
||||
|
||||
# We can ignore any prior signal to drop pending message sets
|
||||
# because we are starting from a fresh one where fetch_offset == position
|
||||
# i.e., the user seek()'d to this position
|
||||
self._subscriptions.assignment[tp].drop_pending_message_set = False
|
||||
|
||||
for msg in self._unpack_message_set(tp, messages):
|
||||
|
||||
# Because we are in a generator, it is possible for
|
||||
@@ -436,8 +442,16 @@ class Fetcher(six.Iterator):
|
||||
" since it is no longer fetchable", tp)
|
||||
break
|
||||
|
||||
# If there is a seek during message iteration,
|
||||
# we should stop unpacking this message set and
|
||||
# wait for a new fetch response that aligns with the
|
||||
# new seek position
|
||||
elif self._subscriptions.assignment[tp].drop_pending_message_set:
|
||||
log.debug("Skipping remainder of message set for partition %s", tp)
|
||||
self._subscriptions.assignment[tp].drop_pending_message_set = False
|
||||
break
|
||||
|
||||
# Compressed messagesets may include earlier messages
|
||||
# It is also possible that the user called seek()
|
||||
elif msg.offset < self._subscriptions.assignment[tp].position:
|
||||
log.debug("Skipping message offset: %s (expecting %s)",
|
||||
msg.offset,
|
||||
|
||||
@@ -350,6 +350,7 @@ class TopicPartitionState(object):
|
||||
self.reset_strategy = None # the reset strategy if awaitingReset is set
|
||||
self._position = None # offset exposed to the user
|
||||
self.highwater = None
|
||||
self.drop_pending_message_set = False
|
||||
|
||||
def _set_position(self, offset):
|
||||
assert self.has_valid_position, 'Valid position required'
|
||||
@@ -371,6 +372,7 @@ class TopicPartitionState(object):
|
||||
self.awaiting_reset = False
|
||||
self.reset_strategy = None
|
||||
self.has_valid_position = True
|
||||
self.drop_pending_message_set = True
|
||||
|
||||
def pause(self):
|
||||
self.paused = True
|
||||
|
||||
Reference in New Issue
Block a user