Check api_version in ConsumerCoordinator

- Full group support in 0.9
- Kafka-storage offsets w/ GroupCoordinator in 0.8.2
- Zookeeper-storage offsets in 0.8.1
- Assign all partitions locally if < 0.9
Author: Dana Powers
Date: 2016-01-03 16:03:30 -08:00
parent fae1a227b1
commit 5c45ec13f3
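The new behavior is driven entirely by the `api_version` config tuple added below. A minimal usage sketch (not part of the patch): `client` and `subscription` are assumed to be an existing KafkaClient and SubscriptionState, and the group name is illustrative. Pinning the version to an 0.8 broker makes the coordinator skip group membership and fall back to the older offset APIs.

```python
# Minimal sketch, not part of the patch: pin api_version so the coordinator
# picks the matching code paths. `client` and `subscription` are assumed to be
# an existing KafkaClient and SubscriptionState; group_id is illustrative.
coordinator = ConsumerCoordinator(
    client, subscription,
    group_id='my-group',
    api_version=(0, 8, 1),  # 0.8.1 broker: zookeeper-storage offsets, no group coordinator
)
```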


@@ -9,7 +9,9 @@ from .base import BaseCoordinator
 import kafka.common as Errors
 from kafka.common import OffsetAndMetadata, TopicPartition
 from kafka.future import Future
-from kafka.protocol.commit import OffsetCommitRequest_v2, OffsetFetchRequest_v1
+from kafka.protocol.commit import (
+    OffsetCommitRequest_v2, OffsetCommitRequest_v1, OffsetCommitRequest_v0,
+    OffsetFetchRequest_v0, OffsetFetchRequest_v1)
 from kafka.protocol.struct import Struct
 from kafka.protocol.types import Array, Bytes, Int16, Int32, Schema, String
@@ -55,6 +57,7 @@ class ConsumerCoordinator(BaseCoordinator):
         'session_timeout_ms': 30000,
         'heartbeat_interval_ms': 3000,
         'retry_backoff_ms': 100,
+        'api_version': (0, 9),
     }

     def __init__(self, client, subscription, **configs):
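All of the checks introduced in this commit rely on Python's element-wise tuple comparison, so an `api_version` tuple can be tested directly against a threshold version. A few assertions mirroring the comparisons used below:

```python
# Tuple comparison is lexicographic, which is what every new check relies on.
assert (0, 9) >= (0, 8, 2)          # 0.9 broker: full group support
assert (0, 8, 2) >= (0, 8, 1)       # 0.8.2: kafka-storage offsets via GroupCoordinator
assert not (0, 8, 1) >= (0, 8, 2)   # 0.8.1: falls back to zookeeper-storage offsets
assert not (0, 8) >= (0, 8, 1)      # a shorter prefix compares as the smaller tuple
```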
@@ -99,14 +102,16 @@ class ConsumerCoordinator(BaseCoordinator):
         self._subscription = subscription
         self._partitions_per_topic = {}
         self._auto_commit_task = None
-        assert self.config['assignors'], 'Coordinator require assignors'
+        if self.config['api_version'] >= (0, 9):
+            assert self.config['assignors'], 'Coordinator require assignors'

         self._cluster.request_update()
         self._cluster.add_listener(self._handle_metadata_update)

-        if self.config['enable_auto_commit']:
-            interval = self.config['auto_commit_interval_ms'] / 1000.0
-            self._auto_commit_task = AutoCommitTask(self, interval)
+        if self.config['api_version'] >= (0, 8, 1):
+            if self.config['enable_auto_commit']:
+                interval = self.config['auto_commit_interval_ms'] / 1000.0
+                self._auto_commit_task = AutoCommitTask(self, interval)

         # metrics=None,
         # metric_group_prefix=None,
@@ -143,7 +148,17 @@ class ConsumerCoordinator(BaseCoordinator):
         # check if there are any changes to the metadata which should trigger a rebalance
         if self._subscription_metadata_changed():
-            self._subscription.mark_for_reassignment()
+            if self.config['api_version'] >= (0, 9):
+                self._subscription.mark_for_reassignment()
+
+            # If we haven't got group coordinator support,
+            # just assign all partitions locally
+            else:
+                self._subscription.assign_from_subscribed([
+                    TopicPartition(topic, partition)
+                    for topic in self._subscription.subscription
+                    for partition in self._partitions_per_topic[topic]
+                ])

     def _subscription_metadata_changed(self):
         if not self._subscription.partitions_auto_assigned():
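For brokers older than 0.9 there is no group coordinator to run an assignor, so the fallback above simply assigns every known partition of every subscribed topic to this consumer. A small, self-contained illustration with made-up topic metadata (the variable names mirror the attributes used in the patch):

```python
from kafka.common import TopicPartition

# Hypothetical metadata snapshot; real values come from _handle_metadata_update.
subscription = {'events', 'logs'}
partitions_per_topic = {'events': {0, 1, 2}, 'logs': {0}}

assignment = [TopicPartition(topic, partition)
              for topic in subscription
              for partition in partitions_per_topic[topic]]
# -> one TopicPartition per (topic, partition) pair, e.g. TopicPartition('events', 0)
```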
@@ -273,7 +288,8 @@ class ConsumerCoordinator(BaseCoordinator):
             dict: {TopicPartition: OffsetAndMetadata}
         """
         while True:
-            self.ensure_coordinator_known()
+            if self.config['api_version'] >= (0, 8, 2):
+                self.ensure_coordinator_known()

             # contact coordinator to fetch committed offsets
             future = self._send_offset_fetch_request(partitions)
@@ -331,7 +347,8 @@ class ConsumerCoordinator(BaseCoordinator):
             return

         while True:
-            self.ensure_coordinator_known()
+            if self.config['api_version'] >= (0, 8, 2):
+                self.ensure_coordinator_known()

             future = self._send_offset_commit_request(offsets)
             self._client.poll(future=future)
@@ -345,6 +362,8 @@ class ConsumerCoordinator(BaseCoordinator):
             time.sleep(self.config['retry_backoff_ms'] / 1000.0)

     def _maybe_auto_commit_offsets_sync(self):
+        if self.config['api_version'] < (0, 8, 1):
+            return
         if self.config['enable_auto_commit']:
             # disable periodic commits prior to committing synchronously. note that they will
             # be re-enabled after a rebalance completes
@@ -379,8 +398,12 @@ class ConsumerCoordinator(BaseCoordinator):
         Returns:
             Future: indicating whether the commit was successful or not
         """
-        if self.coordinator_unknown():
-            return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+        if self.config['api_version'] >= (0, 8, 2):
+            if self.coordinator_unknown():
+                return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+            node_id = self.coordinator_id
+        else:
+            node_id = self._client.least_loaded_node()

         if not offsets:
             return Future().failure(None)
@@ -390,25 +413,49 @@ class ConsumerCoordinator(BaseCoordinator):
         for tp, offset in six.iteritems(offsets):
             offset_data[tp.topic][tp.partition] = offset

-        request = OffsetCommitRequest_v2(
-            self.group_id,
-            self.generation,
-            self.member_id,
-            OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
-            [(
-                topic, [(
-                    partition,
-                    offset.offset,
-                    offset.metadata
-                ) for partition, offset in six.iteritems(partitions)]
-            ) for topic, partitions in six.iteritems(offset_data)]
-        )
+        if self.config['api_version'] >= (0, 9):
+            request = OffsetCommitRequest_v2(
+                self.group_id,
+                self.generation,
+                self.member_id,
+                OffsetCommitRequest_v2.DEFAULT_RETENTION_TIME,
+                [(
+                    topic, [(
+                        partition,
+                        offset.offset,
+                        offset.metadata
+                    ) for partition, offset in six.iteritems(partitions)]
+                ) for topic, partitions in six.iteritems(offset_data)]
+            )
+        elif self.config['api_version'] >= (0, 8, 2):
+            request = OffsetCommitRequest_v1(
+                self.group_id, -1, '',
+                [(
+                    topic, [(
+                        partition,
+                        offset.offset,
+                        -1,
+                        offset.metadata
+                    ) for partition, offset in six.iteritems(partitions)]
+                ) for topic, partitions in six.iteritems(offset_data)]
+            )
+        elif self.config['api_version'] >= (0, 8, 1):
+            request = OffsetCommitRequest_v0(
+                self.group_id,
+                [(
+                    topic, [(
+                        partition,
+                        offset.offset,
+                        offset.metadata
+                    ) for partition, offset in six.iteritems(partitions)]
+                ) for topic, partitions in six.iteritems(offset_data)]
+            )

         log.debug("Sending offset-commit request with %s to %s",
-                  offsets, self.coordinator_id)
+                  offsets, node_id)

         future = Future()
-        _f = self._client.send(self.coordinator_id, request)
+        _f = self._client.send(node_id, request)
         _f.add_callback(self._handle_offset_commit_response, offsets, future)
         _f.add_errback(self._failed_request, future)
         return future
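Condensed, the new request-construction logic maps the configured broker version onto one of the three OffsetCommitRequest schemas imported at the top of the module. A sketch of that mapping (not a helper that exists in the patch):

```python
def commit_request_class(api_version):
    """Illustrative only: which OffsetCommitRequest schema the code above builds."""
    if api_version >= (0, 9):
        return OffsetCommitRequest_v2    # group generation/member id + retention time
    elif api_version >= (0, 8, 2):
        return OffsetCommitRequest_v1    # kafka-storage offsets, per-partition timestamp
    elif api_version >= (0, 8, 1):
        return OffsetCommitRequest_v0    # zookeeper-storage offsets
```

The v1 branch also has to supply the extra per-partition timestamp field, which the patch fills with -1, and it passes -1/'' for the generation and member id since there is no group membership below 0.9.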
@@ -495,22 +542,33 @@ class ConsumerCoordinator(BaseCoordinator):
         Returns:
             Future: resolves to dict of offsets: {TopicPartition: int}
         """
-        if self.coordinator_unknown():
-            return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+        if self.config['api_version'] >= (0, 8, 2):
+            if self.coordinator_unknown():
+                return Future().failure(Errors.GroupCoordinatorNotAvailableError)
+            node_id = self.coordinator_id
+        else:
+            node_id = self._client.least_loaded_node()

         log.debug("Fetching committed offsets for partitions: %s", partitions)
         # construct the request
         topic_partitions = collections.defaultdict(set)
         for tp in partitions:
             topic_partitions[tp.topic].add(tp.partition)

-        request = OffsetFetchRequest_v1(
-            self.group_id,
-            list(topic_partitions.items())
-        )
+        if self.config['api_version'] >= (0, 8, 2):
+            request = OffsetFetchRequest_v1(
+                self.group_id,
+                list(topic_partitions.items())
+            )
+        else:
+            request = OffsetFetchRequest_v0(
+                self.group_id,
+                list(topic_partitions.items())
+            )

         # send the request with a callback
         future = Future()
-        _f = self._client.send(self.coordinator_id, request)
+        _f = self._client.send(node_id, request)
         _f.add_callback(self._handle_offset_fetch_response, future)
         _f.add_errback(self._failed_request, future)
         return future
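The fetch side follows the same pattern: OffsetFetchRequest_v1 reads offsets stored in Kafka via the group coordinator, while v0 reads the older zookeeper-stored offsets, so for pre-0.8.2 brokers the request can go to the least-loaded node instead of a coordinator. A standalone sketch of building such a request against a hypothetical pre-0.8.2 broker:

```python
import collections
from kafka.common import TopicPartition
from kafka.protocol.commit import OffsetFetchRequest_v0, OffsetFetchRequest_v1

api_version = (0, 8, 1)                     # hypothetical pre-0.8.2 broker
partitions = [TopicPartition('events', 0), TopicPartition('events', 1)]

topic_partitions = collections.defaultdict(set)
for tp in partitions:
    topic_partitions[tp.topic].add(tp.partition)

# v1 fetches kafka-stored offsets via the coordinator; v0 fetches the
# older zookeeper-stored offsets, so it can go to any broker.
request_class = (OffsetFetchRequest_v1 if api_version >= (0, 8, 2)
                 else OffsetFetchRequest_v0)
request = request_class('my-group', list(topic_partitions.items()))
```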
@@ -536,6 +594,10 @@ class ConsumerCoordinator(BaseCoordinator):
                 # need to re-join group
                 self._subscription.mark_for_reassignment()
                 future.failure(error)
+            elif error_type is Errors.UnknownTopicOrPartitionError:
+                log.warning("OffsetFetchRequest -- unknown topic %s",
+                            topic)
+                continue
             else:
                 log.error("Unknown error fetching offsets for %s: %s",
                           tp, error)