Add KafkaConsumer.highwater(partition) to get highwater offsets from FetchResponses

This commit is contained in:
Dana Powers
2016-01-31 23:46:31 -08:00
parent 33b9ff2e33
commit 843b34732d
3 changed files with 26 additions and 1 deletions

View File

@@ -574,11 +574,13 @@ class Fetcher(six.Iterator):
# consumption paused while fetch is still in-flight
log.debug("Ignoring fetched records for partition %s"
" since it is no longer fetchable", tp)
elif error_type is Errors.NoError:
fetch_offset = fetch_offsets[tp]
self._subscriptions.assignment[tp].highwater = highwater
# we are interested in this fetch only if the beginning
# offset matches the current consumed position
fetch_offset = fetch_offsets[tp]
position = self._subscriptions.assignment[tp].position
if position is None or position != fetch_offset:
log.debug("Discarding fetch response for partition %s"

View File

@@ -463,6 +463,28 @@ class KafkaConsumer(six.Iterator):
offset = self._subscription.assignment[partition].position
return offset
def highwater(self, partition):
"""Last known highwater offset for a partition
A highwater offset is the offset that will be assigned to the next
message that is produced. It may be useful for calculating lag, by
comparing with the reported position. Note that both position and
highwater refer to the *next* offset -- i.e., highwater offset is
one greater than the newest availabel message.
Highwater offsets are returned in FetchResponse messages, so will
not be available if not FetchRequests have been sent for this partition
yet.
Arguments:
partition (TopicPartition): partition to check
Returns:
int or None: offset if available
"""
assert self._subscription.is_assigned(partition), 'Partition is not assigned'
return self._subscription.assignment[partition].highwater
def pause(self, *partitions):
"""Suspend fetching from the requested partitions.

View File

@@ -340,6 +340,7 @@ class TopicPartitionState(object):
self.awaiting_reset = False # whether we are awaiting reset
self.reset_strategy = None # the reset strategy if awaitingReset is set
self._position = None # offset exposed to the user
self.highwater = None
def _set_position(self, offset):
assert self.has_valid_position, 'Valid position required'