First pass at max_poll_records
@@ -4,6 +4,7 @@ import collections
 import copy
 import logging
 import random
+import sys
 import time

 from kafka.vendor import six
@@ -39,6 +40,7 @@ class Fetcher(six.Iterator):
         'fetch_min_bytes': 1,
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
+        'max_poll_records': sys.maxsize,
         'check_crcs': True,
         'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1, # undocumented -- interface may change
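The default of sys.maxsize keeps existing behavior: no realistic fetch ever reaches it, so nothing is capped unless the user overrides the setting. A tiny illustration (plain Python, not library code):

import sys

max_poll_records = sys.maxsize                  # the new default
records = list(range(1000))                     # stand-in for a fetched batch
assert records[:max_poll_records] == records    # a sys.maxsize bound keeps everything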
@@ -96,7 +98,7 @@ class Fetcher(six.Iterator):
         self._fetch_futures = collections.deque()
         self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])

-    def init_fetches(self):
+    def send_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.

         Note: noop if there are unconsumed records internal to the fetcher
@@ -108,12 +110,12 @@
         # so we verify that there are no records in the deque, or in an
         # iterator
         if self._records or self._iterator:
-            log.debug('Skipping init_fetches because there are unconsumed'
+            log.debug('Skipping send_fetches because there are unconsumed'
                       ' records internally')
             return []
-        return self._init_fetches()
+        return self._send_fetches()

-    def _init_fetches(self):
+    def _send_fetches(self):
         futures = []
         for node_id, request in six.iteritems(self._create_fetch_requests()):
             if self._client.ready(node_id):
@@ -319,17 +321,23 @@ class Fetcher(six.Iterator):
         self._raise_if_unauthorized_topics()
         self._raise_if_record_too_large()

-        # Loop over the records deque
-        while self._records:
-            (fetch_offset, tp, messages) = self._records.popleft()
-
-            if not self._subscriptions.is_assigned(tp):
-                # this can happen when a rebalance happened before
-                # fetched records are returned to the consumer's poll call
-                log.debug("Not returning fetched records for partition %s"
-                          " since it is no longer assigned", tp)
-                continue
-
+        max_records = self.config['max_poll_records']
+        while self._records and max_records > 0:
+            part = self._records.popleft()
+            max_records -= self._append(drained, part, max_records)
+            if part.messages:
+                self._records.appendleft(part)
+        return dict(drained)
+
+    def _append(self, drained, part, max_records):
+        tp = part.topic_partition
+        fetch_offset = part.fetch_offset
+        if not self._subscriptions.is_assigned(tp):
+            # this can happen when a rebalance happened before
+            # fetched records are returned to the consumer's poll call
+            log.debug("Not returning fetched records for partition %s"
+                      " since it is no longer assigned", tp)
+        else:
             # note that the position should always be available
             # as long as the partition is still assigned
             position = self._subscriptions.assignment[tp].position
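The hunk above changes fetched_records() from "drain everything buffered" to "drain up to a budget". A minimal standalone sketch of that strategy (plain Python with made-up data, not the library code): spend a per-call budget of max_poll_records and, when a partition's batch is only partially consumed, push the remainder back on the left of the deque so the next call resumes exactly where this one stopped.

import collections

def drain(records, max_poll_records):
    drained = collections.defaultdict(list)
    budget = max_poll_records
    while records and budget > 0:
        tp, messages = records.popleft()
        taken = messages[:budget]
        drained[tp].extend(taken)
        budget -= len(taken)
        remainder = messages[len(taken):]
        if remainder:
            # partially consumed: requeue at the head for the next call
            records.appendleft((tp, remainder))
    return dict(drained)

queue = collections.deque([('tp0', [0, 1, 2]), ('tp1', [0, 1])])
assert drain(queue, max_poll_records=2) == {'tp0': [0, 1]}
assert queue[0] == ('tp0', [2])      # the unconsumed tail stays buffered
assert drain(queue, max_poll_records=10) == {'tp0': [2], 'tp1': [0, 1]}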
@@ -340,26 +348,32 @@
                           " %s since it is no longer fetchable", tp)

             elif fetch_offset == position:
-                next_offset = messages[-1][0] + 1
+                part_records = part.take(max_records)
+                next_offset = part_records[-1][0] + 1

                 log.log(0, "Returning fetched records at offset %d for assigned"
                         " partition %s and update position to %s", position,
                         tp, next_offset)
-                self._subscriptions.assignment[tp].position = next_offset

-                for record in self._unpack_message_set(tp, messages):
+                for record in self._unpack_message_set(tp, part_records):
                     # Fetched compressed messages may include additional records
                     if record.offset < fetch_offset:
                         log.debug("Skipping message offset: %s (expecting %s)",
                                   record.offset, fetch_offset)
                         continue
                     drained[tp].append(record)

+                self._subscriptions.assignment[tp].position = next_offset
+                return len(part_records)
+
             else:
                 # these records aren't next in line based on the last consumed
                 # position, ignore them they must be from an obsolete request
                 log.debug("Ignoring fetched records for %s at offset %s since"
-                          " the current position is %d", tp, fetch_offset,
+                          " the current position is %d", tp, part.fetch_offset,
                           position)

-        return dict(drained)
+        part.discard()
+        return 0

     def _unpack_message_set(self, tp, messages):
         try:
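A short worked example (made-up offsets) of the bookkeeping in the fetch_offset == position branch above: the fetcher buffers (offset, size, message) tuples, the consumed position advances to one past the last offset handed out, and the number of records taken is what gets charged against the max_poll_records budget.

part_records = [(5, 10, b'a'), (6, 12, b'b'), (7, 9, b'c')]   # what part.take(max_records) returned
next_offset = part_records[-1][0] + 1                         # 7 + 1
assert next_offset == 8          # the position stored back into the subscription state
assert len(part_records) == 3    # the value _append returns, i.e. budget spent on this partition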
@@ -445,9 +459,10 @@
             # Send additional FetchRequests when the internal queue is low
             # this should enable moderate pipelining
             if len(self._records) <= self.config['iterator_refetch_records']:
-                self._init_fetches()
+                self._send_fetches()

-            (fetch_offset, tp, messages) = self._records.popleft()
+            part = self._records.popleft()
+            tp = part.topic_partition

             if not self._subscriptions.is_assigned(tp):
                 # this can happen when a rebalance happened before
@@ -465,7 +480,7 @@
                 log.debug("Not returning fetched records for assigned partition"
                           " %s since it is no longer fetchable", tp)

-            elif fetch_offset == position:
+            elif part.fetch_offset == position:
                 log.log(0, "Returning fetched records at offset %d for assigned"
                         " partition %s", position, tp)

@@ -474,7 +489,7 @@
                 # i.e., the user seek()'d to this position
                 self._subscriptions.assignment[tp].drop_pending_message_set = False

-                for msg in self._unpack_message_set(tp, messages):
+                for msg in self._unpack_message_set(tp, part.messages):

                     # Because we are in a generator, it is possible for
                     # subscription state to change between yield calls
@@ -508,7 +523,7 @@
                 # these records aren't next in line based on the last consumed
                 # position, ignore them they must be from an obsolete request
                 log.debug("Ignoring fetched records for %s at offset %s",
-                          tp, fetch_offset)
+                          tp, part.fetch_offset)

     def __iter__(self): # pylint: disable=non-iterator-returned
         return self
@@ -601,6 +616,11 @@
                       " %s", partition, error_type)
             future.failure(error_type(partition))

+    def _fetchable_partitions(self):
+        fetchable = self._subscriptions.fetchable_partitions()
+        pending = set([part.topic_partition for part in self._records])
+        return fetchable.difference(pending)
+
     def _create_fetch_requests(self):
         """Create fetch requests for all assigned partitions, grouped by node.

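The new _fetchable_partitions() helper is a plain set difference: partitions that still have buffered records in self._records are skipped when building the next round of fetch requests, which is what lets the per-offset pending bookkeeping in the next hunk be deleted. A tiny illustration, with tuples standing in for TopicPartition objects:

fetchable = {('my-topic', 0), ('my-topic', 1), ('my-topic', 2)}   # assigned and fetchable
pending = {('my-topic', 1)}                                       # still buffered in self._records
assert fetchable.difference(pending) == {('my-topic', 0), ('my-topic', 2)}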
@@ -613,24 +633,17 @@
         # which can be passed to FetchRequest() via .items()
         fetchable = collections.defaultdict(lambda: collections.defaultdict(list))

-        # avoid re-fetching pending offsets
-        pending = set()
-        for fetch_offset, tp, _ in self._records:
-            pending.add((tp, fetch_offset))
-
-        for partition in self._subscriptions.fetchable_partitions():
+        for partition in self._fetchable_partitions():
             node_id = self._client.cluster.leader_for_partition(partition)
             position = self._subscriptions.assignment[partition].position

-            # fetch if there is a leader, no in-flight requests, and no _records
+            # fetch if there is a leader and no in-flight requests
             if node_id is None or node_id == -1:
                 log.debug("No leader found for partition %s."
                           " Requesting metadata update", partition)
                 self._client.cluster.request_update()

-            elif ((partition, position) not in pending and
-                  self._client.in_flight_request_count(node_id) == 0):
-
+            elif self._client.in_flight_request_count(node_id) == 0:
                 partition_info = (
                     partition.partition,
                     position,
@@ -704,7 +717,7 @@
                 log.debug("Adding fetched record for partition %s with"
                           " offset %d to buffered record list", tp,
                           position)
-                self._records.append((fetch_offset, tp, messages))
+                self._records.append(self.PartitionRecords(fetch_offset, tp, messages))
                 last_offset, _, _ = messages[-1]
                 self._sensors.records_fetch_lag.record(highwater - last_offset)
                 num_bytes = sum(msg[1] for msg in messages)
@@ -744,6 +757,33 @@
             self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms)
         self._sensors.fetch_latency.record((recv_time - send_time) * 1000)

+    class PartitionRecords(object):
+
+        def __init__(self, fetch_offset, tp, messages):
+            self.fetch_offset = fetch_offset
+            self.topic_partition = tp
+            self.messages = messages
+
+        def discard(self):
+            self.messages = None
+
+        def take(self, n):
+            if self.messages is None:
+                return []
+
+            if n >= len(self.messages):
+                res = self.messages
+                self.messages = None
+                return res
+
+            res = self.messages[:n]
+            self.messages = self.messages[n:]
+
+            if self.messages:
+                self.fetch_offset = self.messages[0][0]
+
+            return res
+

 class FetchManagerMetrics(object):
     def __init__(self, metrics, prefix):
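A usage sketch for the PartitionRecords helper added above. The earlier self.PartitionRecords(...) call suggests the class is reachable through Fetcher, which is what the import below assumes; the topic, offsets, and message tuples are made up.

from kafka.consumer.fetcher import Fetcher

part = Fetcher.PartitionRecords(
    fetch_offset=100,
    tp=('my-topic', 0),                    # stand-in for a TopicPartition
    messages=[(100, 5, b'a'), (101, 5, b'b'), (102, 5, b'c')])

first = part.take(2)
assert [offset for offset, _, _ in first] == [100, 101]
assert part.fetch_offset == 102            # advanced to the first unconsumed offset
assert part.take(5) == [(102, 5, b'c')]    # taking past the end returns the remainder...
assert part.take(1) == []                  # ...and leaves messages as None

That final None state is what the drain loop checks: fetched_records() only requeues a partition while part.messages is still truthy, so a fully drained partition falls out of the deque.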
@@ -3,6 +3,7 @@ from __future__ import absolute_import
 import copy
 import logging
 import socket
+import sys
 import time

 from kafka.vendor import six
@@ -115,6 +116,7 @@ class KafkaConsumer(six.Iterator):
             rebalances. Default: 3000
         session_timeout_ms (int): The timeout used to detect failures when
             using Kafka's group managementment facilities. Default: 30000
+        max_poll_records (int): ....
         receive_buffer_bytes (int): The size of the TCP receive buffer
             (SO_RCVBUF) to use when reading data. Default: None (relies on
             system defaults). The java client defaults to 32768.
@@ -220,6 +222,7 @@ class KafkaConsumer(six.Iterator):
         'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
         'heartbeat_interval_ms': 3000,
         'session_timeout_ms': 30000,
+        'max_poll_records': sys.maxsize,
         'receive_buffer_bytes': None,
         'send_buffer_bytes': None,
         'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)],
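At the consumer level the docstring entry is still a placeholder ("...."), but from the fetcher changes above the option caps how many records a single poll() returns, and the sys.maxsize default leaves that effectively unlimited. A usage sketch (broker address, topic, and group id are made up):

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         bootstrap_servers='localhost:9092',
                         group_id='my-group',
                         max_poll_records=500)

batch = consumer.poll(timeout_ms=1000)     # {TopicPartition: [ConsumerRecord, ...]}
for tp, messages in batch.items():
    for message in messages:               # at most 500 records across all partitions
        print(tp.topic, tp.partition, message.offset)

Whatever the cap leaves unconsumed stays buffered in the fetcher and comes back on the next poll() call.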
@@ -517,7 +520,7 @@ class KafkaConsumer(six.Iterator):
                 # next round of fetches and avoid block waiting for their
                 # responses to enable pipelining while the user is handling the
                 # fetched records.
-                self._fetcher.init_fetches()
+                self._fetcher.send_fetches()
                 return records

             elapsed_ms = (time.time() - start) * 1000
@@ -559,7 +562,7 @@ class KafkaConsumer(six.Iterator):
         if records:
             return records

-        self._fetcher.init_fetches()
+        self._fetcher.send_fetches()
         self._client.poll(timeout_ms=timeout_ms, sleep=True)
         return self._fetcher.fetched_records()

@@ -881,10 +884,10 @@ class KafkaConsumer(six.Iterator):

             # an else block on a for loop only executes if there was no break
             # so this should only be called on a StopIteration from the fetcher
-            # and we assume that it is safe to init_fetches when fetcher is done
+            # and we assume that it is safe to send_fetches when fetcher is done
             # i.e., there are no more records stored internally
             else:
-                self._fetcher.init_fetches()
+                self._fetcher.send_fetches()

     def _next_timeout(self):
         timeout = min(self._consumer_timeout,