Fix murmur2 bug handling python2 bytes that do not ascii encode (#815)
* Add test for murmur2 py2 bytes bug * Fix murmur2 handling of python2 bytes * Drop bytearray / str / unicode MurmurPartitioner tests -- no longer supported * Make DefaultPartitioner importable from kafka.partitioner
This commit is contained in:
@@ -1,9 +1,10 @@
|
|||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
|
||||||
from .roundrobin import RoundRobinPartitioner
|
from .default import DefaultPartitioner
|
||||||
from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner
|
from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner
|
||||||
|
from .roundrobin import RoundRobinPartitioner
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'RoundRobinPartitioner', 'HashedPartitioner', 'Murmur2Partitioner',
|
'DefaultPartitioner', 'RoundRobinPartitioner', 'HashedPartitioner',
|
||||||
'LegacyPartitioner'
|
'Murmur2Partitioner', 'LegacyPartitioner'
|
||||||
]
|
]
|
||||||
|
@@ -49,22 +49,20 @@ HashedPartitioner = LegacyPartitioner
|
|||||||
|
|
||||||
|
|
||||||
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
|
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
|
||||||
def murmur2(key):
|
def murmur2(data):
|
||||||
"""Pure-python Murmur2 implementation.
|
"""Pure-python Murmur2 implementation.
|
||||||
|
|
||||||
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
|
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
key: if not a bytes type, encoded using default encoding
|
data (bytes): opaque bytes
|
||||||
|
|
||||||
Returns: MurmurHash2 of key bytearray
|
Returns: MurmurHash2 of data
|
||||||
"""
|
"""
|
||||||
|
# Python2 bytes is really a str, causing the bitwise operations below to fail
|
||||||
# Convert key to bytes or bytearray
|
# so convert to bytearray.
|
||||||
if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
|
if six.PY2:
|
||||||
data = key
|
data = bytearray(bytes(data))
|
||||||
else:
|
|
||||||
data = bytearray(str(key).encode())
|
|
||||||
|
|
||||||
length = len(data)
|
length = len(data)
|
||||||
seed = 0x9747b28c
|
seed = 0x9747b28c
|
||||||
|
@@ -1,9 +1,7 @@
|
|||||||
import pytest
|
from __future__ import absolute_import
|
||||||
import six
|
|
||||||
|
|
||||||
from kafka.partitioner import Murmur2Partitioner
|
from kafka.partitioner import DefaultPartitioner, Murmur2Partitioner, RoundRobinPartitioner
|
||||||
from kafka.partitioner.default import DefaultPartitioner
|
from kafka.partitioner.hashed import murmur2
|
||||||
from kafka.partitioner import RoundRobinPartitioner
|
|
||||||
|
|
||||||
|
|
||||||
def test_default_partitioner():
|
def test_default_partitioner():
|
||||||
@@ -55,16 +53,6 @@ def test_roundrobin_partitioner():
|
|||||||
i += 1
|
i += 1
|
||||||
|
|
||||||
|
|
||||||
def test_hash_bytes():
|
|
||||||
p = Murmur2Partitioner(range(1000))
|
|
||||||
assert p.partition(bytearray(b'test')) == p.partition(b'test')
|
|
||||||
|
|
||||||
|
|
||||||
def test_hash_encoding():
|
|
||||||
p = Murmur2Partitioner(range(1000))
|
|
||||||
assert p.partition('test') == p.partition(u'test')
|
|
||||||
|
|
||||||
|
|
||||||
def test_murmur2_java_compatibility():
|
def test_murmur2_java_compatibility():
|
||||||
p = Murmur2Partitioner(range(1000))
|
p = Murmur2Partitioner(range(1000))
|
||||||
# compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
|
# compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
|
||||||
@@ -74,3 +62,9 @@ def test_murmur2_java_compatibility():
|
|||||||
assert p.partition(b'abc') == 107
|
assert p.partition(b'abc') == 107
|
||||||
assert p.partition(b'123456789') == 566
|
assert p.partition(b'123456789') == 566
|
||||||
assert p.partition(b'\x00 ') == 742
|
assert p.partition(b'\x00 ') == 742
|
||||||
|
|
||||||
|
|
||||||
|
def test_murmur2_not_ascii():
|
||||||
|
# Verify no regression of murmur2() bug encoding py2 bytes that dont ascii encode
|
||||||
|
murmur2(b'\xa4')
|
||||||
|
murmur2(b'\x81' * 1000)
|
||||||
|
Reference in New Issue
Block a user