Log debug messages when skipping fetched messages due to offset checks
This commit is contained in:
@@ -338,6 +338,8 @@ class Fetcher(six.Iterator):
|
|||||||
for record in self._unpack_message_set(tp, messages):
|
for record in self._unpack_message_set(tp, messages):
|
||||||
# Fetched compressed messages may include additional records
|
# Fetched compressed messages may include additional records
|
||||||
if record.offset < fetch_offset:
|
if record.offset < fetch_offset:
|
||||||
|
log.debug("Skipping message offset: %s (expecting %s)",
|
||||||
|
record.offset, fetch_offset)
|
||||||
continue
|
continue
|
||||||
drained[tp].append(record)
|
drained[tp].append(record)
|
||||||
else:
|
else:
|
||||||
@@ -419,6 +421,9 @@ class Fetcher(six.Iterator):
|
|||||||
# Compressed messagesets may include earlier messages
|
# Compressed messagesets may include earlier messages
|
||||||
# It is also possible that the user called seek()
|
# It is also possible that the user called seek()
|
||||||
elif msg.offset != self._subscriptions.assignment[tp].position:
|
elif msg.offset != self._subscriptions.assignment[tp].position:
|
||||||
|
log.debug("Skipping message offset: %s (expecting %s)",
|
||||||
|
msg.offset,
|
||||||
|
self._subscriptions.assignment[tp].position)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self._subscriptions.assignment[tp].position = msg.offset + 1
|
self._subscriptions.assignment[tp].position = msg.offset + 1
|
||||||
|
|||||||
Reference in New Issue
Block a user