Prefer python-lz4 over lz4f if available
This commit is contained in:
@@ -113,9 +113,8 @@ Compression
|
||||
***********
|
||||
|
||||
kafka-python supports gzip compression/decompression natively. To produce or
|
||||
consume lz4 compressed messages, you must install lz4tools and xxhash (modules
|
||||
may not work on python2.6). To enable snappy, install python-snappy (also
|
||||
requires snappy library).
|
||||
consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
|
||||
To enable snappy, install python-snappy (also requires snappy library).
|
||||
See `Installation <install.html#optional-snappy-install>`_ for more information.
|
||||
|
||||
|
||||
|
||||
@@ -26,12 +26,10 @@ Bleeding-Edge
|
||||
Optional LZ4 install
|
||||
********************
|
||||
|
||||
To enable LZ4 compression/decompression, install lz4tools and xxhash:
|
||||
To enable LZ4 compression/decompression, install python-lz4:
|
||||
|
||||
>>> pip install lz4tools
|
||||
>>> pip install xxhash
|
||||
>>> pip install lz4
|
||||
|
||||
*Note*: these modules do not support python2.6
|
||||
|
||||
Optional Snappy install
|
||||
***********************
|
||||
|
||||
@@ -16,12 +16,21 @@ try:
|
||||
except ImportError:
|
||||
snappy = None
|
||||
|
||||
try:
|
||||
import lz4.frame as lz4
|
||||
except ImportError:
|
||||
lz4 = None
|
||||
|
||||
try:
|
||||
import lz4f
|
||||
import xxhash
|
||||
except ImportError:
|
||||
lz4f = None
|
||||
|
||||
try:
|
||||
import xxhash
|
||||
except ImportError:
|
||||
xxhash = None
|
||||
|
||||
PYPY = bool(platform.python_implementation() == 'PyPy')
|
||||
|
||||
def has_gzip():
|
||||
@@ -33,7 +42,11 @@ def has_snappy():
|
||||
|
||||
|
||||
def has_lz4():
|
||||
return lz4f is not None
|
||||
if lz4 is not None:
|
||||
return True
|
||||
if lz4f is not None:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def gzip_encode(payload, compresslevel=None):
|
||||
@@ -181,13 +194,15 @@ def snappy_decode(payload):
|
||||
return snappy.decompress(payload)
|
||||
|
||||
|
||||
def lz4_encode(payload):
|
||||
"""Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
|
||||
# pylint: disable-msg=no-member
|
||||
return lz4f.compressFrame(payload)
|
||||
if lz4:
|
||||
lz4_encode = lz4.compress # pylint: disable-msg=no-member
|
||||
elif lz4f:
|
||||
lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member
|
||||
else:
|
||||
lz4_encode = None
|
||||
|
||||
|
||||
def lz4_decode(payload):
|
||||
def lz4f_decode(payload):
|
||||
"""Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
|
||||
# pylint: disable-msg=no-member
|
||||
ctx = lz4f.createDecompContext()
|
||||
@@ -201,8 +216,17 @@ def lz4_decode(payload):
|
||||
return data['decomp']
|
||||
|
||||
|
||||
if lz4:
|
||||
lz4_decode = lz4.decompress # pylint: disable-msg=no-member
|
||||
elif lz4f:
|
||||
lz4_decode = lz4f_decode
|
||||
else:
|
||||
lz4_decode = None
|
||||
|
||||
|
||||
def lz4_encode_old_kafka(payload):
|
||||
"""Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
|
||||
assert xxhash is not None
|
||||
data = lz4_encode(payload)
|
||||
header_size = 7
|
||||
if isinstance(data[4], int):
|
||||
@@ -224,6 +248,7 @@ def lz4_encode_old_kafka(payload):
|
||||
|
||||
|
||||
def lz4_decode_old_kafka(payload):
|
||||
assert xxhash is not None
|
||||
# Kafka's LZ4 code has a bug in its header checksum implementation
|
||||
header_size = 7
|
||||
if isinstance(payload[4], int):
|
||||
|
||||
Reference in New Issue
Block a user