Add RangePartitionAssignor (and use as default); add assignor tests

Dana Powers 2016-02-16 12:35:28 -08:00
parent d5c05c811e
commit c8be93b44b
7 changed files with 171 additions and 13 deletions


@ -10,6 +10,7 @@ from kafka.client_async import KafkaClient
from kafka.consumer.fetcher import Fetcher
from kafka.consumer.subscription_state import SubscriptionState
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.protocol.offset import OffsetResetStrategy
from kafka.version import __version__
@ -98,7 +99,8 @@ class KafkaConsumer(six.Iterator):
            brokers or partitions. Default: 300000
        partition_assignment_strategy (list): List of objects to use to
            distribute partition ownership amongst consumer instances when
-            group management is used. Default: [RoundRobinPartitionAssignor]
+            group management is used.
+            Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
        heartbeat_interval_ms (int): The expected time in milliseconds
            between heartbeats to the consumer coordinator when using
            Kafka's group management feature. Heartbeats are used to ensure
@ -146,7 +148,7 @@ class KafkaConsumer(six.Iterator):
        'auto_commit_interval_ms': 5000,
        'check_crcs': True,
        'metadata_max_age_ms': 5 * 60 * 1000,
-        'partition_assignment_strategy': (RoundRobinPartitionAssignor,),
+        'partition_assignment_strategy': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        'heartbeat_interval_ms': 3000,
        'session_timeout_ms': 30000,
        'send_buffer_bytes': 128 * 1024,
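For context (not part of the diff itself): group members advertise every configured strategy in order, so an application that wants a single specific assignor can still pass its own list. A minimal usage sketch, assuming a broker at localhost:9092 and placeholder topic/group names:

from kafka import KafkaConsumer
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

# Sketch: advertise only the roundrobin strategy instead of the new
# (RangePartitionAssignor, RoundRobinPartitionAssignor) default.
consumer = KafkaConsumer(
    'my-topic',                          # placeholder topic name
    bootstrap_servers='localhost:9092',  # placeholder broker address
    group_id='my-group',                 # placeholder consumer group
    partition_assignment_strategy=[RoundRobinPartitionAssignor],
)

for message in consumer:
    print(message.topic, message.partition, message.offset)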


@ -0,0 +1,77 @@
import collections
import logging

import six

from .abstract import AbstractPartitionAssignor
from ..protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment

log = logging.getLogger(__name__)


class RangePartitionAssignor(AbstractPartitionAssignor):
    """
    The range assignor works on a per-topic basis. For each topic, we lay out
    the available partitions in numeric order and the consumers in
    lexicographic order. We then divide the number of partitions by the total
    number of consumers to determine the number of partitions to assign to each
    consumer. If it does not evenly divide, then the first few consumers will
    have one extra partition.

    For example, suppose there are two consumers C0 and C1, two topics t0 and
    t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
    t0p2, t1p0, t1p1, and t1p2.

    The assignment will be:
        C0: [t0p0, t0p1, t1p0, t1p1]
        C1: [t0p2, t1p2]
    """
    name = 'range'
    version = 0

    @classmethod
    def assign(cls, cluster, member_metadata):
        consumers_per_topic = collections.defaultdict(list)
        for member, metadata in six.iteritems(member_metadata):
            for topic in metadata.subscription:
                consumers_per_topic[topic].append(member)

        # construct {member_id: {topic: [partition, ...]}}
        assignment = collections.defaultdict(dict)

        for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
            partitions = cluster.partitions_for_topic(topic)
            if partitions is None:
                log.warning('No partition metadata for topic %s', topic)
                continue
            partitions = sorted(list(partitions))
            partitions_for_topic = len(partitions)
            consumers_for_topic.sort()

            partitions_per_consumer = len(partitions) // len(consumers_for_topic)
            consumers_with_extra = len(partitions) % len(consumers_for_topic)

            for i in range(len(consumers_for_topic)):
                start = partitions_per_consumer * i
                start += min(i, consumers_with_extra)
                length = partitions_per_consumer
                if not i + 1 > consumers_with_extra:
                    length += 1
                member = consumers_for_topic[i]
                assignment[member][topic] = partitions[start:start+length]

        protocol_assignment = {}
        for member_id in member_metadata:
            protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
                cls.version,
                sorted(assignment[member_id].items()),
                b'')
        return protocol_assignment

    @classmethod
    def metadata(cls, topics):
        return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')

    @classmethod
    def on_assignment(cls, assignment):
        pass
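To make the split arithmetic in assign() concrete, here is a standalone sketch (not part of the commit) of the start/length computation for an uneven case, 7 partitions shared by 3 consumers:

# Standalone illustration of the range split used in assign():
# 7 partitions, 3 consumers -> partitions_per_consumer=2, consumers_with_extra=1,
# so the lexicographically first consumer receives one extra partition.
partitions = list(range(7))
consumers = sorted(['C0', 'C1', 'C2'])

per_consumer = len(partitions) // len(consumers)   # 2
with_extra = len(partitions) % len(consumers)      # 1

for i, member in enumerate(consumers):
    start = per_consumer * i + min(i, with_extra)
    length = per_consumer + (1 if i + 1 <= with_extra else 0)
    print(member, partitions[start:start + length])
# C0 [0, 1, 2]
# C1 [3, 4]
# C2 [5, 6]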


@ -12,6 +12,22 @@ log = logging.getLogger(__name__)
class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
    """
    The roundrobin assignor lays out all the available partitions and all the
    available consumers. It then proceeds to do a roundrobin assignment from
    partition to consumer. If the subscriptions of all consumer instances are
    identical, then the partitions will be uniformly distributed. (i.e., the
    partition ownership counts will be within a delta of exactly one across all
    consumers.)

    For example, suppose there are two consumers C0 and C1, two topics t0 and
    t1, and each topic has 3 partitions, resulting in partitions t0p0, t0p1,
    t0p2, t1p0, t1p1, and t1p2.

    The assignment will be:
        C0: [t0p0, t0p2, t1p1]
        C1: [t0p1, t1p0, t1p2]
    """
    name = 'roundrobin'
    version = 0
@ -50,7 +66,7 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
        for member_id in member_metadata:
            protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
                cls.version,
-                assignment[member_id].items(),
+                sorted(assignment[member_id].items()),
                b'')
        return protocol_assignment
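The idea behind the docstring's example can be reproduced in a few lines of plain Python. This is an illustrative simplification assuming identical subscriptions, not the module's actual implementation:

import itertools

# Deal sorted (topic, partition) pairs out to the sorted consumers in turn.
consumers = sorted(['C0', 'C1'])
all_partitions = [('t0', 0), ('t0', 1), ('t0', 2), ('t1', 0), ('t1', 1), ('t1', 2)]

assignment = {member: [] for member in consumers}
member_cycle = itertools.cycle(consumers)
for topic, partition in all_partitions:
    assignment[next(member_cycle)].append((topic, partition))

print(assignment)
# {'C0': [('t0', 0), ('t0', 2), ('t1', 1)], 'C1': [('t0', 1), ('t1', 0), ('t1', 2)]}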


@ -8,6 +8,7 @@ import time
import six
from .base import BaseCoordinator
from .assignors.range import RangePartitionAssignor
from .assignors.roundrobin import RoundRobinPartitionAssignor
from .protocol import (
    ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment,
@ -30,7 +31,7 @@ class ConsumerCoordinator(BaseCoordinator):
        'enable_auto_commit': True,
        'auto_commit_interval_ms': 5000,
        'default_offset_commit_callback': lambda offsets, response: True,
-        'assignors': (RoundRobinPartitionAssignor,),
+        'assignors': (RangePartitionAssignor, RoundRobinPartitionAssignor),
        'session_timeout_ms': 30000,
        'heartbeat_interval_ms': 3000,
        'retry_backoff_ms': 100,
@ -54,7 +55,7 @@ class ConsumerCoordinator(BaseCoordinator):
                trigger custom actions when a commit request completes.
            assignors (list): List of objects to use to distribute partition
                ownership amongst consumer instances when group management is
-                used. Default: [RoundRobinPartitionAssignor]
+                used. Default: [RangePartitionAssignor, RoundRobinPartitionAssignor]
            heartbeat_interval_ms (int): The expected time in milliseconds
                between heartbeats to the consumer coordinator when using
                Kafka's group management feature. Heartbeats are used to ensure


@ -28,6 +28,6 @@ class ConsumerProtocolMemberAssignment(Struct):
class ConsumerProtocol(object):
    PROTOCOL_TYPE = 'consumer'
-    ASSIGNMENT_STRATEGIES = ('roundrobin',)
+    ASSIGNMENT_STRATEGIES = ('range', 'roundrobin')
    METADATA = ConsumerProtocolMemberMetadata
    ASSIGNMENT = ConsumerProtocolMemberAssignment
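Strategy order matters during group coordination: each member advertises one (strategy name, metadata) entry per configured assignor, and the coordinator selects a strategy every member supports, so listing 'range' first makes it the preferred default. The sketch below shows roughly what is now advertised for a subscription to 'foobar', mirroring the updated test_group_protocols expectation further down:

from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor

assignors = (RangePartitionAssignor, RoundRobinPartitionAssignor)  # new default order
topics = ['foobar']  # assumed subscription

# One (strategy name, serialized metadata) entry per assignor, in preference order.
group_protocols = [(assignor.name, assignor.metadata(topics)) for assignor in assignors]
print([name for name, _metadata in group_protocols])
# ['range', 'roundrobin']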

test/test_assignors.py (new file, 58 lines)

@ -0,0 +1,58 @@
# pylint: skip-file
from __future__ import absolute_import

import pytest

from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.protocol import (
    ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment)


@pytest.fixture
def cluster(mocker):
    cluster = mocker.MagicMock()
    cluster.partitions_for_topic.return_value = set([0, 1, 2])
    return cluster


def test_assignor_roundrobin(cluster):
    assignor = RoundRobinPartitionAssignor

    member_metadata = {
        'C0': assignor.metadata(set(['t0', 't1'])),
        'C1': assignor.metadata(set(['t0', 't1'])),
    }

    ret = assignor.assign(cluster, member_metadata)
    expected = {
        'C0': ConsumerProtocolMemberAssignment(
            assignor.version, [('t0', [0, 2]), ('t1', [1])], b''),
        'C1': ConsumerProtocolMemberAssignment(
            assignor.version, [('t0', [1]), ('t1', [0, 2])], b'')
    }
    assert ret == expected
    assert set(ret) == set(expected)
    for member in ret:
        assert ret[member].encode() == expected[member].encode()


def test_assignor_range(cluster):
    assignor = RangePartitionAssignor

    member_metadata = {
        'C0': assignor.metadata(set(['t0', 't1'])),
        'C1': assignor.metadata(set(['t0', 't1'])),
    }

    ret = assignor.assign(cluster, member_metadata)
    expected = {
        'C0': ConsumerProtocolMemberAssignment(
            assignor.version, [('t0', [0, 1]), ('t1', [0, 1])], b''),
        'C1': ConsumerProtocolMemberAssignment(
            assignor.version, [('t0', [2]), ('t1', [2])], b'')
    }
    assert ret == expected
    assert set(ret) == set(expected)
    for member in ret:
        assert ret[member].encode() == expected[member].encode()


@ -7,6 +7,7 @@ from kafka.client_async import KafkaClient
from kafka.common import TopicPartition, OffsetAndMetadata
from kafka.consumer.subscription_state import (
    SubscriptionState, ConsumerRebalanceListener)
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.coordinator.consumer import ConsumerCoordinator
from kafka.coordinator.protocol import (
@ -72,13 +73,16 @@ def test_group_protocols(coordinator):
        assert False, 'Exception not raised when expected'

    coordinator._subscription.subscribe(topics=['foobar'])
-    assert coordinator.group_protocols() == [(
-        'roundrobin',
-        ConsumerProtocolMemberMetadata(
+    assert coordinator.group_protocols() == [
+        ('range', ConsumerProtocolMemberMetadata(
+            RangePartitionAssignor.version,
+            ['foobar'],
+            b'')),
+        ('roundrobin', ConsumerProtocolMemberMetadata(
            RoundRobinPartitionAssignor.version,
            ['foobar'],
-            b'')
-    )]
+            b'')),
+    ]
@pytest.mark.parametrize('api_version', [(0, 8), (0, 8, 1), (0, 8, 2), (0, 9)])
@ -113,8 +117,8 @@ def test_pattern_subscription(coordinator, api_version):
def test_lookup_assignor(coordinator):
-    assignor = coordinator._lookup_assignor('roundrobin')
-    assert assignor is RoundRobinPartitionAssignor
+    assert coordinator._lookup_assignor('roundrobin') is RoundRobinPartitionAssignor
+    assert coordinator._lookup_assignor('range') is RangePartitionAssignor
    assert coordinator._lookup_assignor('foobar') is None