Merge pull request #500 from dpkp/kafka-2978
KAFKA-2978: consumer stops fetching when positions get out of sync
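Summary of the change (as read from the diff below): the two per-partition offsets that `TopicPartitionState` used to track (`fetched`, the next offset to request, and `consumed`, the offset exposed to the user) are collapsed into a single `position`. Previously the Fetcher only issued a new FetchRequest once `consumed` caught up with `fetched`, so the two drifting apart could stop fetching altogether; it now always fetches from `position`, discards buffered batches and responses whose starting offset no longer matches it, and advances `position` as records are handed to the application. `ConsumerCoordinator` additionally guards against `partitions_for_topic()` returning None for topics without metadata.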
@@ -209,11 +209,11 @@ class Fetcher(six.Iterator):
                 log.debug("Ignoring fetched records for %s since it is no"
                           " longer fetchable", partition)
                 continue
-            consumed = self._subscriptions.assignment[partition].consumed
-            # ignore partition if its consumed offset != offset in FetchResponse
+            position = self._subscriptions.assignment[partition].position
+            # ignore partition if the current position != offset in FetchResponse
             # e.g. after seek()
-            if consumed is not None and offset == consumed:
-                current_out_of_range_partitions[partition] = offset
+            if position is not None and offset == position:
+                current_out_of_range_partitions[partition] = position

         self._offset_out_of_range_partitions.clear()
         if current_out_of_range_partitions:
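The hunk above only re-raises an out-of-range error for a partition whose current position still equals the offset reported in the FetchResponse; a seek() in the meantime makes the stale error irrelevant. Below is a minimal standalone sketch of that filter, using plain dicts and a set as hypothetical stand-ins for the Fetcher/SubscriptionState internals (not the real API):

```python
# Minimal sketch of the filtering above; `out_of_range`, `positions` and
# `fetchable` are hypothetical stand-ins for the real subscription state.
def filter_out_of_range(out_of_range, positions, fetchable):
    current = {}
    for partition, offset in out_of_range.items():
        if partition not in fetchable:
            continue  # no longer fetchable: ignore the stale error
        position = positions.get(partition)
        # ignore the partition if its current position != offset in the
        # FetchResponse, e.g. after a seek()
        if position is not None and offset == position:
            current[partition] = position
    return current

# A seek() moved 'p0' from 10 to 42, so the stale error at offset 10 is dropped.
print(filter_out_of_range({'p0': 10}, {'p0': 42}, {'p0'}))  # -> {}
```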
@@ -290,31 +290,30 @@ class Fetcher(six.Iterator):
                           " since it is no longer assigned", tp)
                 continue

-            # note that the consumed position should always be available
+            # note that the position should always be available
             # as long as the partition is still assigned
-            consumed = self._subscriptions.assignment[tp].consumed
+            position = self._subscriptions.assignment[tp].position
             if not self._subscriptions.is_fetchable(tp):
-                # this can happen when a partition consumption paused before
+                # this can happen when a partition is paused before
                 # fetched records are returned to the consumer's poll call
                 log.debug("Not returning fetched records for assigned partition"
                           " %s since it is no longer fetchable", tp)

-                # we also need to reset the fetch positions to pretend we did
-                # not fetch this partition in the previous request at all
-                self._subscriptions.assignment[tp].fetched = consumed
-            elif fetch_offset == consumed:
+            elif fetch_offset == position:
                 next_offset = messages[-1][0] + 1
-                log.debug("Returning fetched records for assigned partition %s"
-                          " and update consumed position to %s", tp, next_offset)
-                self._subscriptions.assignment[tp].consumed = next_offset
+                log.debug("Returning fetched records at offset %d for assigned"
+                          " partition %s and update position to %s", position,
+                          tp, next_offset)
+                self._subscriptions.assignment[tp].position = next_offset

                 for record in self._unpack_message_set(tp, messages):
                     drained[tp].append(record)
             else:
                 # these records aren't next in line based on the last consumed
                 # position, ignore them they must be from an obsolete request
-                log.debug("Ignoring fetched records for %s at offset %s",
-                          tp, fetch_offset)
+                log.debug("Ignoring fetched records for %s at offset %s since"
+                          " the current position is %d", tp, fetch_offset,
+                          position)
         return dict(drained)

     def _unpack_message_set(self, tp, messages):
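`fetched_records` above now drains a buffered batch only when the offset it was fetched at still equals the partition's current position, and then moves the position past the last returned message. A condensed sketch of that bookkeeping, with plain `(offset, key, value)` tuples and a `positions` dict standing in for the real Fetcher internals:

```python
# Condensed sketch of the drain logic above; `positions` and the
# (offset, key, value) tuples are stand-ins for the real internals.
import collections

def drain(batches, positions):
    """batches: [(fetch_offset, partition, [(offset, key, value), ...]), ...]"""
    drained = collections.defaultdict(list)
    for fetch_offset, tp, messages in batches:
        position = positions.get(tp)
        if position is not None and fetch_offset == position:
            positions[tp] = messages[-1][0] + 1  # advance past what we return
            drained[tp].extend(messages)
        # otherwise the batch came from an obsolete request and is ignored
    return dict(drained)

positions = {'topic-0': 5}
batches = [(5, 'topic-0', [(5, None, b'a'), (6, None, b'b')]),  # matches position
           (3, 'topic-0', [(3, None, b'old')])]                 # obsolete, dropped
print(drain(batches, positions))  # {'topic-0': [(5, None, b'a'), (6, None, b'b')]}
print(positions)                  # {'topic-0': 7}
```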
@@ -351,20 +350,16 @@ class Fetcher(six.Iterator):

             # note that the consumed position should always be available
             # as long as the partition is still assigned
-            consumed = self._subscriptions.assignment[tp].consumed
+            position = self._subscriptions.assignment[tp].position
             if not self._subscriptions.is_fetchable(tp):
                 # this can happen when a partition consumption paused before
                 # fetched records are returned
                 log.warning("Not returning fetched records for assigned partition"
                             " %s since it is no longer fetchable", tp)

-                # we also need to reset the fetch positions to pretend we did
-                # not fetch this partition in the previous request at all
-                self._subscriptions.assignment[tp].fetched = consumed
-
-            elif fetch_offset == consumed:
+            elif fetch_offset == position:
                 for msg in self._unpack_message_set(tp, messages):
-                    self._subscriptions.assignment[tp].consumed = msg.offset + 1
+                    self._subscriptions.assignment[tp].position = msg.offset + 1
                     yield msg
             else:
                 # these records aren't next in line based on the last consumed
@@ -494,19 +489,15 @@ class Fetcher(six.Iterator):
                 # if there is a leader and no in-flight requests,
                 # issue a new fetch but only fetch data for partitions whose
                 # previously fetched data has been consumed
-                fetched = self._subscriptions.assignment[partition].fetched
-                consumed = self._subscriptions.assignment[partition].consumed
-                if consumed == fetched:
-                    partition_info = (
-                        partition.partition,
-                        fetched,
-                        self.config['max_partition_fetch_bytes']
-                    )
-                    fetchable[node_id][partition.topic].append(partition_info)
-                else:
-                    log.debug("Skipping FetchRequest to %s because previously"
-                              " fetched offsets (%s) have not been fully"
-                              " consumed yet (%s)", node_id, fetched, consumed)
+                position = self._subscriptions.assignment[partition].position
+                partition_info = (
+                    partition.partition,
+                    position,
+                    self.config['max_partition_fetch_bytes']
+                )
+                fetchable[node_id][partition.topic].append(partition_info)
+                log.debug("Adding fetch request for partition %d at offset %d",
+                          partition, position)

         requests = {}
         for node_id, partition_data in six.iteritems(fetchable):
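With the old `consumed == fetched` gate removed, the hunk above always builds the per-partition fetch tuple from the single position. A rough sketch of assembling that payload; the node, topic, and partition keys and values here are made up for illustration:

```python
# Rough sketch of the (partition, offset, max_bytes) tuples built above;
# the node/topic/partition values are made up for illustration.
import collections

def build_fetchable(positions, max_partition_fetch_bytes=1048576):
    """positions: {(node_id, topic, partition_index): current position}"""
    fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
    for (node_id, topic, partition), position in positions.items():
        partition_info = (
            partition,
            position,  # always fetch from the current position
            max_partition_fetch_bytes,
        )
        fetchable[node_id][topic].append(partition_info)
    return fetchable

requests = build_fetchable({(0, 'my-topic', 0): 42, (0, 'my-topic', 1): 7})
print(dict(requests[0]))  # {'my-topic': [(0, 42, 1048576), (1, 7, 1048576)]}
```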
@@ -541,15 +532,12 @@ class Fetcher(six.Iterator):

                     # we are interested in this fetch only if the beginning
                     # offset matches the current consumed position
-                    consumed = self._subscriptions.assignment[tp].consumed
-                    if consumed is None:
-                        continue
-                    elif consumed != fetch_offset:
-                        # the fetched position has gotten out of sync with the
-                        # consumed position (which might happen when a
-                        # rebalance occurs with a fetch in-flight), so we need
-                        # to reset the fetch position so the next fetch is right
-                        self._subscriptions.assignment[tp].fetched = consumed
+                    position = self._subscriptions.assignment[tp].position
+                    if position is None or position != fetch_offset:
+                        log.debug("Discarding fetch response for partition %s"
+                                  " since its offset %d does not match the"
+                                  " expected offset %d", tp, fetch_offset,
+                                  position)
                         continue

                     partial = None
@@ -557,9 +545,11 @@
                         partial = messages.pop()

                     if messages:
-                        last_offset, _, _ = messages[-1]
-                        self._subscriptions.assignment[tp].fetched = last_offset + 1
+                        log.debug("Adding fetched record for partition %s with"
+                                  " offset %d to buffered record list", tp,
+                                  position)
                         self._records.append((fetch_offset, tp, messages))
+                        #last_offset, _, _ = messages[-1]
                         #self.sensors.records_fetch_lag.record(highwater - last_offset)
                     elif partial:
                         # we did not read a single message from a non-empty
@@ -581,7 +571,7 @@
                     else:
                         self._offset_out_of_range_partitions[tp] = fetch_offset
                     log.info("Fetch offset %s is out of range, resetting offset",
-                             self._subscriptions.assignment[tp].fetched)
+                             fetch_offset)
                 elif error_type is Errors.TopicAuthorizationFailedError:
                     log.warn("Not authorized to read from topic %s.", tp.topic)
                     self._unauthorized_topics.add(tp.topic)
@@ -452,10 +452,10 @@ class KafkaConsumer(six.Iterator):
         """
         assert self._subscription.is_assigned(partition)

-        offset = self._subscription.assignment[partition].consumed
+        offset = self._subscription.assignment[partition].position
         if offset is None:
             self._update_fetch_positions(partition)
-            offset = self._subscription.assignment[partition].consumed
+            offset = self._subscription.assignment[partition].position
         return offset

     def pause(self, *partitions):
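The `position()` method above now reads the same attribute that commits and fetch requests use. A small usage sketch against kafka-python's public consumer API; the broker address and topic name are placeholders:

```python
# Usage sketch for KafkaConsumer.position(); broker address and topic name
# are placeholders.
from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
tp = TopicPartition('my-topic', 0)
consumer.assign([tp])

print(consumer.position(tp))  # current position, initialized on first call if needed
consumer.seek(tp, 100)        # move the position explicitly
print(consumer.position(tp))  # 100
```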
@@ -279,7 +279,7 @@ class SubscriptionState(object):
         all_consumed = {}
         for partition, state in six.iteritems(self.assignment):
             if state.has_valid_position:
-                all_consumed[partition] = OffsetAndMetadata(state.consumed, '')
+                all_consumed[partition] = OffsetAndMetadata(state.position, '')
         return all_consumed

     def need_offset_reset(self, partition, offset_reset_strategy=None):
@@ -335,41 +335,29 @@ class SubscriptionState(object):
 class TopicPartitionState(object):
     def __init__(self):
         self.committed = None  # last committed position
-        self.has_valid_position = False  # whether we have valid consumed and fetched positions
+        self.has_valid_position = False  # whether we have valid position
         self.paused = False  # whether this partition has been paused by the user
         self.awaiting_reset = False  # whether we are awaiting reset
         self.reset_strategy = None  # the reset strategy if awaitingReset is set
-        self._consumed = None  # offset exposed to the user
-        self._fetched = None  # current fetch position
+        self._position = None  # offset exposed to the user

-    def _set_fetched(self, offset):
-        assert self.has_valid_position, 'Valid consumed/fetch position required'
-        self._fetched = offset
+    def _set_position(self, offset):
+        assert self.has_valid_position, 'Valid position required'
+        self._position = offset

-    def _get_fetched(self):
-        return self._fetched
+    def _get_position(self):
+        return self._position

-    fetched = property(_get_fetched, _set_fetched, None, "current fetch position")
-
-    def _set_consumed(self, offset):
-        assert self.has_valid_position, 'Valid consumed/fetch position required'
-        self._consumed = offset
-
-    def _get_consumed(self):
-        return self._consumed
-
-    consumed = property(_get_consumed, _set_consumed, None, "last consumed position")
+    position = property(_get_position, _set_position, None, "last position")

     def await_reset(self, strategy):
         self.awaiting_reset = True
         self.reset_strategy = strategy
-        self._consumed = None
-        self._fetched = None
+        self._position = None
         self.has_valid_position = False

     def seek(self, offset):
-        self._consumed = offset
-        self._fetched = offset
+        self._position = offset
         self.awaiting_reset = False
         self.reset_strategy = None
         self.has_valid_position = True
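The consolidated `TopicPartitionState` above can be exercised on its own: `seek()` establishes a valid position, `await_reset()` clears it, and assigning `position` without a valid position trips the assertion. A condensed standalone version (the committed/paused bookkeeping is trimmed) followed by a tiny exercise:

```python
# Condensed standalone version of TopicPartitionState after this change
# (committed/paused fields trimmed), plus a small exercise of it.
class TopicPartitionState(object):
    def __init__(self):
        self.awaiting_reset = False      # whether we are awaiting reset
        self.reset_strategy = None       # the reset strategy if awaiting_reset is set
        self.has_valid_position = False  # whether we have a valid position
        self._position = None            # offset exposed to the user

    def _set_position(self, offset):
        assert self.has_valid_position, 'Valid position required'
        self._position = offset

    def _get_position(self):
        return self._position

    position = property(_get_position, _set_position, None, "last position")

    def await_reset(self, strategy):
        self.awaiting_reset = True
        self.reset_strategy = strategy
        self._position = None
        self.has_valid_position = False

    def seek(self, offset):
        self._position = offset
        self.awaiting_reset = False
        self.reset_strategy = None
        self.has_valid_position = True


state = TopicPartitionState()
state.seek(10)       # establishes a valid position
state.position = 11  # allowed once the position is valid
state.await_reset('earliest')
try:
    state.position = 12
except AssertionError:
    print('no valid position while awaiting reset')
```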
@@ -146,7 +146,8 @@ class ConsumerCoordinator(BaseCoordinator):
         old_partitions_per_topic = self._partitions_per_topic
         self._partitions_per_topic = {}
         for topic in self._subscription.group_subscription():
-            self._partitions_per_topic[topic] = set(self._cluster.partitions_for_topic(topic))
+            partitions = self._cluster.partitions_for_topic(topic) or []
+            self._partitions_per_topic[topic] = set(partitions)

         if self._partitions_per_topic != old_partitions_per_topic:
             return True
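The `or []` added above matters because `partitions_for_topic()` can return None for a topic that is not in the cluster metadata yet, and `set(None)` raises TypeError. A two-case illustration, with a hypothetical metadata dict standing in for the cluster object:

```python
# Why the `or []` guard above helps: a topic with no metadata yields None,
# and set(None) would raise TypeError. The `metadata` dict is a hypothetical
# stand-in for the cluster metadata object.
metadata = {'known-topic': {0, 1, 2}}

partitions_per_topic = {}
for topic in ('known-topic', 'brand-new-topic'):
    partitions = metadata.get(topic) or []
    partitions_per_topic[topic] = set(partitions)

print(partitions_per_topic)  # {'known-topic': {0, 1, 2}, 'brand-new-topic': set()}
```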