Update Partitioners for use with KafkaProducer (#827)

This commit is contained in:
barrotsteindev
2016-09-28 20:30:32 +03:00
committed by Dana Powers
parent 5c784890b6
commit b8717b4b79
5 changed files with 113 additions and 25 deletions

2
.gitignore vendored
View File

@@ -12,3 +12,5 @@ servers/*/resources/ssl*
docs/_build
.cache*
.idea/
integration-test/
tests-env/

View File

@@ -5,22 +5,23 @@ class Partitioner(object):
"""
Base class for a partitioner
"""
def __init__(self, partitions=None):
    """Initialize the partitioner.

    NOTE(review): the diff rendering interleaved the pre- and post-commit
    lines here; this is the post-commit version (``partitions`` optional).

    Arguments:
        partitions: A list of available partitions (during startup) OPTIONAL.
    """
    # Cached at construction; __call__ may be handed fresher lists per message.
    self.partitions = partitions
def __call__(self, key, all_partitions=None, available_partitions=None):
    """Select a partition for a message.

    Takes a key, the topic's partitions and the currently available
    partitions, and returns the partition to be used for the message.

    Arguments:
        key: the key to use for partitioning.
        all_partitions: a list of the topic's partitions.
        available_partitions: a list of the broker's currently available
            partitions (optional).

    Raises:
        NotImplementedError: subclasses must implement partition selection.
    """
    raise NotImplementedError('partition function has to be implemented')

View File

@@ -11,6 +11,11 @@ class Murmur2Partitioner(Partitioner):
the hash of the key. Attempts to apply the same hashing
function as mainline java client.
"""
def __call__(self, key, partitions=None, available=None):
    """Route through partition(), preferring the available partitions.

    Falls back to the full partition list when no (or an empty)
    available list is given.
    """
    chosen = available if available else partitions
    return self.partition(key, chosen)
def partition(self, key, partitions=None):
if not partitions:
partitions = self.partitions
@@ -21,12 +26,15 @@ class Murmur2Partitioner(Partitioner):
return partitions[idx]
class LegacyPartitioner(Partitioner):
class LegacyPartitioner(object):
"""DEPRECATED -- See Issue 374
Implements a partitioner which selects the target partition based on
the hash of the key
"""
def __init__(self, partitions):
self.partitions = partitions
def partition(self, key, partitions=None):
if not partitions:
partitions = self.partitions

View File

@@ -1,26 +1,70 @@
from __future__ import absolute_import
from itertools import cycle
from .base import Partitioner
class RoundRobinPartitioner(Partitioner):
    """Implements a round robin partitioner which sends data to partitions
    in a round robin fashion.

    NOTE(review): the diff rendering interleaved removed pre-commit lines
    (old ``__init__``/``_set_partitions``/``partition`` and a stray
    ``return next(self.iterpart)``) with the new code; this is the
    reconstructed post-commit class.  Cycling state lives in a
    CachedPartitionCycler so the position can survive partition-list changes.
    """

    def __init__(self, partitions=None):
        """Initialize with an optional list of partitions to cycle over."""
        self.partitions_iterable = CachedPartitionCycler(partitions)
        if partitions:
            self._set_partitions(partitions)
        else:
            self.partitions = None

    def __call__(self, key, all_partitions=None, available_partitions=None):
        """Return the next partition in the cycle.

        Prefers the currently available partitions over the topic's full
        partition list; the key is ignored by round-robin assignment.
        """
        if available_partitions:
            cur_partitions = available_partitions
        else:
            cur_partitions = all_partitions
        if not self.partitions:
            self._set_partitions(cur_partitions)
        elif cur_partitions != self.partitions_iterable.partitions and cur_partitions is not None:
            # Partition list changed since the last call -- re-point the cycler.
            self._set_partitions(cur_partitions)
        return next(self.partitions_iterable)

    def _set_partitions(self, available_partitions):
        # Cache the list and hand it to the cycler.
        self.partitions = available_partitions
        self.partitions_iterable.set_partitions(available_partitions)

    def partition(self, key, all_partitions=None, available_partitions=None):
        """Compatibility alias for the legacy partition() interface."""
        return self.__call__(key, all_partitions, available_partitions)
class CachedPartitionCycler(object):
    """Cycles over a partition list while remembering its position.

    When the partition list changes, set_partitions() tries to keep the
    cycle pointed at the same partition it would have returned next.
    """

    def __init__(self, partitions=None):
        """
        Arguments:
            partitions: optional list of partitions to cycle over.
        """
        self.partitions = partitions
        if partitions:
            assert isinstance(partitions, list)
        # Index of the next partition to hand out; None until first next().
        self.cur_pos = None

    def __next__(self):
        # Python 3 iterator protocol; delegates to the py2-style next().
        return self.next()

    @staticmethod
    def _index_available(cur_pos, partitions):
        # True when cur_pos is a valid index into partitions.
        return cur_pos < len(partitions)

    def set_partitions(self, partitions):
        """Replace the partition list, preserving the cycle position.

        If the partition that would have been returned next is still in
        the new list, continue from it; otherwise restart at the front.
        """
        if self.cur_pos:
            if not self._index_available(self.cur_pos, partitions):
                # Position is past the end of the new list -- restart.
                self.cur_pos = 0
                self.partitions = partitions
                return None
            # BUG FIX: read the upcoming partition from the OLD list BEFORE
            # replacing it.  The original assigned self.partitions first and
            # then read next_item from the new list, so partitions.index()
            # mapped the index onto itself and the remapping was a no-op.
            next_item = self.partitions[self.cur_pos]
            self.partitions = partitions
            if next_item in partitions:
                self.cur_pos = partitions.index(next_item)
            else:
                self.cur_pos = 0
            return None
        # cur_pos is None (never cycled) or 0 (front of list): index 0 is
        # valid for any non-empty list, so just swap the list in.
        self.partitions = partitions

    def next(self):
        """Return the next partition, wrapping around at the end."""
        assert self.partitions is not None
        if self.cur_pos is None or not self._index_available(self.cur_pos, self.partitions):
            # First call or wrapped past the end: restart at the front.
            self.cur_pos = 1
            return self.partitions[0]
        cur_item = self.partitions[self.cur_pos]
        self.cur_pos += 1
        return cur_item

View File

@@ -3,6 +3,7 @@ import six
from kafka.partitioner import Murmur2Partitioner
from kafka.partitioner.default import DefaultPartitioner
from kafka.partitioner import RoundRobinPartitioner
def test_default_partitioner():
@@ -22,6 +23,38 @@ def test_default_partitioner():
assert partitioner(None, all_partitions, []) in all_partitions
def test_roundrobin_partitioner():
    """RoundRobinPartitioner cycles partitions and follows re-assignments."""
    partitioner = RoundRobinPartitioner()
    all_partitions = list(range(100))
    available = all_partitions

    # One full pass: the partitioner walks the partitions in order.
    top = all_partitions[-1]
    for expected in range(top + 1):
        assert partitioner(None, all_partitions, available) == expected

    # Half a pass more, starting over from the front.
    for expected in range(top // 2 + 1):
        assert partitioner(None, all_partitions, available) == expected
    resume = top // 2 + 1

    # Dynamic partition re-assignment: drop the last 25 partitions.
    available = available[:-25]
    for expected in range(resume, max(available) + 1):
        assert partitioner(None, all_partitions, available) == expected
    resume = max(available) + 1

    # Grow the partition set; cycling continues from where it left off.
    all_partitions = list(range(200))
    available = all_partitions
    for expected in range(resume, all_partitions[-1] + 1):
        assert partitioner(None, all_partitions, available) == expected
def test_hash_bytes():
p = Murmur2Partitioner(range(1000))
assert p.partition(bytearray(b'test')) == p.partition(b'test')