Handle broken LZ4 framing; switch to lz4tools + xxhash
This commit is contained in:
@@ -102,8 +102,9 @@ Compression
|
|||||||
***********
|
***********
|
||||||
|
|
||||||
kafka-python supports gzip compression/decompression natively. To produce or
|
kafka-python supports gzip compression/decompression natively. To produce or
|
||||||
consume snappy and lz4 compressed messages, you must install `lz4` (`lz4-cffi`
|
consume lz4 compressed messages, you must install lz4tools and xxhash (modules
|
||||||
if using pypy) and/or `python-snappy` (also requires snappy library).
|
may not work on python2.6). To enable snappy compression/decompression, install
|
||||||
|
python-snappy (also requires snappy library).
|
||||||
See `Installation <http://kafka-python.readthedocs.org/en/master/install.html#optional-snappy-install>`_
|
See `Installation <http://kafka-python.readthedocs.org/en/master/install.html#optional-snappy-install>`_
|
||||||
for more information.
|
for more information.
|
||||||
|
|
||||||
|
@@ -101,8 +101,9 @@ Compression
|
|||||||
***********
|
***********
|
||||||
|
|
||||||
kafka-python supports gzip compression/decompression natively. To produce or
|
kafka-python supports gzip compression/decompression natively. To produce or
|
||||||
consume snappy and lz4 compressed messages, you must install lz4 (lz4-cffi
|
consume lz4 compressed messages, you must install lz4tools and xxhash (modules
|
||||||
if using pypy) and/or python-snappy (also requires snappy library).
|
may not work on python2.6). To enable snappy, install python-snappy (also
|
||||||
|
requires snappy library).
|
||||||
See `Installation <install.html#optional-snappy-install>`_ for more information.
|
See `Installation <install.html#optional-snappy-install>`_ for more information.
|
||||||
|
|
||||||
|
|
||||||
|
@@ -40,14 +40,12 @@ Using `setup.py` directly:
|
|||||||
Optional LZ4 install
|
Optional LZ4 install
|
||||||
********************
|
********************
|
||||||
|
|
||||||
To enable LZ4 compression/decompression, install `lz4`:
|
To enable LZ4 compression/decompression, install lz4tools and xxhash:
|
||||||
|
|
||||||
>>> pip install lz4
|
>>> pip install lz4tools
|
||||||
|
>>> pip install xxhash
|
||||||
Or `lz4-cffi` if using pypy:
|
|
||||||
|
|
||||||
>>> pip install lz4-cffi
|
|
||||||
|
|
||||||
|
*Note*: these modules do not support python2.6
|
||||||
|
|
||||||
Optional Snappy install
|
Optional Snappy install
|
||||||
***********************
|
***********************
|
||||||
|
@@ -15,13 +15,10 @@ except ImportError:
|
|||||||
snappy = None
|
snappy = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import lz4
|
import lz4f
|
||||||
from lz4 import compress as lz4_encode
|
import xxhash
|
||||||
from lz4 import decompress as lz4_decode
|
|
||||||
except ImportError:
|
except ImportError:
|
||||||
lz4 = None
|
lz4f = None
|
||||||
lz4_encode = None
|
|
||||||
lz4_decode = None
|
|
||||||
|
|
||||||
PYPY = bool(platform.python_implementation() == 'PyPy')
|
PYPY = bool(platform.python_implementation() == 'PyPy')
|
||||||
|
|
||||||
@@ -34,7 +31,7 @@ def has_snappy():
|
|||||||
|
|
||||||
|
|
||||||
def has_lz4():
|
def has_lz4():
|
||||||
return lz4 is not None
|
return lz4f is not None
|
||||||
|
|
||||||
|
|
||||||
def gzip_encode(payload, compresslevel=None):
|
def gzip_encode(payload, compresslevel=None):
|
||||||
@@ -180,3 +177,50 @@ def snappy_decode(payload):
|
|||||||
return out.read()
|
return out.read()
|
||||||
else:
|
else:
|
||||||
return snappy.decompress(payload)
|
return snappy.decompress(payload)
|
||||||
|
|
||||||
|
|
||||||
|
def lz4_encode(payload):
    """Compress payload into an LZ4 frame carrying Kafka's header checksum.

    The frame is produced by ``lz4f.compressFrame`` and then its header
    checksum byte is overwritten with the value Kafka's (buggy) LZ4
    implementation expects: a checksum computed over the header *including*
    the leading 4 bytes, rather than starting at the FLG byte.

    Arguments:
        payload: the raw data to compress.

    Returns:
        The compressed frame with the header checksum byte replaced.
    """
    frame = lz4f.compressFrame(payload) # pylint: disable-msg=no-member

    # Byte 4 is the FLG byte; indexing yields an int for py3 bytes and a
    # one-character str for py2 str, so normalize to an int.
    flag_byte = frame[4]
    if not isinstance(flag_byte, int):
        flag_byte = ord(flag_byte)

    # The header is 7 bytes long, or 15 when the content-size bit (bit 3 of
    # FLG) is set; the checksum is the last byte of the header.
    has_content_size = (flag_byte >> 3) & 1
    checksum_offset = 14 if has_content_size else 6

    # Kafka's LZ4 code has a bug in its header checksum implementation:
    # this deliberately reproduces the incorrect checksum, hashed from
    # offset 0 of the frame.
    header_prefix = frame[0:checksum_offset]
    kafka_hc = xxhash.xxh32(header_prefix).digest()[-2:-1] # pylint: disable-msg=no-member

    return b''.join([
        header_prefix,
        kafka_hc,
        frame[checksum_offset + 1:]
    ])
