Use standard LZ4 framing for v1 messages / kafka 0.10 (#695)
* LZ4 framing fixed in 0.10 / message v1 -- retain broken lz4 code for compatibility * lz4f does not support easy incremental decompression - raise RuntimeError * Update lz4 codec tests
This commit is contained in:
@@ -180,8 +180,27 @@ def snappy_decode(payload):
|
|||||||
|
|
||||||
|
|
||||||
def lz4_encode(payload):
|
def lz4_encode(payload):
|
||||||
data = lz4f.compressFrame(payload) # pylint: disable-msg=no-member
|
"""Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
|
||||||
# Kafka's LZ4 code has a bug in its header checksum implementation
|
# pylint: disable-msg=no-member
|
||||||
|
return lz4f.compressFrame(payload)
|
||||||
|
|
||||||
|
|
||||||
|
def lz4_decode(payload):
|
||||||
|
"""Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
|
||||||
|
# pylint: disable-msg=no-member
|
||||||
|
ctx = lz4f.createDecompContext()
|
||||||
|
data = lz4f.decompressFrame(payload, ctx)
|
||||||
|
|
||||||
|
# lz4f python module does not expose how much of the payload was
|
||||||
|
# actually read if the decompression was only partial.
|
||||||
|
if data['next'] != 0:
|
||||||
|
raise RuntimeError('lz4f unable to decompress full payload')
|
||||||
|
return data['decomp']
|
||||||
|
|
||||||
|
|
||||||
|
def lz4_encode_old_kafka(payload):
|
||||||
|
"""Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
|
||||||
|
data = lz4_encode(payload)
|
||||||
header_size = 7
|
header_size = 7
|
||||||
if isinstance(data[4], int):
|
if isinstance(data[4], int):
|
||||||
flg = data[4]
|
flg = data[4]
|
||||||
@@ -201,7 +220,7 @@ def lz4_encode(payload):
|
|||||||
])
|
])
|
||||||
|
|
||||||
|
|
||||||
def lz4_decode(payload):
|
def lz4_decode_old_kafka(payload):
|
||||||
# Kafka's LZ4 code has a bug in its header checksum implementation
|
# Kafka's LZ4 code has a bug in its header checksum implementation
|
||||||
header_size = 7
|
header_size = 7
|
||||||
if isinstance(payload[4], int):
|
if isinstance(payload[4], int):
|
||||||
@@ -220,7 +239,4 @@ def lz4_decode(payload):
|
|||||||
hc,
|
hc,
|
||||||
payload[header_size:]
|
payload[header_size:]
|
||||||
])
|
])
|
||||||
|
return lz4_decode(munged_payload)
|
||||||
cCtx = lz4f.createCompContext() # pylint: disable-msg=no-member
|
|
||||||
data = lz4f.decompressFrame(munged_payload, cCtx) # pylint: disable-msg=no-member
|
|
||||||
return data['decomp']
|
|
||||||
|
@@ -81,8 +81,8 @@ class OffsetOutOfRangeError(BrokerResponseError):
|
|||||||
class InvalidMessageError(BrokerResponseError):
|
class InvalidMessageError(BrokerResponseError):
|
||||||
errno = 2
|
errno = 2
|
||||||
message = 'INVALID_MESSAGE'
|
message = 'INVALID_MESSAGE'
|
||||||
description = ('This indicates that a message contents does not match its'
|
description = ('This message has failed its CRC checksum, exceeds the'
|
||||||
' CRC.')
|
' valid size, or is otherwise corrupt.')
|
||||||
|
|
||||||
|
|
||||||
class UnknownTopicOrPartitionError(BrokerResponseError):
|
class UnknownTopicOrPartitionError(BrokerResponseError):
|
||||||
|
@@ -6,7 +6,8 @@ import threading
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from ..codec import (has_gzip, has_snappy, has_lz4,
|
from ..codec import (has_gzip, has_snappy, has_lz4,
|
||||||
gzip_encode, snappy_encode, lz4_encode)
|
gzip_encode, snappy_encode,
|
||||||
|
lz4_encode, lz4_encode_old_kafka)
|
||||||
from .. import errors as Errors
|
from .. import errors as Errors
|
||||||
from ..protocol.types import Int32, Int64
|
from ..protocol.types import Int32, Int64
|
||||||
from ..protocol.message import MessageSet, Message
|
from ..protocol.message import MessageSet, Message
|
||||||
@@ -28,10 +29,16 @@ class MessageSetBuffer(object):
|
|||||||
'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
|
'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
|
||||||
'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
|
'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
|
||||||
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
|
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
|
||||||
|
'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
|
||||||
}
|
}
|
||||||
def __init__(self, buf, batch_size, compression_type=None, message_version=0):
|
def __init__(self, buf, batch_size, compression_type=None, message_version=0):
|
||||||
if compression_type is not None:
|
if compression_type is not None:
|
||||||
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
|
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
|
||||||
|
|
||||||
|
# Kafka 0.8/0.9 had a quirky lz4...
|
||||||
|
if compression_type == 'lz4' and message_version == 0:
|
||||||
|
compression_type = 'lz4-old-kafka'
|
||||||
|
|
||||||
checker, encoder, attributes = self._COMPRESSORS[compression_type]
|
checker, encoder, attributes = self._COMPRESSORS[compression_type]
|
||||||
assert checker(), 'Compression Libraries Not Found'
|
assert checker(), 'Compression Libraries Not Found'
|
||||||
self._compressor = encoder
|
self._compressor = encoder
|
||||||
|
@@ -2,7 +2,8 @@ import io
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from ..codec import (has_gzip, has_snappy, has_lz4,
|
from ..codec import (has_gzip, has_snappy, has_lz4,
|
||||||
gzip_decode, snappy_decode, lz4_decode)
|
gzip_decode, snappy_decode,
|
||||||
|
lz4_decode, lz4_decode_old_kafka)
|
||||||
from . import pickle
|
from . import pickle
|
||||||
from .struct import Struct
|
from .struct import Struct
|
||||||
from .types import (
|
from .types import (
|
||||||
@@ -116,7 +117,10 @@ class Message(Struct):
|
|||||||
raw_bytes = snappy_decode(self.value)
|
raw_bytes = snappy_decode(self.value)
|
||||||
elif codec == self.CODEC_LZ4:
|
elif codec == self.CODEC_LZ4:
|
||||||
assert has_lz4(), 'LZ4 decompression unsupported'
|
assert has_lz4(), 'LZ4 decompression unsupported'
|
||||||
raw_bytes = lz4_decode(self.value)
|
if self.magic == 0:
|
||||||
|
raw_bytes = lz4_decode_old_kafka(self.value)
|
||||||
|
else:
|
||||||
|
raw_bytes = lz4_decode(self.value)
|
||||||
else:
|
else:
|
||||||
raise Exception('This should be impossible')
|
raise Exception('This should be impossible')
|
||||||
|
|
||||||
|
@@ -8,6 +8,7 @@ from kafka.codec import (
|
|||||||
gzip_encode, gzip_decode,
|
gzip_encode, gzip_decode,
|
||||||
snappy_encode, snappy_decode,
|
snappy_encode, snappy_decode,
|
||||||
lz4_encode, lz4_decode,
|
lz4_encode, lz4_decode,
|
||||||
|
lz4_encode_old_kafka, lz4_decode_old_kafka,
|
||||||
)
|
)
|
||||||
|
|
||||||
from test.testutil import random_string
|
from test.testutil import random_string
|
||||||
@@ -84,4 +85,26 @@ def test_lz4():
|
|||||||
for i in xrange(1000):
|
for i in xrange(1000):
|
||||||
b1 = random_string(100).encode('utf-8')
|
b1 = random_string(100).encode('utf-8')
|
||||||
b2 = lz4_decode(lz4_encode(b1))
|
b2 = lz4_decode(lz4_encode(b1))
|
||||||
|
assert len(b1) == len(b2)
|
||||||
|
assert b1 == b2
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
|
||||||
|
def test_lz4_old():
|
||||||
|
for i in xrange(1000):
|
||||||
|
b1 = random_string(100).encode('utf-8')
|
||||||
|
b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1))
|
||||||
|
assert len(b1) == len(b2)
|
||||||
|
assert b1 == b2
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.xfail(reason="lz4tools library doesnt support incremental decompression")
|
||||||
|
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
|
||||||
|
def test_lz4_incremental():
|
||||||
|
for i in xrange(1000):
|
||||||
|
# lz4 max single block size is 4MB
|
||||||
|
# make sure we test with multiple-blocks
|
||||||
|
b1 = random_string(100).encode('utf-8') * 50000
|
||||||
|
b2 = lz4_decode(lz4_encode(b1))
|
||||||
|
assert len(b1) == len(b2)
|
||||||
assert b1 == b2
|
assert b1 == b2
|
||||||
|
Reference in New Issue
Block a user