Merge pull request #439 from chrischamberlin/fix-murmur
Fix translation of Java murmur2 code, fix byte encoding for Python 3.
This commit is contained in:
@@ -1,3 +1,5 @@
|
|||||||
|
import six
|
||||||
|
|
||||||
from .base import Partitioner
|
from .base import Partitioner
|
||||||
|
|
||||||
|
|
||||||
@@ -43,14 +45,16 @@ def murmur2(key):
|
|||||||
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
|
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
key: if not a bytearray, converted via bytearray(str(key))
|
key: if not a bytes type, encoded using default encoding
|
||||||
|
|
||||||
Returns: MurmurHash2 of key bytearray
|
Returns: MurmurHash2 of key bytearray
|
||||||
"""
|
"""
|
||||||
|
|
||||||
# Convert key to a bytearray
|
# Convert key to bytes or bytearray
|
||||||
if not isinstance(key, bytearray):
|
if isinstance(key, bytearray) or (six.PY3 and isinstance(key, bytes)):
|
||||||
data = bytearray(str(key))
|
data = key
|
||||||
|
else:
|
||||||
|
data = bytearray(str(key).encode())
|
||||||
|
|
||||||
length = len(data)
|
length = len(data)
|
||||||
seed = 0x9747b28c
|
seed = 0x9747b28c
|
||||||
@@ -61,7 +65,7 @@ def murmur2(key):
|
|||||||
|
|
||||||
# Initialize the hash to a random value
|
# Initialize the hash to a random value
|
||||||
h = seed ^ length
|
h = seed ^ length
|
||||||
length4 = length / 4
|
length4 = length // 4
|
||||||
|
|
||||||
for i in range(length4):
|
for i in range(length4):
|
||||||
i4 = i * 4
|
i4 = i * 4
|
||||||
@@ -84,15 +88,13 @@ def murmur2(key):
|
|||||||
|
|
||||||
# Handle the last few bytes of the input array
|
# Handle the last few bytes of the input array
|
||||||
extra_bytes = length % 4
|
extra_bytes = length % 4
|
||||||
if extra_bytes == 3:
|
if extra_bytes >= 3:
|
||||||
h ^= (data[(length & ~3) + 2] & 0xff) << 16
|
h ^= (data[(length & ~3) + 2] & 0xff) << 16
|
||||||
h &= 0xffffffff
|
h &= 0xffffffff
|
||||||
|
if extra_bytes >= 2:
|
||||||
if extra_bytes == 2:
|
|
||||||
h ^= (data[(length & ~3) + 1] & 0xff) << 8
|
h ^= (data[(length & ~3) + 1] & 0xff) << 8
|
||||||
h &= 0xffffffff
|
h &= 0xffffffff
|
||||||
|
if extra_bytes >= 1:
|
||||||
if extra_bytes == 1:
|
|
||||||
h ^= (data[length & ~3] & 0xff)
|
h ^= (data[length & ~3] & 0xff)
|
||||||
h &= 0xffffffff
|
h &= 0xffffffff
|
||||||
h *= m
|
h *= m
|
||||||
|
|||||||
23
test/test_partitioner.py
Normal file
23
test/test_partitioner.py
Normal file
@@ -0,0 +1,23 @@
|
|||||||
|
import six
|
||||||
|
from . import unittest
|
||||||
|
|
||||||
|
from kafka.partitioner import (Murmur2Partitioner)
|
||||||
|
|
||||||
|
class TestMurmurPartitioner(unittest.TestCase):
|
||||||
|
def test_hash_bytes(self):
|
||||||
|
p = Murmur2Partitioner(range(1000))
|
||||||
|
self.assertEqual(p.partition(bytearray(b'test')), p.partition(b'test'))
|
||||||
|
|
||||||
|
def test_hash_encoding(self):
|
||||||
|
p = Murmur2Partitioner(range(1000))
|
||||||
|
self.assertEqual(p.partition('test'), p.partition(u'test'))
|
||||||
|
|
||||||
|
def test_murmur2_java_compatibility(self):
|
||||||
|
p = Murmur2Partitioner(range(1000))
|
||||||
|
# compare with output from Kafka's org.apache.kafka.clients.producer.Partitioner
|
||||||
|
self.assertEqual(681, p.partition(b''))
|
||||||
|
self.assertEqual(524, p.partition(b'a'))
|
||||||
|
self.assertEqual(434, p.partition(b'ab'))
|
||||||
|
self.assertEqual(107, p.partition(b'abc'))
|
||||||
|
self.assertEqual(566, p.partition(b'123456789'))
|
||||||
|
self.assertEqual(742, p.partition(b'\x00 '))
|
||||||
Reference in New Issue
Block a user