
Modified the `block` argument of MultiProcessConsumer and SimpleConsumer to also accept an integer, which defines the number of messages to block for. This lets callers ask for a large number of messages but block only until some of them are received; otherwise callers would have to request messages one by one or block for a fixed time.
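For illustration, a minimal sketch of the new semantics against a SimpleConsumer (the broker address, group, and topic names here are hypothetical; `get_messages` is the API exercised by the tests below):

    from kafka import KafkaClient, SimpleConsumer

    client = KafkaClient('localhost:9092')  # hypothetical broker address
    consumer = SimpleConsumer(client, 'my-group', 'my-topic')

    # block=True: wait up to `timeout` seconds for the requested `count`.
    batch = consumer.get_messages(count=10, block=True, timeout=1)

    # block=5 (new): return as soon as at least 5 messages are available,
    # or when the timeout expires, whichever comes first.
    batch = consumer.get_messages(count=10, block=5, timeout=1)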
import logging
import os

from six.moves import xrange

from kafka import (
    KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
)
from kafka.common import (
    ProduceRequest, ConsumerFetchSizeTooSmall, ConsumerTimeout,
    OffsetOutOfRangeError
)
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES

from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import (
    KafkaIntegrationTestCase, kafka_versions, random_string, Timer
)


class TestConsumerIntegration(KafkaIntegrationTestCase):
    @classmethod
    def setUpClass(cls):
        if not os.environ.get('KAFKA_VERSION'):
            return

        cls.zk = ZookeeperFixture.instance()
        cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
        cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)

        cls.server = cls.server1  # Bootstrapping server

    @classmethod
    def tearDownClass(cls):
        if not os.environ.get('KAFKA_VERSION'):
            return

        cls.server1.close()
        cls.server2.close()
        cls.zk.close()

    def send_messages(self, partition, messages):
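        # Produce the given payloads to a single partition and return the
        # encoded message values, so tests can assert on what they read back.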
        messages = [create_message(self.msg(str(msg))) for msg in messages]
        produce = ProduceRequest(self.bytes_topic, partition, messages=messages)
        resp, = self.client.send_produce_request([produce])
        self.assertEqual(resp.error, 0)

        return [x.value for x in messages]

    def assert_message_count(self, messages, num_messages):
        # Make sure we got them all
        self.assertEqual(len(messages), num_messages)

        # Make sure there are no duplicates
        self.assertEqual(len(set(messages)), num_messages)

    def consumer(self, **kwargs):
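        # Helper: build a consumer of the requested class (SimpleConsumer by
        # default) against self.topic, forcing offset commits off on 0.8.0.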
        if os.environ['KAFKA_VERSION'] == "0.8.0":
            # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
            kwargs['group'] = None
            kwargs['auto_commit'] = False
        else:
            kwargs.setdefault('auto_commit', True)

        consumer_class = kwargs.pop('consumer', SimpleConsumer)
        group = kwargs.pop('group', self.id().encode('utf-8'))
        topic = kwargs.pop('topic', self.topic)

        if consumer_class in [SimpleConsumer, MultiProcessConsumer]:
            kwargs.setdefault('iter_timeout', 0)

        return consumer_class(self.client, group, topic, **kwargs)

    def kafka_consumer(self, **configs):
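        # Helper: build a KafkaConsumer bootstrapped against the test broker.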
        brokers = '%s:%d' % (self.server.host, self.server.port)
        consumer = KafkaConsumer(self.topic,
                                 bootstrap_servers=brokers,
                                 **configs)
        return consumer

    @kafka_versions("all")
    def test_simple_consumer(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Start a consumer
        consumer = self.consumer()

        self.assert_message_count([message for message in consumer], 200)

        consumer.stop()

    @kafka_versions('all')
    def test_simple_consumer_smallest_offset_reset(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        consumer = self.consumer(auto_offset_reset='smallest')
        # Move fetch offset 300 messages past the end (out of range)
        consumer.seek(300, 2)
        # Since auto_offset_reset is set to smallest we should read all 200
        # messages from the beginning.
        self.assert_message_count([message for message in consumer], 200)

    @kafka_versions('all')
    def test_simple_consumer_largest_offset_reset(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # auto_offset_reset defaults to largest
        consumer = self.consumer()
        # Move fetch offset 300 messages past the end (out of range)
        consumer.seek(300, 2)
        # Since auto_offset_reset is set to largest we should not read any
        # messages.
        self.assert_message_count([message for message in consumer], 0)
        # Send 200 new messages to the queue
        self.send_messages(0, range(200, 300))
        self.send_messages(1, range(300, 400))
        # Since the offset is set to largest we should read all the new messages.
        self.assert_message_count([message for message in consumer], 200)

    @kafka_versions('all')
    def test_simple_consumer_no_reset(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Disable auto offset reset entirely
        consumer = self.consumer(auto_offset_reset=None)
        # Move fetch offset 300 messages past the end (out of range)
        consumer.seek(300, 2)
        with self.assertRaises(OffsetOutOfRangeError):
            consumer.get_message()

    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
    def test_simple_consumer_load_initial_offsets(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Create 1st consumer and change offsets
        consumer = self.consumer()
        self.assertEqual(consumer.offsets, {0: 0, 1: 0})
        consumer.offsets.update({0: 51, 1: 101})
        # Update counter after manual offsets update
        consumer.count_since_commit += 1
        consumer.commit()

        # Create 2nd consumer and check initial offsets
        consumer = self.consumer(auto_commit=False)
        self.assertEqual(consumer.offsets, {0: 51, 1: 101})

    @kafka_versions("all")
    def test_simple_consumer__seek(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        consumer = self.consumer()

        # Rewind 10 messages from the end
        consumer.seek(-10, 2)
        self.assert_message_count([message for message in consumer], 10)

        # Rewind 13 messages from the end
        consumer.seek(-13, 2)
        self.assert_message_count([message for message in consumer], 13)

        # Set absolute offsets
        consumer.seek(100)
        self.assert_message_count([message for message in consumer], 0)
        consumer.seek(100, partition=0)
        self.assert_message_count([message for message in consumer], 0)
        consumer.seek(101, partition=1)
        self.assert_message_count([message for message in consumer], 0)
        consumer.seek(90, partition=0)
        self.assert_message_count([message for message in consumer], 10)
        consumer.seek(20, partition=1)
        self.assert_message_count([message for message in consumer], 80)
        consumer.seek(0, partition=1)
        self.assert_message_count([message for message in consumer], 100)

        consumer.stop()

    @kafka_versions("all")
    def test_simple_consumer_blocking(self):
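        # `block` accepts True/False or, with this change, an integer n:
        # get_messages then returns as soon as at least n messages are
        # available instead of waiting out the full timeout.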
        consumer = self.consumer()

        # Ask for messages, nothing in queue, block 1 second
        with Timer() as t:
            messages = consumer.get_messages(block=True, timeout=1)
        self.assert_message_count(messages, 0)
        self.assertGreaterEqual(t.interval, 1)

        self.send_messages(0, range(0, 10))

        # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
        with Timer() as t:
            messages = consumer.get_messages(count=5, block=True, timeout=5)
        self.assert_message_count(messages, 5)
        self.assertLessEqual(t.interval, 1)

        # Ask for 10 messages, get 5 back, block 1 second
        with Timer() as t:
            messages = consumer.get_messages(count=10, block=True, timeout=1)
        self.assert_message_count(messages, 5)
        self.assertGreaterEqual(t.interval, 1)

        # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
        # second, get 5 back, no blocking
        self.send_messages(0, range(0, 5))
        with Timer() as t:
            messages = consumer.get_messages(count=10, block=1, timeout=1)
        self.assert_message_count(messages, 5)
        self.assertLessEqual(t.interval, 1)

        consumer.stop()

    @kafka_versions("all")
    def test_simple_consumer_pending(self):
        # Make sure that we start with no pending messages
        consumer = self.consumer()
        self.assertEqual(consumer.pending(), 0)
        self.assertEqual(consumer.pending(partitions=[0]), 0)
        self.assertEqual(consumer.pending(partitions=[1]), 0)

        # Produce 10 messages to partitions 0 and 1
        self.send_messages(0, range(0, 10))
        self.send_messages(1, range(10, 20))

        consumer = self.consumer()

        self.assertEqual(consumer.pending(), 20)
        self.assertEqual(consumer.pending(partitions=[0]), 10)
        self.assertEqual(consumer.pending(partitions=[1]), 10)

        # Move to the last message, so one partition should have 1 pending
        # message and the other 0
        consumer.seek(-1, 2)
        self.assertEqual(consumer.pending(), 1)

        pending_part1 = consumer.pending(partitions=[0])
        pending_part2 = consumer.pending(partitions=[1])
        self.assertEqual(set([0, 1]), set([pending_part1, pending_part2]))
        consumer.stop()

    @kafka_versions("all")
    def test_multi_process_consumer(self):
        # Produce 100 messages to partitions 0 and 1
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        consumer = self.consumer(consumer=MultiProcessConsumer)

        self.assert_message_count([message for message in consumer], 200)

        consumer.stop()

    @kafka_versions("all")
    def test_multi_process_consumer_blocking(self):
        consumer = self.consumer(consumer=MultiProcessConsumer)

        # Ask for messages, no messages in queue, block 1 second
        with Timer() as t:
            messages = consumer.get_messages(block=True, timeout=1)
        self.assert_message_count(messages, 0)

        self.assertGreaterEqual(t.interval, 1)

        # Send 10 messages
        self.send_messages(0, range(0, 10))

        # Ask for 5 messages, 10 messages in queue, no blocking expected
        with Timer() as t:
            messages = consumer.get_messages(count=5, block=True, timeout=5)
        self.assert_message_count(messages, 5)
        self.assertLessEqual(t.interval, 1)

        # Ask for 10 messages, 5 in queue, block 1 second
        with Timer() as t:
            messages = consumer.get_messages(count=10, block=True, timeout=1)
        self.assert_message_count(messages, 5)
        self.assertGreaterEqual(t.interval, 1)

        # Ask for 10 messages, 5 in queue, ask to block for 1 message or 1
        # second, get at least one back, no blocking
        self.send_messages(0, range(0, 5))
        with Timer() as t:
            messages = consumer.get_messages(count=10, block=1, timeout=1)
        received_message_count = len(messages)
        self.assertGreaterEqual(received_message_count, 1)
        self.assert_message_count(messages, received_message_count)
        self.assertLessEqual(t.interval, 1)

        consumer.stop()

    @kafka_versions("all")
    def test_multi_proc_pending(self):
        self.send_messages(0, range(0, 10))
        self.send_messages(1, range(10, 20))

        # Set group to None and auto_commit to False to avoid interactions w/
        # offset commit/fetch apis
        consumer = MultiProcessConsumer(self.client, None, self.topic,
                                        auto_commit=False, iter_timeout=0)

        self.assertEqual(consumer.pending(), 20)
        self.assertEqual(consumer.pending(partitions=[0]), 10)
        self.assertEqual(consumer.pending(partitions=[1]), 10)

        consumer.stop()

    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
    def test_multi_process_consumer_load_initial_offsets(self):
        self.send_messages(0, range(0, 10))
        self.send_messages(1, range(10, 20))

        # Create 1st consumer and change offsets
        consumer = self.consumer()
        self.assertEqual(consumer.offsets, {0: 0, 1: 0})
        consumer.offsets.update({0: 5, 1: 15})
        # Update counter after manual offsets update
        consumer.count_since_commit += 1
        consumer.commit()

        # Create 2nd consumer and check initial offsets
        consumer = self.consumer(consumer=MultiProcessConsumer,
                                 auto_commit=False)
        self.assertEqual(consumer.offsets, {0: 5, 1: 15})

    @kafka_versions("all")
    def test_large_messages(self):
        # Produce 10 "normal" size messages
        small_messages = self.send_messages(0, [str(x) for x in range(10)])

        # Produce 10 messages that are large (bigger than default fetch size)
        large_messages = self.send_messages(0, [random_string(5000) for x in range(10)])

        # Consumer should still get all of them
        consumer = self.consumer()

        expected_messages = set(small_messages + large_messages)
        actual_messages = set([x.message.value for x in consumer])
        self.assertEqual(expected_messages, actual_messages)

        consumer.stop()

    @kafka_versions("all")
    def test_huge_messages(self):
        huge_message, = self.send_messages(0, [
            create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
        ])

        # Create a consumer with the default buffer size
        consumer = self.consumer()

        # This consumer fails to get the message
        with self.assertRaises(ConsumerFetchSizeTooSmall):
            consumer.get_message(False, 0.1)

        consumer.stop()

        # Create a consumer with no fetch size limit
        big_consumer = self.consumer(
            max_buffer_size=None,
            partitions=[0],
        )

        # Seek to the last message
        big_consumer.seek(-1, 2)

        # Consume giant message successfully
        message = big_consumer.get_message(block=False, timeout=10)
        self.assertIsNotNone(message)
        self.assertEqual(message.message.value, huge_message)

        big_consumer.stop()

    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
    def test_offset_behavior__resuming_behavior(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Start a consumer that commits every 20 messages
        consumer1 = self.consumer(
            auto_commit_every_t=None,
            auto_commit_every_n=20,
        )

        # Grab the first 195 messages
        output_msgs1 = [consumer1.get_message().message.value for _ in xrange(195)]
        self.assert_message_count(output_msgs1, 195)

        # The total committed offset across both partitions should be at 180
        consumer2 = self.consumer(
            auto_commit_every_t=None,
            auto_commit_every_n=20,
        )

        # 181-200
        self.assert_message_count([message for message in consumer2], 20)

        consumer1.stop()
        consumer2.stop()

    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
    def test_multi_process_offset_behavior__resuming_behavior(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Start a consumer that commits every 20 messages
        consumer1 = self.consumer(
            consumer=MultiProcessConsumer,
            auto_commit_every_t=None,
            auto_commit_every_n=20,
        )

        # Grab the first 195 messages
        output_msgs1 = []
        idx = 0
        for message in consumer1:
            output_msgs1.append(message.message.value)
            idx += 1
            if idx >= 195:
                break
        self.assert_message_count(output_msgs1, 195)

        # The total committed offset across both partitions should be at 180
        consumer2 = self.consumer(
            consumer=MultiProcessConsumer,
            auto_commit_every_t=None,
            auto_commit_every_n=20,
        )

        # 181-200
        self.assert_message_count([message for message in consumer2], 20)

        consumer1.stop()
        consumer2.stop()

    # TODO: Make this a unit test -- should not require integration
    @kafka_versions("all")
    def test_fetch_buffer_size(self):

        # Test parameters (see issue 135 / PR 136)
        TEST_MESSAGE_SIZE = 1048
        INIT_BUFFER_SIZE = 1024
        MAX_BUFFER_SIZE = 2048
        assert TEST_MESSAGE_SIZE > INIT_BUFFER_SIZE
        assert TEST_MESSAGE_SIZE < MAX_BUFFER_SIZE
        assert MAX_BUFFER_SIZE == 2 * INIT_BUFFER_SIZE

        self.send_messages(0, ["x" * TEST_MESSAGE_SIZE])
        self.send_messages(1, ["x" * TEST_MESSAGE_SIZE])

        consumer = self.consumer(buffer_size=INIT_BUFFER_SIZE,
                                 max_buffer_size=MAX_BUFFER_SIZE)
        messages = [message for message in consumer]
        self.assertEqual(len(messages), 2)

    @kafka_versions("all")
    def test_kafka_consumer(self):
        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Start a consumer
        consumer = self.kafka_consumer(auto_offset_reset='smallest',
                                       consumer_timeout_ms=5000)
        n = 0
        messages = {0: set(), 1: set()}
        logging.debug("kafka consumer offsets: %s", consumer.offsets())
        for m in consumer:
            logging.debug("Consumed message %s", repr(m))
            n += 1
            messages[m.partition].add(m.offset)
            if n >= 200:
                break

        self.assertEqual(len(messages[0]), 100)
        self.assertEqual(len(messages[1]), 100)

    @kafka_versions("all")
    def test_kafka_consumer__blocking(self):
        TIMEOUT_MS = 500
        consumer = self.kafka_consumer(auto_offset_reset='smallest',
                                       consumer_timeout_ms=TIMEOUT_MS)

        # Ask for a message, nothing in queue, block 500ms
        with Timer() as t:
            with self.assertRaises(ConsumerTimeout):
                msg = consumer.next()
        self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0)

        self.send_messages(0, range(0, 10))

        # Ask for 5 messages, 10 in queue. Get 5 back, no blocking
        messages = set()
        with Timer() as t:
            for i in range(5):
                msg = consumer.next()
                messages.add((msg.partition, msg.offset))
        self.assertEqual(len(messages), 5)
        self.assertLess(t.interval, TIMEOUT_MS / 1000.0)

        # Ask for 10 messages, get 5 back, block 500ms
        messages = set()
        with Timer() as t:
            with self.assertRaises(ConsumerTimeout):
                for i in range(10):
                    msg = consumer.next()
                    messages.add((msg.partition, msg.offset))
        self.assertEqual(len(messages), 5)
        self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0)

    @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
    def test_kafka_consumer__offset_commit_resume(self):
        GROUP_ID = random_string(10).encode('utf-8')

        self.send_messages(0, range(0, 100))
        self.send_messages(1, range(100, 200))

        # Start a consumer that commits every 20 messages
        consumer1 = self.kafka_consumer(
            group_id=GROUP_ID,
            auto_commit_enable=True,
            auto_commit_interval_ms=None,
            auto_commit_interval_messages=20,
            auto_offset_reset='smallest',
        )

        # Grab the first 195 messages
        output_msgs1 = []
        for _ in xrange(195):
            m = consumer1.next()
            output_msgs1.append(m)
            consumer1.task_done(m)
        self.assert_message_count(output_msgs1, 195)

        # The total committed offset across both partitions should be at 180
        consumer2 = self.kafka_consumer(
            group_id=GROUP_ID,
            auto_commit_enable=True,
            auto_commit_interval_ms=None,
            auto_commit_interval_messages=20,
            consumer_timeout_ms=100,
            auto_offset_reset='smallest',
        )

        # 181-200
        output_msgs2 = []
        with self.assertRaises(ConsumerTimeout):
            while True:
                m = consumer2.next()
                output_msgs2.append(m)
        self.assert_message_count(output_msgs2, 20)
        self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)