KAFKA-3007: KafkaConsumer max_poll_records (#831)

This commit is contained in:
Dana Powers
2016-09-28 12:39:34 -07:00
committed by GitHub
parent b8717b4b79
commit 9ee77dfdbc
5 changed files with 123 additions and 262 deletions

View File

@@ -4,6 +4,7 @@ import collections
import copy
import logging
import random
import sys
import time
from kafka.vendor import six
@@ -39,6 +40,7 @@ class Fetcher(six.Iterator):
'fetch_min_bytes': 1,
'fetch_max_wait_ms': 500,
'max_partition_fetch_bytes': 1048576,
'max_poll_records': sys.maxsize,
'check_crcs': True,
'skip_double_compressed_messages': False,
'iterator_refetch_records': 1, # undocumented -- interface may change
@@ -92,11 +94,10 @@ class Fetcher(six.Iterator):
self._unauthorized_topics = set()
self._offset_out_of_range_partitions = dict() # {topic_partition: offset}
self._record_too_large_partitions = dict() # {topic_partition: offset}
self._iterator = None
self._fetch_futures = collections.deque()
self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
def init_fetches(self):
def send_fetches(self):
"""Send FetchRequests asynchronously for all assigned partitions.
Note: noop if there are unconsumed records internal to the fetcher
@@ -104,16 +105,6 @@ class Fetcher(six.Iterator):
Returns:
List of Futures: each future resolves to a FetchResponse
"""
# We need to be careful when creating fetch records during iteration
# so we verify that there are no records in the deque, or in an
# iterator
if self._records or self._iterator:
log.debug('Skipping init_fetches because there are unconsumed'
' records internally')
return []
return self._init_fetches()
def _init_fetches(self):
futures = []
for node_id, request in six.iteritems(self._create_fetch_requests()):
if self._client.ready(node_id):
@@ -291,10 +282,12 @@ class Fetcher(six.Iterator):
copied_record_too_large_partitions,
self.config['max_partition_fetch_bytes'])
def fetched_records(self):
def fetched_records(self, max_records=None):
"""Returns previously fetched records and updates consumed offsets.
Incompatible with iterator interface - use one or the other, not both.
Arguments:
max_records (int): Maximum number of records returned. Defaults
to max_poll_records configuration.
Raises:
OffsetOutOfRangeError: if no subscription offset_reset_strategy
@@ -304,32 +297,44 @@ class Fetcher(six.Iterator):
configured max_partition_fetch_bytes
TopicAuthorizationError: if consumer is not authorized to fetch
messages from the topic
AssertionError: if used with iterator (incompatible)
Returns:
dict: {TopicPartition: [messages]}
Returns: (records (dict), partial (bool))
records: {TopicPartition: [messages]}
partial: True if records returned did not fully drain any pending
partition requests. This may be useful for choosing when to
pipeline additional fetch requests.
"""
assert self._iterator is None, (
'fetched_records is incompatible with message iterator')
if self._subscriptions.needs_partition_assignment:
return {}
if max_records is None:
max_records = self.config['max_poll_records']
assert max_records > 0
if self._subscriptions.needs_partition_assignment:
return {}, False
drained = collections.defaultdict(list)
self._raise_if_offset_out_of_range()
self._raise_if_unauthorized_topics()
self._raise_if_record_too_large()
# Loop over the records deque
while self._records:
(fetch_offset, tp, messages) = self._records.popleft()
if not self._subscriptions.is_assigned(tp):
# this can happen when a rebalance happened before
# fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition %s"
" since it is no longer assigned", tp)
continue
drained = collections.defaultdict(list)
partial = bool(self._records and max_records)
while self._records and max_records > 0:
part = self._records.popleft()
max_records -= self._append(drained, part, max_records)
if part.has_more():
self._records.appendleft(part)
else:
partial &= False
return dict(drained), partial
def _append(self, drained, part, max_records):
tp = part.topic_partition
fetch_offset = part.fetch_offset
if not self._subscriptions.is_assigned(tp):
# this can happen when a rebalance happened before
# fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition %s"
" since it is no longer assigned", tp)
else:
# note that the position should always be available
# as long as the partition is still assigned
position = self._subscriptions.assignment[tp].position
@@ -340,26 +345,35 @@ class Fetcher(six.Iterator):
" %s since it is no longer fetchable", tp)
elif fetch_offset == position:
next_offset = messages[-1][0] + 1
part_records = part.take(max_records)
if not part_records:
return 0
next_offset = part_records[-1].offset + 1
log.log(0, "Returning fetched records at offset %d for assigned"
" partition %s and update position to %s", position,
tp, next_offset)
self._subscriptions.assignment[tp].position = next_offset
for record in self._unpack_message_set(tp, messages):
for record in part_records:
# Fetched compressed messages may include additional records
if record.offset < fetch_offset:
log.debug("Skipping message offset: %s (expecting %s)",
record.offset, fetch_offset)
continue
drained[tp].append(record)
self._subscriptions.assignment[tp].position = next_offset
return len(part_records)
else:
# these records aren't next in line based on the last consumed
# position, ignore them they must be from an obsolete request
log.debug("Ignoring fetched records for %s at offset %s since"
" the current position is %d", tp, fetch_offset,
" the current position is %d", tp, part.fetch_offset,
position)
return dict(drained)
part.discard()
return 0
def _unpack_message_set(self, tp, messages):
try:
@@ -430,97 +444,17 @@ class Fetcher(six.Iterator):
log.exception('StopIteration raised unpacking messageset: %s', e)
raise Exception('StopIteration raised unpacking messageset')
def _message_generator(self):
"""Iterate over fetched_records"""
if self._subscriptions.needs_partition_assignment:
raise StopIteration('Subscription needs partition assignment')
while self._records:
# Check on each iteration since this is a generator
self._raise_if_offset_out_of_range()
self._raise_if_unauthorized_topics()
self._raise_if_record_too_large()
# Send additional FetchRequests when the internal queue is low
# this should enable moderate pipelining
if len(self._records) <= self.config['iterator_refetch_records']:
self._init_fetches()
(fetch_offset, tp, messages) = self._records.popleft()
if not self._subscriptions.is_assigned(tp):
# this can happen when a rebalance happened before
# fetched records are returned
log.debug("Not returning fetched records for partition %s"
" since it is no longer assigned", tp)
continue
# note that the consumed position should always be available
# as long as the partition is still assigned
position = self._subscriptions.assignment[tp].position
if not self._subscriptions.is_fetchable(tp):
# this can happen when a partition consumption paused before
# fetched records are returned
log.debug("Not returning fetched records for assigned partition"
" %s since it is no longer fetchable", tp)
elif fetch_offset == position:
log.log(0, "Returning fetched records at offset %d for assigned"
" partition %s", position, tp)
# We can ignore any prior signal to drop pending message sets
# because we are starting from a fresh one where fetch_offset == position
# i.e., the user seek()'d to this position
self._subscriptions.assignment[tp].drop_pending_message_set = False
for msg in self._unpack_message_set(tp, messages):
# Because we are in a generator, it is possible for
# subscription state to change between yield calls
# so we need to re-check on each loop
# this should catch assignment changes, pauses
# and resets via seek_to_beginning / seek_to_end
if not self._subscriptions.is_fetchable(tp):
log.debug("Not returning fetched records for partition %s"
" since it is no longer fetchable", tp)
break
# If there is a seek during message iteration,
# we should stop unpacking this message set and
# wait for a new fetch response that aligns with the
# new seek position
elif self._subscriptions.assignment[tp].drop_pending_message_set:
log.debug("Skipping remainder of message set for partition %s", tp)
self._subscriptions.assignment[tp].drop_pending_message_set = False
break
# Compressed messagesets may include earlier messages
elif msg.offset < self._subscriptions.assignment[tp].position:
log.debug("Skipping message offset: %s (expecting %s)",
msg.offset,
self._subscriptions.assignment[tp].position)
continue
self._subscriptions.assignment[tp].position = msg.offset + 1
yield msg
else:
# these records aren't next in line based on the last consumed
# position, ignore them they must be from an obsolete request
log.debug("Ignoring fetched records for %s at offset %s",
tp, fetch_offset)
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
if not self._iterator:
self._iterator = self._message_generator()
try:
return next(self._iterator)
except StopIteration:
self._iterator = None
raise
ret, _ = self.fetched_records(max_records=1)
if not ret:
raise StopIteration
assert len(ret) == 1
(messages,) = ret.values()
assert len(messages) == 1
return messages[0]
def _deserialize(self, msg):
if self.config['key_deserializer']:
@@ -601,6 +535,11 @@ class Fetcher(six.Iterator):
" %s", partition, error_type)
future.failure(error_type(partition))
def _fetchable_partitions(self):
fetchable = self._subscriptions.fetchable_partitions()
pending = set([part.topic_partition for part in self._records])
return fetchable.difference(pending)
def _create_fetch_requests(self):
"""Create fetch requests for all assigned partitions, grouped by node.
@@ -613,24 +552,17 @@ class Fetcher(six.Iterator):
# which can be passed to FetchRequest() via .items()
fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
# avoid re-fetching pending offsets
pending = set()
for fetch_offset, tp, _ in self._records:
pending.add((tp, fetch_offset))
for partition in self._subscriptions.fetchable_partitions():
for partition in self._fetchable_partitions():
node_id = self._client.cluster.leader_for_partition(partition)
position = self._subscriptions.assignment[partition].position
# fetch if there is a leader, no in-flight requests, and no _records
# fetch if there is a leader and no in-flight requests
if node_id is None or node_id == -1:
log.debug("No leader found for partition %s."
" Requesting metadata update", partition)
self._client.cluster.request_update()
elif ((partition, position) not in pending and
self._client.in_flight_request_count(node_id) == 0):
elif self._client.in_flight_request_count(node_id) == 0:
partition_info = (
partition.partition,
position,
@@ -704,7 +636,8 @@ class Fetcher(six.Iterator):
log.debug("Adding fetched record for partition %s with"
" offset %d to buffered record list", tp,
position)
self._records.append((fetch_offset, tp, messages))
unpacked = list(self._unpack_message_set(tp, messages))
self._records.append(self.PartitionRecords(fetch_offset, tp, unpacked))
last_offset, _, _ = messages[-1]
self._sensors.records_fetch_lag.record(highwater - last_offset)
num_bytes = sum(msg[1] for msg in messages)
@@ -744,6 +677,29 @@ class Fetcher(six.Iterator):
self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
self._sensors.fetch_latency.record((recv_time - send_time) * 1000)
class PartitionRecords(six.Iterator):
def __init__(self, fetch_offset, tp, messages):
self.fetch_offset = fetch_offset
self.topic_partition = tp
self.messages = messages
self.message_idx = 0
def discard(self):
self.messages = None
def take(self, n):
if not self.has_more():
return []
next_idx = self.message_idx + n
res = self.messages[self.message_idx:next_idx]
self.message_idx = next_idx
if self.has_more():
self.fetch_offset = self.messages[self.message_idx].offset
return res
def has_more(self):
return self.message_idx < len(self.messages)
class FetchManagerMetrics(object):
def __init__(self, metrics, prefix):

View File

@@ -3,6 +3,7 @@ from __future__ import absolute_import
import copy
import logging
import socket
import sys
import time
from kafka.vendor import six
@@ -115,6 +116,7 @@ class KafkaConsumer(six.Iterator):
rebalances. Default: 3000
session_timeout_ms (int): The timeout used to detect failures when
using Kafka's group managementment facilities. Default: 30000
max_poll_records (int): ....
receive_buffer_bytes (int): The size of the TCP receive buffer
(SO_RCVBUF) to use when reading data. Default: None (relies on
system defaults). The java client defaults to 32768.
@@ -126,7 +128,7 @@ class KafkaConsumer(six.Iterator):
[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]
consumer_timeout_ms (int): number of milliseconds to block during
message iteration before raising StopIteration (i.e., ending the
iterator). Default -1 (block forever).
iterator). Default block forever [float('inf')].
skip_double_compressed_messages (bool): A bug in KafkaProducer <= 1.2.4
caused some messages to be corrupted via double-compression.
By default, the fetcher will return these messages as a compressed
@@ -220,10 +222,11 @@ class KafkaConsumer(six.Iterator):
'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
'heartbeat_interval_ms': 3000,
'session_timeout_ms': 30000,
'max_poll_records': sys.maxsize,
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
'consumer_timeout_ms': -1,
'consumer_timeout_ms': float('inf'),
'skip_double_compressed_messages': False,
'security_protocol': 'PLAINTEXT',
'ssl_context': None,
@@ -295,8 +298,6 @@ class KafkaConsumer(six.Iterator):
assignors=self.config['partition_assignment_strategy'],
**self.config)
self._closed = False
self._iterator = None
self._consumer_timeout = float('inf')
if topics:
self._subscription.subscribe(topics=topics)
@@ -483,7 +484,7 @@ class KafkaConsumer(six.Iterator):
"""
return self._client.cluster.partitions_for_topic(topic)
def poll(self, timeout_ms=0):
def poll(self, timeout_ms=0, max_records=None):
"""Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition.
@@ -505,19 +506,15 @@ class KafkaConsumer(six.Iterator):
subscribed list of topics and partitions
"""
assert timeout_ms >= 0, 'Timeout must not be negative'
assert self._iterator is None, 'Incompatible with iterator interface'
if max_records is None:
max_records = self.config['max_poll_records']
# poll for new data until the timeout expires
start = time.time()
remaining = timeout_ms
while True:
records = self._poll_once(remaining)
records = self._poll_once(remaining, max_records)
if records:
# before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
# responses to enable pipelining while the user is handling the
# fetched records.
self._fetcher.init_fetches()
return records
elapsed_ms = (time.time() - start) * 1000
@@ -526,7 +523,7 @@ class KafkaConsumer(six.Iterator):
if remaining <= 0:
return {}
def _poll_once(self, timeout_ms):
def _poll_once(self, timeout_ms, max_records):
"""
Do one round of polling. In addition to checking for new data, this does
any needed heart-beating, auto-commits, and offset updates.
@@ -545,23 +542,29 @@ class KafkaConsumer(six.Iterator):
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
# fetch positions if we have partitions we're subscribed to that we
# don't know the offset for
if not self._subscription.has_all_fetch_positions():
self._update_fetch_positions(self._subscription.missing_fetch_positions())
# init any new fetches (won't resend pending fetches)
records = self._fetcher.fetched_records()
# if data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
records, partial = self._fetcher.fetched_records(max_records)
if records:
# before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
# responses to enable pipelining while the user is handling the
# fetched records.
if not partial:
self._fetcher.send_fetches()
return records
self._fetcher.init_fetches()
# send any new fetches (won't resend pending fetches)
self._fetcher.send_fetches()
self._client.poll(timeout_ms=timeout_ms, sleep=True)
return self._fetcher.fetched_records()
records, _ = self._fetcher.fetched_records(max_records)
return records
def position(self, partition):
"""Get the offset of the next record that will be fetched
@@ -832,96 +835,17 @@ class KafkaConsumer(six.Iterator):
# then do any offset lookups in case some positions are not known
self._fetcher.update_fetch_positions(partitions)
def _message_generator(self):
assert self.assignment() or self.subscription() is not None, 'No topic subscription or manual partition assignment'
while time.time() < self._consumer_timeout:
if self._use_consumer_group():
self._coordinator.ensure_coordinator_known()
self._coordinator.ensure_active_group()
# 0.8.2 brokers support kafka-backed offset storage via group coordinator
elif self.config['group_id'] is not None and self.config['api_version'] >= (0, 8, 2):
self._coordinator.ensure_coordinator_known()
# fetch offsets for any subscribed partitions that we arent tracking yet
if not self._subscription.has_all_fetch_positions():
partitions = self._subscription.missing_fetch_positions()
self._update_fetch_positions(partitions)
poll_ms = 1000 * (self._consumer_timeout - time.time())
if not self._fetcher.in_flight_fetches():
poll_ms = 0
self._client.poll(timeout_ms=poll_ms, sleep=True)
# We need to make sure we at least keep up with scheduled tasks,
# like heartbeats, auto-commits, and metadata refreshes
timeout_at = self._next_timeout()
# Because the consumer client poll does not sleep unless blocking on
# network IO, we need to explicitly sleep when we know we are idle
# because we haven't been assigned any partitions to fetch / consume
if self._use_consumer_group() and not self.assignment():
sleep_time = max(timeout_at - time.time(), 0)
if sleep_time > 0 and not self._client.in_flight_request_count():
log.debug('No partitions assigned; sleeping for %s', sleep_time)
time.sleep(sleep_time)
continue
# Short-circuit the fetch iterator if we are already timed out
# to avoid any unintentional interaction with fetcher setup
if time.time() > timeout_at:
continue
for msg in self._fetcher:
yield msg
if time.time() > timeout_at:
log.debug("internal iterator timeout - breaking for poll")
break
# an else block on a for loop only executes if there was no break
# so this should only be called on a StopIteration from the fetcher
# and we assume that it is safe to init_fetches when fetcher is done
# i.e., there are no more records stored internally
else:
self._fetcher.init_fetches()
def _next_timeout(self):
timeout = min(self._consumer_timeout,
self._client._delayed_tasks.next_at() + time.time(),
self._client.cluster.ttl() / 1000.0 + time.time())
# Although the delayed_tasks timeout above should cover processing
# HeartbeatRequests, it is still possible that HeartbeatResponses
# are left unprocessed during a long _fetcher iteration without
# an intermediate poll(). And because tasks are responsible for
# rescheduling themselves, an unprocessed response will prevent
# the next heartbeat from being sent. This check should help
# avoid that.
if self._use_consumer_group():
heartbeat = time.time() + self._coordinator.heartbeat.ttl()
timeout = min(timeout, heartbeat)
return timeout
def __iter__(self): # pylint: disable=non-iterator-returned
return self
def __next__(self):
if not self._iterator:
self._iterator = self._message_generator()
self._set_consumer_timeout()
try:
return next(self._iterator)
except StopIteration:
self._iterator = None
raise
def _set_consumer_timeout(self):
# consumer_timeout_ms can be used to stop iteration early
if self.config['consumer_timeout_ms'] >= 0:
self._consumer_timeout = time.time() + (
self.config['consumer_timeout_ms'] / 1000.0)
ret = self.poll(timeout_ms=self.config['consumer_timeout_ms'], max_records=1)
if not ret:
raise StopIteration
assert len(ret) == 1
(messages,) = ret.values()
assert len(messages) == 1
return messages[0]
# old KafkaConsumer methods are deprecated
def configure(self, **configs):

