Revert consumer iterators from max_poll_records (#856)

Dana Powers
2016-10-24 11:24:16 -07:00
committed by GitHub
parent 8de40a20d9
commit 77591afa78
2 changed files with 180 additions and 14 deletions
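
For context, this revert only changes how the iterator interface (for msg in consumer) is driven internally; poll() with max_records is not touched by this diff. A minimal usage sketch of the two consumption styles, with an assumed topic name and broker address:

from kafka import KafkaConsumer

# Topic name and broker address below are placeholders for illustration.
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=1000)

# Iterator interface: after this commit it is backed by the message
# generators added below, rather than by repeated poll(max_records=1) calls.
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)

# Batch interface: poll() with max_records, which this commit leaves in place.
records = consumer.poll(timeout_ms=500, max_records=100)
for tp, messages in records.items():
    print(tp, len(messages))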

View File

@@ -94,6 +94,7 @@ class Fetcher(six.Iterator):
        self._unauthorized_topics = set()
        self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
        self._record_too_large_partitions = dict() # {topic_partition: offset}
+        self._iterator = None
        self._fetch_futures = collections.deque()
        self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
@@ -375,6 +376,90 @@ class Fetcher(six.Iterator):
        part.discard()
        return 0

+    def _message_generator(self):
+        """Iterate over fetched_records"""
+        if self._subscriptions.needs_partition_assignment:
+            raise StopIteration('Subscription needs partition assignment')
+        while self._records:
+            # Check on each iteration since this is a generator
+            self._raise_if_offset_out_of_range()
+            self._raise_if_unauthorized_topics()
+            self._raise_if_record_too_large()
+            # Send additional FetchRequests when the internal queue is low
+            # this should enable moderate pipelining
+            if len(self._records) <= self.config['iterator_refetch_records']:
+                self.send_fetches()
+            part = self._records.popleft()
+            tp = part.topic_partition
+            fetch_offset = part.fetch_offset
+            if not self._subscriptions.is_assigned(tp):
+                # this can happen when a rebalance happened before
+                # fetched records are returned
+                log.debug("Not returning fetched records for partition %s"
+                          " since it is no longer assigned", tp)
+                continue
+            # note that the position should always be available
+            # as long as the partition is still assigned
+            position = self._subscriptions.assignment[tp].position
+            if not self._subscriptions.is_fetchable(tp):
+                # this can happen when a partition is paused before
+                # fetched records are returned
+                log.debug("Not returning fetched records for assigned partition"
+                          " %s since it is no longer fetchable", tp)
+            elif fetch_offset == position:
+                log.log(0, "Returning fetched records at offset %d for assigned"
+                           " partition %s", position, tp)
+                # We can ignore any prior signal to drop pending message sets
+                # because we are starting from a fresh one where fetch_offset == position
+                # i.e., the user seek()'d to this position
+                self._subscriptions.assignment[tp].drop_pending_message_set = False
+                for msg in part.messages:
+                    # Because we are in a generator, it is possible for
+                    # subscription state to change between yield calls
+                    # so we need to re-check on each loop
+                    # this should catch assignment changes, pauses
+                    # and resets via seek_to_beginning / seek_to_end
+                    if not self._subscriptions.is_fetchable(tp):
+                        log.debug("Not returning fetched records for partition %s"
+                                  " since it is no longer fetchable", tp)
+                        break
+                    # If there is a seek during message iteration,
+                    # we should stop unpacking this message set and
+                    # wait for a new fetch response that aligns with the
+                    # new seek position
+                    elif self._subscriptions.assignment[tp].drop_pending_message_set:
+                        log.debug("Skipping remainder of message set for partition %s", tp)
+                        self._subscriptions.assignment[tp].drop_pending_message_set = False
+                        break
+                    # Compressed messagesets may include earlier messages
+                    elif msg.offset < self._subscriptions.assignment[tp].position:
+                        log.debug("Skipping message offset: %s (expecting %s)",
+                                  msg.offset,
+                                  self._subscriptions.assignment[tp].position)
+                        continue
+                    self._subscriptions.assignment[tp].position = msg.offset + 1
+                    yield msg
+            else:
+                # these records aren't next in line based on the last consumed
+                # position, ignore them they must be from an obsolete request
+                log.debug("Ignoring fetched records for %s at offset %s since"
+                          " the current position is %d", tp, part.fetch_offset,
+                          position)

    def _unpack_message_set(self, tp, messages):
        try:
            for offset, size, msg in messages:
@@ -448,13 +533,13 @@ class Fetcher(six.Iterator):
        return self

    def __next__(self):
-        ret, _ = self.fetched_records(max_records=1)
-        if not ret:
-            raise StopIteration
-        assert len(ret) == 1
-        (messages,) = ret.values()
-        assert len(messages) == 1
-        return messages[0]
+        if not self._iterator:
+            self._iterator = self._message_generator()
+        try:
+            return next(self._iterator)
+        except StopIteration:
+            self._iterator = None
+            raise

    def _deserialize(self, msg):
        if self.config['key_deserializer']:
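
The fetcher's _message_generator above re-checks subscription state before every message because arbitrary consumer code runs between yields and may pause a partition, seek, or lose an assignment. A stripped-down sketch of that pattern, with hypothetical names and no Kafka dependency, not taken from the patch:

import collections

class PartitionState(object):
    # Hypothetical stand-in for the per-partition subscription state.
    def __init__(self):
        self.fetchable = True

def message_generator(state, parts):
    # Mirrors the shape of Fetcher._message_generator: because this is a
    # generator, caller code runs between next() calls, so fetchability is
    # re-checked before every message is yielded.
    queue = collections.deque(parts)
    while queue:
        part = queue.popleft()
        for msg in part:
            if not state.fetchable:
                break  # stop returning records from a partition that was paused
            yield msg

state = PartitionState()
gen = message_generator(state, [[1, 2, 3]])
print(next(gen))         # 1
state.fetchable = False  # e.g. the user paused the partition mid-iteration
print(list(gen))         # prints [] because the generator noticed the change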

View File

@@ -298,6 +298,8 @@ class KafkaConsumer(six.Iterator):
            assignors=self.config['partition_assignment_strategy'],
            **self.config)
        self._closed = False
+        self._iterator = None
+        self._consumer_timeout = float('inf')

        if topics:
            self._subscription.subscribe(topics=topics)
@@ -835,17 +837,96 @@ class KafkaConsumer(six.Iterator):
        # then do any offset lookups in case some positions are not known
        self._fetcher.update_fetch_positions(partitions)

+    def _message_generator(self):
+        assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
+        while time.time() < self._consumer_timeout:
+            if self._use_consumer_group():
+                self._coordinator.ensure_coordinator_known()
+                self._coordinator.ensure_active_group()
+            # 0.8.2 brokers support kafka-backed offset storage via group coordinator
+            elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
+                self._coordinator.ensure_coordinator_known()
+            # fetch offsets for any subscribed partitions that we arent tracking yet
+            if not self._subscription.has_all_fetch_positions():
+                partitions = self._subscription.missing_fetch_positions()
+                self._update_fetch_positions(partitions)
+            poll_ms = 1000 * (self._consumer_timeout - time.time())
+            if not self._fetcher.in_flight_fetches():
+                poll_ms = 0
+            self._client.poll(timeout_ms=poll_ms, sleep=True)
+            # We need to make sure we at least keep up with scheduled tasks,
+            # like heartbeats, auto-commits, and metadata refreshes
+            timeout_at = self._next_timeout()
+            # Because the consumer client poll does not sleep unless blocking on
+            # network IO, we need to explicitly sleep when we know we are idle
+            # because we haven't been assigned any partitions to fetch / consume
+            if self._use_consumer_group() and not self.assignment():
+                sleep_time = max(timeout_at - time.time(), 0)
+                if sleep_time > 0 and not self._client.in_flight_request_count():
+                    log.debug('No partitions assigned; sleeping for %s', sleep_time)
+                    time.sleep(sleep_time)
+                    continue
+            # Short-circuit the fetch iterator if we are already timed out
+            # to avoid any unintentional interaction with fetcher setup
+            if time.time() > timeout_at:
+                continue
+            for msg in self._fetcher:
+                yield msg
+                if time.time() > timeout_at:
+                    log.debug("internal iterator timeout - breaking for poll")
+                    break
+            # an else block on a for loop only executes if there was no break
+            # so this should only be called on a StopIteration from the fetcher
+            # and we assume that it is safe to init_fetches when fetcher is done
+            # i.e., there are no more records stored internally
+            else:
+                self._fetcher.send_fetches()
+
+    def _next_timeout(self):
+        timeout = min(self._consumer_timeout,
+                      self._client._delayed_tasks.next_at() + time.time(),
+                      self._client.cluster.ttl() / 1000.0 + time.time())
+        # Although the delayed_tasks timeout above should cover processing
+        # HeartbeatRequests, it is still possible that HeartbeatResponses
+        # are left unprocessed during a long _fetcher iteration without
+        # an intermediate poll(). And because tasks are responsible for
+        # rescheduling themselves, an unprocessed response will prevent
+        # the next heartbeat from being sent. This check should help
+        # avoid that.
+        if self._use_consumer_group():
+            heartbeat = time.time() + self._coordinator.heartbeat.ttl()
+            timeout = min(timeout, heartbeat)
+        return timeout

    def __iter__(self): # pylint: disable=non-iterator-returned
        return self

    def __next__(self):
-        ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
-        if not ret:
-            raise StopIteration
-        assert len(ret) == 1
-        (messages,) = ret.values()
-        assert len(messages) == 1
-        return messages[0]
+        if not self._iterator:
+            self._iterator = self._message_generator()
+        self._set_consumer_timeout()
+        try:
+            return next(self._iterator)
+        except StopIteration:
+            self._iterator = None
+            raise
+
+    def _set_consumer_timeout(self):
+        # consumer_timeout_ms can be used to stop iteration early
+        if self.config['consumer_timeout_ms'] >= 0:
+            self._consumer_timeout = time.time() + (
+                self.config['consumer_timeout_ms'] / 1000.0)

    # old KafkaConsumer methods are deprecated
    def configure(self, **configs):
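
With the generator-based iterator restored, consumer_timeout_ms again bounds how long iteration blocks waiting for the next message: _set_consumer_timeout() arms a deadline and the generator's while loop stops, raising StopIteration, once it passes. A hedged usage sketch, assuming a local broker and an assumed topic name:

from kafka import KafkaConsumer

# consumer_timeout_ms=1000 means the loop below exits once no message
# has arrived for roughly one second.
consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         consumer_timeout_ms=1000)

count = 0
for message in consumer:
    count += 1
print('drained %d messages before timing out' % count)

# Without consumer_timeout_ms, the internal deadline stays effectively at
# float('inf') and iteration blocks indefinitely waiting for new messages.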