Add tests for limited and unlimited consumer max_buffer_size
This commit is contained in:
@@ -8,6 +8,7 @@ import random
|
||||
from kafka import * # noqa
|
||||
from kafka.common import * # noqa
|
||||
from kafka.codec import has_gzip, has_snappy
|
||||
from kafka.consumer import FETCH_BUFFER_SIZE_BYTES, MAX_FETCH_BUFFER_SIZE_BYTES
|
||||
from .fixtures import ZookeeperFixture, KafkaFixture
|
||||
|
||||
|
||||
@@ -760,7 +761,7 @@ class TestConsumer(unittest.TestCase):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 0)
|
||||
|
||||
# Produce 10 messages that are too large (bigger than default fetch size)
|
||||
# Produce 10 messages that are large (bigger than default fetch size)
|
||||
messages2 = [create_message(random_string(5000)) for i in range(10)]
|
||||
produce2 = ProduceRequest("test_large_messages", 0, messages2)
|
||||
|
||||
@@ -776,6 +777,29 @@ class TestConsumer(unittest.TestCase):
|
||||
self.assertEquals(all_messages[i], message.message)
|
||||
self.assertEquals(i, 19)
|
||||
|
||||
# Produce 1 message that is too large (bigger than max fetch size)
|
||||
big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
|
||||
big_message = create_message(random_string(big_message_size))
|
||||
produce3 = ProduceRequest("test_large_messages", 0, [big_message])
|
||||
for resp in self.client.send_produce_request([produce3]):
|
||||
self.assertEquals(resp.error, 0)
|
||||
self.assertEquals(resp.offset, 20)
|
||||
|
||||
self.assertRaises(ConsumerFetchSizeTooSmall, consumer.get_message, False, 0.1)
|
||||
|
||||
# Create a consumer with no fetch size limit
|
||||
big_consumer = SimpleConsumer(self.client, "group1", "test_large_messages",
|
||||
max_buffer_size=None, partitions=[0],
|
||||
auto_commit=False, iter_timeout=0)
|
||||
|
||||
# Seek to the last message
|
||||
big_consumer.seek(-1, 2)
|
||||
|
||||
# Consume giant message successfully
|
||||
message = big_consumer.get_message(block=False, timeout=10)
|
||||
self.assertIsNotNone(message)
|
||||
self.assertEquals(message.message.value, big_message.value)
|
||||
|
||||
class TestFailover(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
|
||||
Reference in New Issue
Block a user