Added max_bytes option and FetchRequest_v3 usage. (#962)

* Added `max_bytes` option and FetchRequest_v3 usage.
* Added checks for broker versions above 0.10 based on ApiVersionResponse.
Author: Taras Voinarovskyi
Date: 2017-03-07 00:59:26 +02:00
Committed by: Dana Powers
Parent: ff6f7bf085
Commit: 9c19ea7cbe
6 changed files with 119 additions and 9 deletions
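
To illustrate the two features this commit adds, here is a minimal usage sketch (not part of the diff; the broker address and topic name are assumptions): fetch_max_bytes caps the total size of each fetch response, and against 0.10.1+ brokers the client now selects FetchRequest v3 automatically after probing the broker via ApiVersionRequest.

    from kafka import KafkaConsumer

    # Hypothetical topic and broker address, for illustration only.
    consumer = KafkaConsumer(
        'my-topic',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        fetch_max_bytes=10 * 1024 * 1024,  # cap each fetch response at ~10 MB
    )
    for message in consumer:
        print(message.partition, message.offset, len(message.value))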

kafka/client_async.py

@@ -156,6 +156,8 @@ class KafkaClient(object):
        'sasl_plain_password': None,
    }
    API_VERSIONS = [
+       (0, 10, 1),
+       (0, 10, 0),
        (0, 10),
        (0, 9),
        (0, 8, 2),

kafka/conn.py

@@ -18,6 +18,7 @@ from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.api import RequestHeader
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import GroupCoordinatorResponse
+from kafka.protocol.metadata import MetadataRequest
from kafka.protocol.types import Int32
from kafka.version import __version__
@@ -760,6 +761,24 @@ class BrokerConnection(object):
        self._correlation_id = (self._correlation_id + 1) % 2**31
        return self._correlation_id

+   def _check_version_above_0_10(self, response):
+       test_cases = [
+           # format: (<broker version>, <needed struct>)
+           ((0, 10, 1), MetadataRequest[2])
+       ]
+
+       error_type = Errors.for_code(response.error_code)
+       assert error_type is Errors.NoError, "API version check failed"
+       max_versions = dict([
+           (api_key, max_version)
+           for api_key, _, max_version in response.api_versions
+       ])
+       # Get the best match of test cases
+       for broker_version, struct in test_cases:
+           if max_versions.get(struct.API_KEY, -1) >= struct.API_VERSION:
+               return broker_version
+
+       return (0, 10, 0)
+
    def check_version(self, timeout=2, strict=False):
        """Attempt to guess the broker version.
@@ -784,7 +803,6 @@ class BrokerConnection(object):
        # socket.error (32, 54, or 104)
        from .protocol.admin import ApiVersionRequest, ListGroupsRequest
        from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
-       from .protocol.metadata import MetadataRequest

        # Socket errors are logged as exceptions and can alarm users. Mute them
        from logging import Filter
@@ -798,6 +816,7 @@ class BrokerConnection(object):
        log.addFilter(log_filter)

        test_cases = [
+           # All cases starting from 0.10 will be based on ApiVersionResponse
            ((0, 10), ApiVersionRequest[0]()),
            ((0, 9), ListGroupsRequest[0]()),
            ((0, 8, 2), GroupCoordinatorRequest[0]('kafka-python-default-group')),
@@ -838,6 +857,10 @@ class BrokerConnection(object):
            self._sock.setblocking(False)

            if f.succeeded():
+               if version == (0, 10):
+                   # Starting from the 0.10 kafka broker we determine version
+                   # by looking at ApiVersionResponse
+                   version = self._check_version_above_0_10(f.value)
                log.info('Broker version identified as %s', '.'.join(map(str, version)))
                log.info('Set configuration api_version=%s to skip auto'
                         ' check_version requests on startup', version)
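
The probe added above keys off which API versions the broker advertises: a 0.10.1 broker reports support for MetadataRequest v2, while a 0.10.0 broker does not. The sketch below is not part of the commit; the fake response tuple is an assumption standing in for a real ApiVersionResponse, used only to show the same mapping in isolation.

    from collections import namedtuple

    from kafka.protocol.metadata import MetadataRequest

    # Stand-in for ApiVersionResponse: error_code plus a list of
    # (api_key, min_version, max_version) tuples, one per API.
    FakeApiVersionResponse = namedtuple('FakeApiVersionResponse',
                                        ['error_code', 'api_versions'])

    def guess_broker_version(response):
        max_versions = {key: max_v for key, _, max_v in response.api_versions}
        needed = MetadataRequest[2]  # Metadata v2 only exists on 0.10.1+ brokers
        if max_versions.get(needed.API_KEY, -1) >= needed.API_VERSION:
            return (0, 10, 1)
        return (0, 10, 0)

    # Metadata is api_key 3; 0.10.0 tops out at v1, 0.10.1 advertises v2.
    print(guess_broker_version(FakeApiVersionResponse(0, [(3, 0, 1)])))  # (0, 10, 0)
    print(guess_broker_version(FakeApiVersionResponse(0, [(3, 0, 2)])))  # (0, 10, 1)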

kafka/consumer/fetcher.py

@@ -40,6 +40,7 @@ class Fetcher(six.Iterator):
        'value_deserializer': None,
        'fetch_min_bytes': 1,
        'fetch_max_wait_ms': 500,
+       'fetch_max_bytes': 52428800,
        'max_partition_fetch_bytes': 1048576,
        'max_poll_records': sys.maxsize,
        'check_crcs': True,
@@ -64,6 +65,15 @@ class Fetcher(six.Iterator):
            the server will block before answering the fetch request if
            there isn't sufficient data to immediately satisfy the
            requirement given by fetch_min_bytes. Default: 500.
+       fetch_max_bytes (int): The maximum amount of data the server should
+           return for a fetch request. This is not an absolute maximum: if
+           the first message in the first non-empty partition of the fetch
+           is larger than this value, the message will still be returned
+           to ensure that the consumer can make progress. NOTE: the consumer
+           performs fetches to multiple brokers in parallel, so memory
+           usage will depend on the number of brokers containing
+           partitions for the topic.
+           Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
        max_partition_fetch_bytes (int): The maximum amount of data
            per-partition the server will return. The maximum total memory
            used for a request = #partitions * max_partition_fetch_bytes.
@@ -617,7 +627,7 @@ class Fetcher(six.Iterator):
log.debug("Fetched offset %d for partition %s", offset, partition) log.debug("Fetched offset %d for partition %s", offset, partition)
future.success(offset) future.success(offset)
elif error_type in (Errors.NotLeaderForPartitionError, elif error_type in (Errors.NotLeaderForPartitionError,
Errors.UnknownTopicOrPartitionError): Errors.UnknownTopicOrPartitionError):
log.debug("Attempt to fetch offsets for partition %s failed due" log.debug("Attempt to fetch offsets for partition %s failed due"
" to obsolete leadership information, retrying.", " to obsolete leadership information, retrying.",
partition) partition)
@@ -664,7 +674,9 @@ class Fetcher(six.Iterator):
log.debug("Adding fetch request for partition %s at offset %d", log.debug("Adding fetch request for partition %s at offset %d",
partition, position) partition, position)
if self.config['api_version'] >= (0, 10): if self.config['api_version'] >= (0, 10, 1):
version = 3
elif self.config['api_version'] >= (0, 10):
version = 2 version = 2
elif self.config['api_version'] == (0, 9): elif self.config['api_version'] == (0, 9):
version = 1 version = 1
@@ -672,11 +684,28 @@ class Fetcher(six.Iterator):
            version = 0
        requests = {}
        for node_id, partition_data in six.iteritems(fetchable):
-           requests[node_id] = FetchRequest[version](
-               -1, # replica_id
-               self.config['fetch_max_wait_ms'],
-               self.config['fetch_min_bytes'],
-               partition_data.items())
+           if version < 3:
+               requests[node_id] = FetchRequest[version](
+                   -1, # replica_id
+                   self.config['fetch_max_wait_ms'],
+                   self.config['fetch_min_bytes'],
+                   partition_data.items())
+           else:
+               # As of version == 3 partitions will be returned in order as
+               # they are requested, so to avoid starvation with the
+               # `fetch_max_bytes` option we need this shuffle.
+               # NOTE: we do have partition_data in random order due to usage
+               # of unordered structures like dicts, but that does not
+               # guarantee equal distribution, and starting from Python 3.6
+               # dicts retain insert order.
+               partition_data = list(partition_data.items())
+               random.shuffle(partition_data)
+               requests[node_id] = FetchRequest[version](
+                   -1, # replica_id
+                   self.config['fetch_max_wait_ms'],
+                   self.config['fetch_min_bytes'],
+                   self.config['fetch_max_bytes'],
+                   partition_data)
        return requests

    def _handle_fetch_response(self, request, send_time, response):
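
For context on the branch above: FetchRequest v3 carries one extra request-wide max_bytes field, and a v3 broker fills the response in the order the topic/partition entries are listed, which is why that list is shuffled per request. A rough construction sketch, not taken from the commit (topic names, offsets and byte limits are made-up values):

    import random

    from kafka.protocol.fetch import FetchRequest

    # [(topic, [(partition, fetch_offset, max_partition_fetch_bytes), ...]), ...]
    topics = [
        ('topic-a', [(0, 1234, 1048576)]),
        ('topic-b', [(0, 5678, 1048576)]),
    ]

    # v2: only per-partition byte limits apply.
    req_v2 = FetchRequest[2](-1, 500, 1, topics)

    # v3: adds a total response-size cap; shuffle so no topic/partition is
    # always served first once that cap starts truncating the response.
    random.shuffle(topics)
    req_v3 = FetchRequest[3](-1, 500, 1, 52428800, topics)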

kafka/consumer/group.py

@@ -65,6 +65,14 @@ class KafkaConsumer(six.Iterator):
            the server will block before answering the fetch request if
            there isn't sufficient data to immediately satisfy the
            requirement given by fetch_min_bytes. Default: 500.
+       fetch_max_bytes (int): The maximum amount of data the server should
+           return for a fetch request. This is not an absolute maximum: if the
+           first message in the first non-empty partition of the fetch is
+           larger than this value, the message will still be returned to
+           ensure that the consumer can make progress. NOTE: the consumer
+           performs fetches to multiple brokers in parallel, so memory usage
+           will depend on the number of brokers containing partitions for the
+           topic. Supported Kafka version >= 0.10.1.0. Default: 52428800 (50 MB).
        max_partition_fetch_bytes (int): The maximum amount of data
            per-partition the server will return. The maximum total memory
            used for a request = #partitions * max_partition_fetch_bytes.
@@ -212,6 +220,7 @@ class KafkaConsumer(six.Iterator):
        'value_deserializer': None,
        'fetch_max_wait_ms': 500,
        'fetch_min_bytes': 1,
+       'fetch_max_bytes': 52428800,
        'max_partition_fetch_bytes': 1 * 1024 * 1024,
        'request_timeout_ms': 40 * 1000,
        'retry_backoff_ms': 100,
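
One practical consequence of the docstring above: because fetches to different brokers run in parallel, fetch_max_bytes bounds each in-flight response, not the total. A back-of-the-envelope sketch (the broker count is an assumed example, not part of this change):

    fetch_max_bytes = 52428800       # default, 50 MB per in-flight fetch response
    brokers_with_partitions = 5      # assumption for illustration
    worst_case_bytes = brokers_with_partitions * fetch_max_bytes
    print(worst_case_bytes // (1024 * 1024))  # -> 250 (MB of buffered fetch data)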

test/test_consumer_integration.py

@@ -2,6 +2,7 @@ import logging
import os

from six.moves import xrange
+import six

from . import unittest
from kafka import (
@@ -572,3 +573,48 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
            output_msgs2.append(m)
        self.assert_message_count(output_msgs2, 20)
        self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
+
+   @kafka_versions('>=0.10.1')
+   def test_kafka_consumer_max_bytes_simple(self):
+       self.send_messages(0, range(100, 200))
+       self.send_messages(1, range(200, 300))
+
+       # Start a consumer
+       consumer = self.kafka_consumer(
+           auto_offset_reset='earliest', fetch_max_bytes=300)
+       fetched_size = 0
+       seen_partitions = set([])
+       for i in range(10):
+           poll_res = consumer.poll(timeout_ms=100)
+           for partition, msgs in six.iteritems(poll_res):
+               for msg in msgs:
+                   fetched_size += len(msg.value)
+                   seen_partitions.add(partition)
+
+       # Check that we fetched at least 1 message from both partitions
+       self.assertEqual(
+           seen_partitions, set([
+               TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
+       self.assertLess(fetched_size, 3000)
+
+   @kafka_versions('>=0.10.1')
+   def test_kafka_consumer_max_bytes_one_msg(self):
+       # We send to only 1 partition so we don't have parallel requests to 2
+       # nodes for data.
+       self.send_messages(0, range(100, 200))
+
+       # Start a consumer. FetchResponse_v3 should always include at least 1
+       # full msg, so by setting fetch_max_bytes=1 we must get 1 msg at a time
+       consumer = self.kafka_consumer(
+           auto_offset_reset='earliest', fetch_max_bytes=1)
+       fetched_msgs = []
+       # A bit hacky, but we need this in order for message count to be exact
+       consumer._coordinator.ensure_active_group()
+       for i in range(10):
+           poll_res = consumer.poll(timeout_ms=2000)
+           print(poll_res)
+           for partition, msgs in six.iteritems(poll_res):
+               for msg in msgs:
+                   fetched_msgs.append(msg)
+
+       self.assertEqual(len(fetched_msgs), 10)

test/test_fetcher.py

@@ -58,7 +58,8 @@ def test_send_fetches(fetcher, mocker):
@pytest.mark.parametrize(("api_version", "fetch_version"), [
-   ((0, 10), 2),
+   ((0, 10, 1), 3),
+   ((0, 10, 0), 2),
    ((0, 9), 1),
    ((0, 8), 0)
])