Fetcher iterator should check for pause and seek resets
This commit is contained in:
@@ -400,11 +400,13 @@ class Fetcher(six.Iterator):
|
|||||||
for msg in self._unpack_message_set(tp, messages):
|
for msg in self._unpack_message_set(tp, messages):
|
||||||
|
|
||||||
# Because we are in a generator, it is possible for
|
# Because we are in a generator, it is possible for
|
||||||
# assignment to change between yield calls
|
# subscription state to change between yield calls
|
||||||
# so we need to re-check on each loop
|
# so we need to re-check on each loop
|
||||||
if not self._subscriptions.is_assigned(tp):
|
# this should catch assignment changes, pauses
|
||||||
|
# and resets via seek_to_beginning / seek_to_end
|
||||||
|
if not self._subscriptions.is_fetchable(tp):
|
||||||
log.debug("Not returning fetched records for partition %s"
|
log.debug("Not returning fetched records for partition %s"
|
||||||
" since it is no longer assigned", tp)
|
" since it is no longer fetchable", tp)
|
||||||
break
|
break
|
||||||
|
|
||||||
# Compressed messagesets may include earlier messages
|
# Compressed messagesets may include earlier messages
|
||||||
|
Reference in New Issue
Block a user