* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages* [Legacy Protocol] Update legacy protocol to handle compressed messages

* [SimpleConsumer] Fix legacy SimpleConsumer when using compressed messages
This commit is contained in:
Paul Cavallaro
2016-05-10 11:34:06 -04:00
committed by Dana Powers
parent 0684302d8f
commit 2c9930dea4
2 changed files with 30 additions and 3 deletions

View File

@@ -204,12 +204,22 @@ class KafkaProtocol(object):
return [
kafka.structs.FetchResponsePayload(
topic, partition, error, highwater_offset, [
kafka.structs.OffsetAndMessage(offset, message)
for offset, _, message in messages])
offset_and_msg
for offset_and_msg in cls.decode_message_set(messages)])
for topic, partitions in response.topics
for partition, error, highwater_offset, messages in partitions
]
@classmethod
def decode_message_set(cls, messages):
for offset, _, message in messages:
if isinstance(message, kafka.protocol.message.Message) and message.is_compressed():
inner_messages = message.decompress()
for (inner_offset, _msg_size, inner_msg) in inner_messages:
yield kafka.structs.OffsetAndMessage(inner_offset, inner_msg)
else:
yield kafka.structs.OffsetAndMessage(offset, message)
@classmethod
def encode_offset_request(cls, payloads=()):
return kafka.protocol.offset.OffsetRequest[0](

View File

@@ -5,7 +5,7 @@ from six.moves import xrange
from . import unittest
from kafka import (
KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message, create_gzip_message
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.errors import ConsumerFetchSizeTooSmall, OffsetOutOfRangeError
@@ -49,6 +49,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
return [ x.value for x in messages ]
def send_gzip_message(self, partition, messages):
message = create_gzip_message([(self.msg(str(msg)), None) for msg in messages])
produce = ProduceRequestPayload(self.topic, partition, messages = [message])
resp, = self.client.send_produce_request([produce])
self.assertEqual(resp.error, 0)
def assert_message_count(self, messages, num_messages):
# Make sure we got them all
self.assertEqual(len(messages), num_messages)
@@ -92,6 +98,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
consumer.stop()
def test_simple_consumer_gzip(self):
self.send_gzip_message(0, range(0, 100))
self.send_gzip_message(1, range(100, 200))
# Start a consumer
consumer = self.consumer()
self.assert_message_count([ message for message in consumer ], 200)
consumer.stop()
def test_simple_consumer_smallest_offset_reset(self):
self.send_messages(0, range(0, 100))
self.send_messages(1, range(100, 200))