Fetcher logging should be debug or trace (left higher during testing)
This commit is contained in:
@@ -301,9 +301,9 @@ class Fetcher(six.Iterator):
|
||||
|
||||
elif fetch_offset == position:
|
||||
next_offset = messages[-1][0] + 1
|
||||
log.debug("Returning fetched records at offset %d for assigned"
|
||||
" partition %s and update position to %s", position,
|
||||
tp, next_offset)
|
||||
log.log(0, "Returning fetched records at offset %d for assigned"
|
||||
" partition %s and update position to %s", position,
|
||||
tp, next_offset)
|
||||
self._subscriptions.assignment[tp].position = next_offset
|
||||
|
||||
for record in self._unpack_message_set(tp, messages):
|
||||
@@ -344,8 +344,8 @@ class Fetcher(six.Iterator):
|
||||
if not self._subscriptions.is_assigned(tp):
|
||||
# this can happen when a rebalance happened before
|
||||
# fetched records are returned
|
||||
log.warning("Not returning fetched records for partition %s"
|
||||
" since it is no longer assigned", tp)
|
||||
log.debug("Not returning fetched records for partition %s"
|
||||
" since it is no longer assigned", tp)
|
||||
continue
|
||||
|
||||
# note that the consumed position should always be available
|
||||
@@ -354,18 +354,20 @@ class Fetcher(six.Iterator):
|
||||
if not self._subscriptions.is_fetchable(tp):
|
||||
# this can happen when a partition consumption paused before
|
||||
# fetched records are returned
|
||||
log.warning("Not returning fetched records for assigned partition"
|
||||
" %s since it is no longer fetchable", tp)
|
||||
log.debug("Not returning fetched records for assigned partition"
|
||||
" %s since it is no longer fetchable", tp)
|
||||
|
||||
elif fetch_offset == position:
|
||||
log.log(0, "Returning fetched records at offset %d for assigned"
|
||||
" partition %s", position, tp)
|
||||
for msg in self._unpack_message_set(tp, messages):
|
||||
self._subscriptions.assignment[tp].position = msg.offset + 1
|
||||
yield msg
|
||||
else:
|
||||
# these records aren't next in line based on the last consumed
|
||||
# position, ignore them they must be from an obsolete request
|
||||
log.warning("Ignoring fetched records for %s at offset %s",
|
||||
tp, fetch_offset)
|
||||
log.debug("Ignoring fetched records for %s at offset %s",
|
||||
tp, fetch_offset)
|
||||
|
||||
# Send any additional FetchRequests that we can now
|
||||
# this will likely fetch each partition individually, rather than
|
||||
|
||||
@@ -639,6 +639,7 @@ class KafkaConsumer(six.Iterator):
|
||||
for msg in self._fetcher:
|
||||
yield msg
|
||||
if time.time() > timeout_at:
|
||||
log.debug("internal iterator timeout - breaking for poll")
|
||||
break
|
||||
|
||||
def __iter__(self): # pylint: disable=non-iterator-returned
|
||||
|
||||
Reference in New Issue
Block a user