Add api_version config to KafkaConsumer; disable features inline by version
This commit is contained in:
@@ -42,6 +42,7 @@ class KafkaConsumer(six.Iterator):
|
||||
'session_timeout_ms': 30000,
|
||||
'send_buffer_bytes': 128 * 1024,
|
||||
'receive_buffer_bytes': 32 * 1024,
|
||||
'api_version': 'auto',
|
||||
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
|
||||
#'metric_reporters': None,
|
||||
#'metrics_num_samples': 2,
|
||||
@@ -144,6 +145,12 @@ class KafkaConsumer(six.Iterator):
|
||||
(SO_SNDBUF) to use when sending data. Default: 131072
|
||||
receive_buffer_bytes (int): The size of the TCP receive buffer
|
||||
(SO_RCVBUF) to use when reading data. Default: 32768
|
||||
api_version (str): Specify which Kafka API version to use.
|
||||
0.9 enables full group coordination features; 0.8.2 enables
|
||||
kafka-storage offset commits; 0.8.1 enables zookeeper-storage
|
||||
offset commits; 0.8.0 disables all of the above (no offset commits or group coordination). If set to 'auto', will
|
||||
attempt to infer the broker version by probing various APIs.
|
||||
Default: auto
|
||||
|
||||
Configuration parameters are described in more detail at
|
||||
https://kafka.apache.org/090/configuration.html#newconsumerconfigs
|
||||
@@ -157,6 +164,16 @@ class KafkaConsumer(six.Iterator):
|
||||
assert not configs, 'Unrecognized configs: %s' % configs
|
||||
|
||||
self._client = KafkaClient(**self.config)
|
||||
|
||||
# Check Broker Version if not set explicitly
|
||||
if self.config['api_version'] == 'auto':
|
||||
self.config['api_version'] = self._client.check_version()
|
||||
assert self.config['api_version'] in ('0.9', '0.8.2', '0.8.1', '0.8.0')
|
||||
|
||||
# Convert api_version config to tuple for easy comparisons
|
||||
self.config['api_version'] = tuple(
|
||||
map(int, self.config['api_version'].split('.')))
|
||||
|
||||
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
|
||||
self._fetcher = Fetcher(
|
||||
self._client, self._subscription, **self.config)
|
||||
@@ -250,6 +267,7 @@ class KafkaConsumer(six.Iterator):
|
||||
Returns:
|
||||
kafka.future.Future
|
||||
"""
|
||||
assert self.config['api_version'] >= (0, 8, 1)
|
||||
if offsets is None:
|
||||
offsets = self._subscription.all_consumed_offsets()
|
||||
log.debug("Committing offsets: %s", offsets)
|
||||
@@ -275,6 +293,7 @@ class KafkaConsumer(six.Iterator):
|
||||
to commit with the configured group_id. Defaults to current
|
||||
consumed offsets for all subscribed partitions.
|
||||
"""
|
||||
assert self.config['api_version'] >= (0, 8, 1)
|
||||
if offsets is None:
|
||||
offsets = self._subscription.all_consumed_offsets()
|
||||
self._coordinator.commit_offsets_sync(offsets)
|
||||
@@ -295,6 +314,7 @@ class KafkaConsumer(six.Iterator):
|
||||
Returns:
|
||||
The last committed offset, or None if there was no prior commit.
|
||||
"""
|
||||
assert self.config['api_version'] >= (0, 8, 1)
|
||||
if self._subscription.is_assigned(partition):
|
||||
committed = self._subscription.assignment[partition].committed
|
||||
if committed is None:
|
||||
@@ -382,12 +402,14 @@ class KafkaConsumer(six.Iterator):
|
||||
Returns:
|
||||
dict: map of topic to list of records (may be empty)
|
||||
"""
|
||||
# TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
|
||||
self._coordinator.ensure_coordinator_known()
|
||||
if self.config['api_version'] >= (0, 8, 2):
|
||||
# TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
|
||||
self._coordinator.ensure_coordinator_known()
|
||||
|
||||
# ensure we have partitions assigned if we expect to
|
||||
if self._subscription.partitions_auto_assigned():
|
||||
self._coordinator.ensure_active_group()
|
||||
if self.config['api_version'] >= (0, 9):
|
||||
# ensure we have partitions assigned if we expect to
|
||||
if self._subscription.partitions_auto_assigned():
|
||||
self._coordinator.ensure_active_group()
|
||||
|
||||
# fetch positions if we have partitions we're subscribed to that we
|
||||
# don't know the offset for
|
||||
@@ -565,19 +587,22 @@ class KafkaConsumer(six.Iterator):
|
||||
NoOffsetForPartitionError: If no offset is stored for a given
|
||||
partition and no offset reset policy is defined
|
||||
"""
|
||||
# refresh commits for all assigned partitions
|
||||
self._coordinator.refresh_committed_offsets_if_needed()
|
||||
if self.config['api_version'] >= (0, 8, 1):
|
||||
# refresh commits for all assigned partitions
|
||||
self._coordinator.refresh_committed_offsets_if_needed()
|
||||
|
||||
# then do any offset lookups in case some positions are not known
|
||||
self._fetcher.update_fetch_positions(partitions)
|
||||
|
||||
def _message_generator(self):
|
||||
while True:
|
||||
self._coordinator.ensure_coordinator_known()
|
||||
if self.config['api_version'] >= (0, 8, 2):
|
||||
self._coordinator.ensure_coordinator_known()
|
||||
|
||||
# ensure we have partitions assigned if we expect to
|
||||
if self._subscription.partitions_auto_assigned():
|
||||
self._coordinator.ensure_active_group()
|
||||
if self.config['api_version'] >= (0, 9):
|
||||
# ensure we have partitions assigned if we expect to
|
||||
if self._subscription.partitions_auto_assigned():
|
||||
self._coordinator.ensure_active_group()
|
||||
|
||||
# fetch positions if we have partitions we're subscribed to that we
|
||||
# don't know the offset for
|
||||
|
||||
Reference in New Issue
Block a user