Update consumer integration tests to use new (group) KafkaConsumer

- Remove debug call to deprecated .offsets() method
- Manually assign TopicPartition to avoid group subscription overhead
- Use next(consumer), not consumer.next()
- consumer_timeout_ms now raises StopIteration, not ConsumerTimeout
- auto_commit_enable is now enable_auto_commit
- auto_offset_reset -> 'earliest', not 'smallest'
- new consumer does not support auto_commit_interval_messages
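Taken together, the bullets amount to the following usage change. A minimal sketch of the new-style consumer (the broker address 'localhost:9092' and topic 'my-topic' are placeholders, and a reachable broker is assumed):

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',                     # placeholder topic
        bootstrap_servers='localhost:9092',
        enable_auto_commit=True,        # was auto_commit_enable
        auto_offset_reset='earliest',   # was 'smallest'
        consumer_timeout_ms=500,        # idle iterator now raises StopIteration
    )

    try:
        while True:
            msg = next(consumer)        # not consumer.next()
            print(msg.partition, msg.offset)
    except StopIteration:               # was ConsumerTimeout
        pass
    finally:
        consumer.close()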
commit 50f6a25ceb
parent 9acb689015
Author: Dana Powers
Date:   2016-01-03 16:18:08 -08:00


@@ -7,8 +7,8 @@ from kafka import (
     KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message
 )
 from kafka.common import (
-    ProduceRequestPayload, ConsumerFetchSizeTooSmall, ConsumerTimeout,
-    OffsetOutOfRangeError
+    ProduceRequestPayload, ConsumerFetchSizeTooSmall,
+    OffsetOutOfRangeError, TopicPartition
 )
 from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
@@ -475,11 +475,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.send_messages(1, range(100, 200))

         # Start a consumer
-        consumer = self.kafka_consumer(auto_offset_reset='smallest',
+        consumer = self.kafka_consumer(auto_offset_reset='earliest',
                                        consumer_timeout_ms=5000)
         n = 0
         messages = {0: set(), 1: set()}
-        logging.debug("kafka consumer offsets: %s" % consumer.offsets())
         for m in consumer:
             logging.debug("Consumed message %s" % repr(m))
             n += 1
@@ -493,13 +492,17 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
     @kafka_versions("all")
     def test_kafka_consumer__blocking(self):
         TIMEOUT_MS = 500
-        consumer = self.kafka_consumer(auto_offset_reset='smallest',
+        consumer = self.kafka_consumer(auto_offset_reset='earliest',
                                        consumer_timeout_ms=TIMEOUT_MS)

+        # Manual assignment avoids overhead of consumer group mgmt
+        consumer.unsubscribe()
+        consumer.assign([TopicPartition(self.topic, 0)])
+
         # Ask for 5 messages, nothing in queue, block 500ms
         with Timer() as t:
-            with self.assertRaises(ConsumerTimeout):
-                msg = consumer.next()
+            with self.assertRaises(StopIteration):
+                msg = next(consumer)
         self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )

         self.send_messages(0, range(0, 10))
@@ -508,7 +511,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         messages = set()
         with Timer() as t:
             for i in range(5):
-                msg = consumer.next()
+                msg = next(consumer)
                 messages.add((msg.partition, msg.offset))
         self.assertEqual(len(messages), 5)
         self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
@@ -516,9 +519,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         # Ask for 10 messages, get 5 back, block 500ms
         messages = set()
         with Timer() as t:
-            with self.assertRaises(ConsumerTimeout):
+            with self.assertRaises(StopIteration):
                 for i in range(10):
-                    msg = consumer.next()
+                    msg = next(consumer)
                     messages.add((msg.partition, msg.offset))
         self.assertEqual(len(messages), 5)
         self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
@@ -532,36 +535,35 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         # Start a consumer
         consumer1 = self.kafka_consumer(
-            group_id = GROUP_ID,
-            auto_commit_enable = True,
-            auto_commit_interval_ms = None,
-            auto_commit_interval_messages = 20,
-            auto_offset_reset='smallest',
+            group_id=GROUP_ID,
+            enable_auto_commit=True,
+            auto_commit_interval_ms=100,
+            auto_offset_reset='earliest',
+            consumer_timeout_ms=100
         )

-        # Grab the first 195 messages
+        # Grab the first 180 messages
         output_msgs1 = []
-        for _ in xrange(195):
-            m = consumer1.next()
+        for _ in xrange(180):
+            m = next(consumer1)
             output_msgs1.append(m)
-            consumer1.task_done(m)
-        self.assert_message_count(output_msgs1, 195)
+        self.assert_message_count(output_msgs1, 180)
+        consumer1.close()

         # The total offset across both partitions should be at 180
         consumer2 = self.kafka_consumer(
-            group_id = GROUP_ID,
-            auto_commit_enable = True,
-            auto_commit_interval_ms = None,
-            auto_commit_interval_messages = 20,
-            consumer_timeout_ms = 100,
-            auto_offset_reset='smallest',
+            group_id=GROUP_ID,
+            enable_auto_commit=True,
+            auto_commit_interval_ms=100,
+            auto_offset_reset='earliest',
+            consumer_timeout_ms=100
         )

         # 181-200
         output_msgs2 = []
-        with self.assertRaises(ConsumerTimeout):
+        with self.assertRaises(StopIteration):
             while True:
-                m = consumer2.next()
+                m = next(consumer2)
                 output_msgs2.append(m)
         self.assert_message_count(output_msgs2, 20)
-        self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
+        #self.assertEqual(len(set(output_msgs1) & set(output_msgs2)), 15)
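What this last hunk exercises, in isolation: the first consumer reads 180 messages while auto-committing, and a second consumer in the same group resumes from the committed offsets and picks up the remaining 20. A rough standalone sketch (group id, topic, broker address, and the drain() helper are all hypothetical; exact resume points depend on auto_commit_interval_ms and on close() flushing a final commit when auto-commit is enabled):

    from kafka import KafkaConsumer

    def drain(consumer, limit):
        """Read up to `limit` messages; consumer_timeout_ms ends the loop early."""
        out = []
        try:
            for _ in range(limit):
                out.append(next(consumer))
        except StopIteration:
            pass
        return out

    def group_consumer():
        return KafkaConsumer('my-topic',             # placeholder topic
                             group_id='my-group',    # placeholder group
                             bootstrap_servers='localhost:9092',
                             enable_auto_commit=True,
                             auto_commit_interval_ms=100,
                             auto_offset_reset='earliest',
                             consumer_timeout_ms=100)

    c1 = group_consumer()
    first = drain(c1, 180)   # messages 1-180
    c1.close()               # with auto-commit on, close() flushes a final commit

    c2 = group_consumer()
    rest = drain(c2, 1000)   # resumes at the committed offset: messages 181-200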