View File

@@ -141,11 +141,3 @@ def test_paused(kafka_broker, topic):
consumer.unsubscribe()
assert set() == consumer.paused()
def test_heartbeat_timeout(conn, mocker):
mocker.patch('kafka.client_async.KafkaClient.check_version', return_value = (0, 9))
mocker.patch('time.time', return_value = 1234)
consumer = KafkaConsumer('foobar')
mocker.patch.object(consumer._coordinator.heartbeat, 'ttl', return_value = 0)
assert consumer._next_timeout() == 1234

View File

@@ -500,6 +500,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
def test_kafka_consumer__blocking(self):
TIMEOUT_MS = 500
consumer = self.kafka_consumer(auto_offset_reset='earliest',
enable_auto_commit=False,
consumer_timeout_ms=TIMEOUT_MS)
# Manual assignment avoids overhead of consumer group mgmt

View File

@@ -33,7 +33,7 @@ def fetcher(client, subscription_state):
return Fetcher(client, subscription_state, Metrics())
def test_init_fetches(fetcher, mocker):
def test_send_fetches(fetcher, mocker):
fetch_requests = [
FetchRequest[0](
-1, fetcher.config['fetch_max_wait_ms'],
@@ -53,19 +53,7 @@ def test_init_fetches(fetcher, mocker):
mocker.patch.object(fetcher, '_create_fetch_requests',
return_value = dict(enumerate(fetch_requests)))
fetcher._records.append('foobar')
ret = fetcher.init_fetches()
assert fetcher._create_fetch_requests.call_count == 0
assert ret == []
fetcher._records.clear()
fetcher._iterator = 'foo'
ret = fetcher.init_fetches()
assert fetcher._create_fetch_requests.call_count == 0
assert ret == []
fetcher._iterator = None
ret = fetcher.init_fetches()
ret = fetcher.send_fetches()
for node, request in enumerate(fetch_requests):
fetcher._client.send.assert_any_call(node, request)
assert len(ret) == len(fetch_requests)