|
||||||
|
|
||||||
|
|
||||||
|
def lz4_decode(payload):
    """Decompress an LZ4 frame whose header checksum was written by Kafka.

    Kafka's LZ4 code has a bug in its header checksum implementation, so the
    checksum byte in the incoming frame is first replaced with one computed
    from the FLG byte onward (offset 4), and the repaired frame is then
    handed to ``lz4f.decompressFrame``.

    Arguments:
        payload: the compressed LZ4 frame as received.

    Returns:
        The decompressed data (the ``'decomp'`` entry of the result returned
        by ``lz4f.decompressFrame``).
    """
    # Kafka's LZ4 code has a bug in its header checksum implementation
    header_size = 7
    # Indexing yields an int for py3 bytes and a 1-char str for py2 str.
    if isinstance(payload[4], int):
        flg = payload[4]
    else:
        flg = ord(payload[4])
    # Bit 3 of FLG signals an extra 8-byte content-size field in the header.
    content_size_bit = ((flg >> 3) & 1)
    if content_size_bit:
        header_size += 8

    # This should be the correct hc
    hc = xxhash.xxh32(payload[4:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member

    munged_payload = b''.join([
        payload[0:header_size-1],
        hc,
        payload[header_size:]
    ])

    # decompressFrame requires a *decompression* context; a compression
    # context (createCompContext) is the wrong object to pass here.
    dCtx = lz4f.createDecompContext() # pylint: disable-msg=no-member
    try:
        data = lz4f.decompressFrame(munged_payload, dCtx) # pylint: disable-msg=no-member
    finally:
        # Always release the native context, even if decompression fails.
        lz4f.freeDecompContext(dCtx) # pylint: disable-msg=no-member
    return data['decomp']
|
||||||
|
@@ -1,3 +1,5 @@
|
|||||||
|
import sys
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from kafka import KafkaConsumer, KafkaProducer
|
from kafka import KafkaConsumer, KafkaProducer
|
||||||
@@ -9,9 +11,13 @@ from test.testutil import random_string
|
|||||||
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
|
@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
|
||||||
def test_end_to_end(kafka_broker, compression):
|
def test_end_to_end(kafka_broker, compression):
|
||||||
|
|
||||||
# LZ4 requires 0.8.2
|
if compression == 'lz4':
|
||||||
if compression == 'lz4' and version() < (0, 8, 2):
|
# LZ4 requires 0.8.2
|
||||||
return
|
if version() < (0, 8, 2):
|
||||||
|
return
|
||||||
|
# LZ4 python libs don't work on python2.6
|
||||||
|
elif sys.version_info < (2, 7):
|
||||||
|
return
|
||||||
|
|
||||||
connect_str = 'localhost:' + str(kafka_broker.port)
|
connect_str = 'localhost:' + str(kafka_broker.port)
|
||||||
producer = KafkaProducer(bootstrap_servers=connect_str,
|
producer = KafkaProducer(bootstrap_servers=connect_str,
|
||||||
|
Reference in New Issue
Block a user