Add test case for MP Consumer auto commit
Tweak MP Consumer test to use iterator
This commit is contained in:
committed by
Dana Powers
parent
d05fccb9ef
commit
4bc30a2ec8
@@ -327,6 +327,41 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
consumer1.stop()
|
consumer1.stop()
|
||||||
consumer2.stop()
|
consumer2.stop()
|
||||||
|
|
||||||
|
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
|
||||||
|
def test_multi_process_offset_behavior__resuming_behavior(self):
|
||||||
|
self.send_messages(0, range(0, 100))
|
||||||
|
self.send_messages(1, range(100, 200))
|
||||||
|
|
||||||
|
# Start a consumer
|
||||||
|
consumer1 = self.consumer(
|
||||||
|
consumer=MultiProcessConsumer,
|
||||||
|
auto_commit_every_t = None,
|
||||||
|
auto_commit_every_n = 20,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Grab the first 195 messages
|
||||||
|
output_msgs1 = []
|
||||||
|
idx = 0
|
||||||
|
for message in consumer1:
|
||||||
|
output_msgs1.append(message.message.value)
|
||||||
|
idx += 1
|
||||||
|
if idx >= 195:
|
||||||
|
break
|
||||||
|
self.assert_message_count(output_msgs1, 195)
|
||||||
|
|
||||||
|
# The total offset across both partitions should be at 180
|
||||||
|
consumer2 = self.consumer(
|
||||||
|
consumer=MultiProcessConsumer,
|
||||||
|
auto_commit_every_t = None,
|
||||||
|
auto_commit_every_n = 20,
|
||||||
|
)
|
||||||
|
|
||||||
|
# 181-200
|
||||||
|
self.assert_message_count([ message for message in consumer2 ], 20)
|
||||||
|
|
||||||
|
consumer1.stop()
|
||||||
|
consumer2.stop()
|
||||||
|
|
||||||
# TODO: Make this a unit test -- should not require integration
|
# TODO: Make this a unit test -- should not require integration
|
||||||
@kafka_versions("all")
|
@kafka_versions("all")
|
||||||
def test_fetch_buffer_size(self):
|
def test_fetch_buffer_size(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user