diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index f5d44b1..21c0b30 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -4,6 +4,7 @@ import collections
 import copy
 import logging
 import random
+import sys
 import time
 
 from kafka.vendor import six
@@ -39,6 +40,7 @@ class Fetcher(six.Iterator):
         'fetch_min_bytes': 1,
         'fetch_max_wait_ms': 500,
         'max_partition_fetch_bytes': 1048576,
+        'max_poll_records': sys.maxsize,
         'check_crcs': True,
         'skip_double_compressed_messages': False,
         'iterator_refetch_records': 1,  # undocumented -- interface may change
@@ -96,7 +98,7 @@ class Fetcher(six.Iterator):
         self._fetch_futures = collections.deque()
         self._sensors = FetchManagerMetrics(metrics, self.config['metric_group_prefix'])
 
-    def init_fetches(self):
+    def send_fetches(self):
         """Send FetchRequests asynchronously for all assigned partitions.
 
         Note: noop if there are unconsumed records internal to the fetcher
@@ -108,12 +110,12 @@ class Fetcher(six.Iterator):
         # so we verify that there are no records in the deque, or in an
         # iterator
         if self._records or self._iterator:
-            log.debug('Skipping init_fetches because there are unconsumed'
+            log.debug('Skipping send_fetches because there are unconsumed'
                       ' records internally')
             return []
-        return self._init_fetches()
+        return self._send_fetches()
 
-    def _init_fetches(self):
+    def _send_fetches(self):
         futures = []
         for node_id, request in six.iteritems(self._create_fetch_requests()):
             if self._client.ready(node_id):
@@ -319,17 +321,23 @@ class Fetcher(six.Iterator):
         self._raise_if_unauthorized_topics()
         self._raise_if_record_too_large()
 
-        # Loop over the records deque
-        while self._records:
-            (fetch_offset, tp, messages) = self._records.popleft()
-
-            if not self._subscriptions.is_assigned(tp):
-                # this can happen when a rebalance happened before
-                # fetched records are returned to the consumer's poll call
-                log.debug("Not returning fetched records for partition %s"
-                          " since it is no longer assigned", tp)
-                continue
+        max_records = self.config['max_poll_records']
+        while self._records and max_records > 0:
+            part = self._records.popleft()
+            max_records -= self._append(drained, part, max_records)
+            if part.messages:
+                self._records.appendleft(part)
+        return dict(drained)
 
+    def _append(self, drained, part, max_records):
+        tp = part.topic_partition
+        fetch_offset = part.fetch_offset
+        if not self._subscriptions.is_assigned(tp):
+            # this can happen when a rebalance happened before
+            # fetched records are returned to the consumer's poll call
+            log.debug("Not returning fetched records for partition %s"
+                      " since it is no longer assigned", tp)
+        else:
             # note that the position should always be available
             # as long as the partition is still assigned
             position = self._subscriptions.assignment[tp].position
@@ -340,26 +348,32 @@ class Fetcher(six.Iterator):
                           " %s since it is no longer fetchable", tp)
 
             elif fetch_offset == position:
-                next_offset = messages[-1][0] + 1
+                part_records = part.take(max_records)
+                next_offset = part_records[-1][0] + 1
+
                 log.log(0, "Returning fetched records at offset %d for assigned"
                            " partition %s and update position to %s", position,
                            tp, next_offset)
-                self._subscriptions.assignment[tp].position = next_offset
 
-                for record in self._unpack_message_set(tp, messages):
+                for record in self._unpack_message_set(tp, part_records):
                     # Fetched compressed messages may include additional records
                     if record.offset < fetch_offset:
                         log.debug("Skipping message offset: %s (expecting %s)",
                                   record.offset, fetch_offset)
                         continue
                     drained[tp].append(record)
+
+                self._subscriptions.assignment[tp].position = next_offset
+                return len(part_records)
+
             else:
                 # these records aren't next in line based on the last consumed
                 # position, ignore them they must be from an obsolete request
                 log.debug("Ignoring fetched records for %s at offset %s since"
-                          " the current position is %d", tp, fetch_offset,
+                          " the current position is %d", tp, part.fetch_offset,
                           position)
 
-        return dict(drained)
+        part.discard()
+        return 0
 
     def _unpack_message_set(self, tp, messages):
         try:
@@ -445,9 +459,10 @@ class Fetcher(six.Iterator):
             # Send additional FetchRequests when the internal queue is low
             # this should enable moderate pipelining
            if len(self._records) <= self.config['iterator_refetch_records']:
-                self._init_fetches()
+                self._send_fetches()
 
-            (fetch_offset, tp, messages) = self._records.popleft()
+            part = self._records.popleft()
+            tp = part.topic_partition
 
             if not self._subscriptions.is_assigned(tp):
                 # this can happen when a rebalance happened before
@@ -465,7 +480,7 @@ class Fetcher(six.Iterator):
                 log.debug("Not returning fetched records for assigned partition"
                           " %s since it is no longer fetchable", tp)
 
-            elif fetch_offset == position:
+            elif part.fetch_offset == position:
                 log.log(0, "Returning fetched records at offset %d for assigned"
                            " partition %s", position, tp)
 
@@ -474,7 +489,7 @@ class Fetcher(six.Iterator):
                     # i.e., the user seek()'d to this position
                     self._subscriptions.assignment[tp].drop_pending_message_set = False
 
-                for msg in self._unpack_message_set(tp, messages):
+                for msg in self._unpack_message_set(tp, part.messages):
 
                     # Because we are in a generator, it is possible for
                     # subscription state to change between yield calls
@@ -508,7 +523,7 @@ class Fetcher(six.Iterator):
                 # these records aren't next in line based on the last consumed
                 # position, ignore them they must be from an obsolete request
                 log.debug("Ignoring fetched records for %s at offset %s",
-                          tp, fetch_offset)
+                          tp, part.fetch_offset)
 
     def __iter__(self):  # pylint: disable=non-iterator-returned
         return self
@@ -601,6 +616,11 @@ class Fetcher(six.Iterator):
                               " %s", partition, error_type)
                 future.failure(error_type(partition))
 
+    def _fetchable_partitions(self):
+        fetchable = self._subscriptions.fetchable_partitions()
+        pending = set([part.topic_partition for part in self._records])
+        return fetchable.difference(pending)
+
     def _create_fetch_requests(self):
         """Create fetch requests for all assigned partitions, grouped by node.
 
@@ -613,24 +633,17 @@ class Fetcher(six.Iterator):
         # which can be passed to FetchRequest() via .items()
         fetchable = collections.defaultdict(lambda: collections.defaultdict(list))
 
-        # avoid re-fetching pending offsets
-        pending = set()
-        for fetch_offset, tp, _ in self._records:
-            pending.add((tp, fetch_offset))
-
-        for partition in self._subscriptions.fetchable_partitions():
+        for partition in self._fetchable_partitions():
             node_id = self._client.cluster.leader_for_partition(partition)
             position = self._subscriptions.assignment[partition].position
 
-            # fetch if there is a leader, no in-flight requests, and no _records
+            # fetch if there is a leader and no in-flight requests
             if node_id is None or node_id == -1:
                 log.debug("No leader found for partition %s."
" Requesting metadata update", partition) self._client.cluster.request_update() - elif ((partition, position) not in pending and - self._client.in_flight_request_count(node_id) == 0): - + elif self._client.in_flight_request_count(node_id) == 0: partition_info = ( partition.partition, position, @@ -704,7 +717,7 @@ class Fetcher(six.Iterator): log.debug("Adding fetched record for partition %s with" " offset %d to buffered record list", tp, position) - self._records.append((fetch_offset, tp, messages)) + self._records.append(self.PartitionRecords(fetch_offset, tp, messages)) last_offset, _, _ = messages[-1] self._sensors.records_fetch_lag.record(highwater - last_offset) num_bytes = sum(msg[1] for msg in messages) @@ -744,6 +757,33 @@ class Fetcher(six.Iterator): self._sensors.fetch_throttle_time_sensor.record(response.throttle_time_ms) self._sensors.fetch_latency.record((recv_time - send_time) * 1000) + class PartitionRecords(object): + + def __init__(self, fetch_offset, tp, messages): + self.fetch_offset = fetch_offset + self.topic_partition = tp + self.messages = messages + + def discard(self): + self.messages = None + + def take(self, n): + if self.messages is None: + return [] + + if n >= len(self.messages): + res = self.messages + self.messages = None + return res + + res = self.messages[:n] + self.messages = self.messages[n:] + + if self.messages: + self.fetch_offset = self.messages[0][0] + + return res + class FetchManagerMetrics(object): def __init__(self, metrics, prefix): diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index d4e0ff3..08f50e9 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -3,6 +3,7 @@ from __future__ import absolute_import import copy import logging import socket +import sys import time from kafka.vendor import six @@ -115,6 +116,7 @@ class KafkaConsumer(six.Iterator): rebalances. Default: 3000 session_timeout_ms (int): The timeout used to detect failures when using Kafka's group managementment facilities. Default: 30000 + max_poll_records (int): .... receive_buffer_bytes (int): The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. Default: None (relies on system defaults). The java client defaults to 32768. @@ -220,6 +222,7 @@ class KafkaConsumer(six.Iterator): 'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor), 'heartbeat_interval_ms': 3000, 'session_timeout_ms': 30000, + 'max_poll_records': sys.maxsize, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'socket_options': [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)], @@ -517,7 +520,7 @@ class KafkaConsumer(six.Iterator): # next round of fetches and avoid block waiting for their # responses to enable pipelining while the user is handling the # fetched records. 
-                self._fetcher.init_fetches()
+                self._fetcher.send_fetches()
                 return records
 
             elapsed_ms = (time.time() - start) * 1000
@@ -559,7 +562,7 @@ class KafkaConsumer(six.Iterator):
         if records:
             return records
 
-        self._fetcher.init_fetches()
+        self._fetcher.send_fetches()
         self._client.poll(timeout_ms=timeout_ms, sleep=True)
         return self._fetcher.fetched_records()
 
@@ -881,10 +884,10 @@ class KafkaConsumer(six.Iterator):
 
             # an else block on a for loop only executes if there was no break
             # so this should only be called on a StopIteration from the fetcher
-            # and we assume that it is safe to init_fetches when fetcher is done
+            # and we assume that it is safe to send_fetches when fetcher is done
             # i.e., there are no more records stored internally
             else:
-                self._fetcher.init_fetches()
+                self._fetcher.send_fetches()
 
     def _next_timeout(self):
         timeout = min(self._consumer_timeout,
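
Usage note: a minimal, hedged sketch of how the new max_poll_records option behaves from the
application side, based on the fetched_records() changes above. The topic name, bootstrap
address, and handle_message() helper are illustrative placeholders, not part of this patch.

    from kafka import KafkaConsumer

    def handle_message(message):
        # placeholder for application-specific processing
        print(message.offset, message.value)

    consumer = KafkaConsumer('example-topic',
                             bootstrap_servers='localhost:9092',
                             max_poll_records=100)  # cap records returned per poll()

    while True:
        # poll() returns {TopicPartition: [messages]} with at most max_poll_records
        # messages in total; anything beyond the cap stays buffered in the fetcher
        # (via PartitionRecords) and is returned by subsequent poll() calls.
        records = consumer.poll(timeout_ms=500)
        for tp, messages in records.items():
            for message in messages:
                handle_message(message)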
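
Implementation note: a small sketch of the take()/discard() semantics of the nested
PartitionRecords helper added above, assuming the fetcher's (offset, size, message) tuple
layout; the offsets, payloads, and topic name are made up, and the TopicPartition import
from the top-level kafka package is an assumption.

    from kafka import TopicPartition
    from kafka.consumer.fetcher import Fetcher

    # Three buffered messages for one partition, starting at offset 100.
    part = Fetcher.PartitionRecords(
        fetch_offset=100,
        tp=TopicPartition('example-topic', 0),
        messages=[(100, 10, b'a'), (101, 10, b'b'), (102, 10, b'c')])

    first = part.take(2)             # returns offsets 100 and 101
    assert part.fetch_offset == 102  # remainder now starts at offset 102

    rest = part.take(10)             # takes whatever is left (just offset 102)
    assert part.messages is None     # fully drained, nothing left to re-queue

    # discard() is used on the non-success paths of _append() so that
    # fetched_records() does not re-queue obsolete or unassigned data.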