Check for assignment changes before yielding new record
This commit is contained in:
@@ -361,6 +361,15 @@ class Fetcher(six.Iterator):
|
||||
log.log(0, "Returning fetched records at offset %d for assigned"
|
||||
" partition %s", position, tp)
|
||||
for msg in self._unpack_message_set(tp, messages):
|
||||
|
||||
# Because we are in a generator, it is possible for
|
||||
# assignment to change between yield calls
|
||||
# so we need to re-check on each loop
|
||||
if not self._subscriptions.is_assigned(tp):
|
||||
log.debug("Not returning fetched records for partition %s"
|
||||
" since it is no longer assigned", tp)
|
||||
break
|
||||
|
||||
self._subscriptions.assignment[tp].position = msg.offset + 1
|
||||
yield msg
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user