Update consumer_integration to flip the autocommit switch when testing kafka 0.8.1
This commit is contained in:
@@ -50,9 +50,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
self.send_messages(1, range(100, 200))
|
self.send_messages(1, range(100, 200))
|
||||||
|
|
||||||
# Start a consumer
|
# Start a consumer
|
||||||
consumer = SimpleConsumer(self.client, "group1",
|
consumer = self.consumer()
|
||||||
self.topic, auto_commit=False,
|
|
||||||
iter_timeout=0)
|
|
||||||
|
|
||||||
self.assert_message_count([ message for message in consumer ], 200)
|
self.assert_message_count([ message for message in consumer ], 200)
|
||||||
|
|
||||||
@@ -63,9 +61,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
self.send_messages(0, range(0, 100))
|
self.send_messages(0, range(0, 100))
|
||||||
self.send_messages(1, range(100, 200))
|
self.send_messages(1, range(100, 200))
|
||||||
|
|
||||||
consumer = SimpleConsumer(self.client, "group1",
|
consumer = self.consumer()
|
||||||
self.topic, auto_commit=False,
|
|
||||||
iter_timeout=0)
|
|
||||||
|
|
||||||
# Rewind 10 messages from the end
|
# Rewind 10 messages from the end
|
||||||
consumer.seek(-10, 2)
|
consumer.seek(-10, 2)
|
||||||
@@ -79,9 +75,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
@kafka_versions("all")
|
@kafka_versions("all")
|
||||||
def test_simple_consumer_blocking(self):
|
def test_simple_consumer_blocking(self):
|
||||||
consumer = SimpleConsumer(self.client, "group1",
|
consumer = self.consumer()
|
||||||
self.topic,
|
|
||||||
auto_commit=False, iter_timeout=0)
|
|
||||||
|
|
||||||
# Ask for 5 messages, nothing in queue, block 5 seconds
|
# Ask for 5 messages, nothing in queue, block 5 seconds
|
||||||
with Timer() as t:
|
with Timer() as t:
|
||||||
@@ -111,8 +105,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
self.send_messages(0, range(0, 10))
|
self.send_messages(0, range(0, 10))
|
||||||
self.send_messages(1, range(10, 20))
|
self.send_messages(1, range(10, 20))
|
||||||
|
|
||||||
consumer = SimpleConsumer(self.client, "group1", self.topic,
|
consumer = self.consumer()
|
||||||
auto_commit=False, iter_timeout=0)
|
|
||||||
|
|
||||||
self.assertEquals(consumer.pending(), 20)
|
self.assertEquals(consumer.pending(), 20)
|
||||||
self.assertEquals(consumer.pending(partitions=[0]), 10)
|
self.assertEquals(consumer.pending(partitions=[0]), 10)
|
||||||
@@ -126,7 +119,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
self.send_messages(0, range(0, 100))
|
self.send_messages(0, range(0, 100))
|
||||||
self.send_messages(1, range(100, 200))
|
self.send_messages(1, range(100, 200))
|
||||||
|
|
||||||
consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)
|
consumer = self.consumer(consumer = MultiProcessConsumer)
|
||||||
|
|
||||||
self.assert_message_count([ message for message in consumer ], 200)
|
self.assert_message_count([ message for message in consumer ], 200)
|
||||||
|
|
||||||
@@ -134,7 +127,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
|
|
||||||
@kafka_versions("all")
|
@kafka_versions("all")
|
||||||
def test_multi_process_consumer_blocking(self):
|
def test_multi_process_consumer_blocking(self):
|
||||||
consumer = MultiProcessConsumer(self.client, "grp1", self.topic, auto_commit=False)
|
consumer = self.consumer(consumer = MultiProcessConsumer)
|
||||||
|
|
||||||
# Ask for 5 messages, No messages in queue, block 5 seconds
|
# Ask for 5 messages, No messages in queue, block 5 seconds
|
||||||
with Timer() as t:
|
with Timer() as t:
|
||||||
@@ -182,8 +175,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
|
large_messages = self.send_messages(0, [ random_string(5000) for x in range(10) ])
|
||||||
|
|
||||||
# Consumer should still get all of them
|
# Consumer should still get all of them
|
||||||
consumer = SimpleConsumer(self.client, "group1", self.topic,
|
consumer = self.consumer()
|
||||||
auto_commit=False, iter_timeout=0)
|
|
||||||
|
|
||||||
expected_messages = set(small_messages + large_messages)
|
expected_messages = set(small_messages + large_messages)
|
||||||
actual_messages = set([ x.message.value for x in consumer ])
|
actual_messages = set([ x.message.value for x in consumer ])
|
||||||
@@ -198,8 +190,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
])
|
])
|
||||||
|
|
||||||
# Create a consumer with the default buffer size
|
# Create a consumer with the default buffer size
|
||||||
consumer = SimpleConsumer(self.client, "group1", self.topic,
|
consumer = self.consumer()
|
||||||
auto_commit=False, iter_timeout=0)
|
|
||||||
|
|
||||||
# This consumer failes to get the message
|
# This consumer failes to get the message
|
||||||
with self.assertRaises(ConsumerFetchSizeTooSmall):
|
with self.assertRaises(ConsumerFetchSizeTooSmall):
|
||||||
@@ -208,9 +199,10 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
consumer.stop()
|
consumer.stop()
|
||||||
|
|
||||||
# Create a consumer with no fetch size limit
|
# Create a consumer with no fetch size limit
|
||||||
big_consumer = SimpleConsumer(self.client, "group1", self.topic,
|
big_consumer = self.consumer(
|
||||||
max_buffer_size=None, partitions=[0],
|
max_buffer_size = None,
|
||||||
auto_commit=False, iter_timeout=0)
|
partitions = [0],
|
||||||
|
)
|
||||||
|
|
||||||
# Seek to the last message
|
# Seek to the last message
|
||||||
big_consumer.seek(-1, 2)
|
big_consumer.seek(-1, 2)
|
||||||
@@ -228,25 +220,39 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
|||||||
msgs2 = self.send_messages(1, range(100, 200))
|
msgs2 = self.send_messages(1, range(100, 200))
|
||||||
|
|
||||||
# Start a consumer
|
# Start a consumer
|
||||||
consumer1 = SimpleConsumer(self.client, "group1",
|
consumer1 = self.consumer(
|
||||||
self.topic, auto_commit=True,
|
auto_commit_every_t = 600,
|
||||||
auto_commit_every_t=600,
|
auto_commit_every_n = 20,
|
||||||
auto_commit_every_n=20,
|
)
|
||||||
iter_timeout=0)
|
|
||||||
|
|
||||||
# Grab the first 195 messages
|
# Grab the first 195 messages
|
||||||
output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
|
output_msgs1 = [ consumer1.get_message().message.value for _ in xrange(195) ]
|
||||||
self.assert_message_count(output_msgs1, 195)
|
self.assert_message_count(output_msgs1, 195)
|
||||||
|
|
||||||
# The offset should be at 180
|
# The offset should be at 180
|
||||||
consumer2 = SimpleConsumer(self.client, "group1",
|
consumer2 = self.consumer(
|
||||||
self.topic, auto_commit=True,
|
auto_commit_every_t = 600,
|
||||||
auto_commit_every_t=600,
|
auto_commit_every_n = 20,
|
||||||
auto_commit_every_n=20,
|
)
|
||||||
iter_timeout=0)
|
|
||||||
|
|
||||||
# 180-200
|
# 180-200
|
||||||
self.assert_message_count([ message for message in consumer2 ], 20)
|
self.assert_message_count([ message for message in consumer2 ], 20)
|
||||||
|
|
||||||
consumer1.stop()
|
consumer1.stop()
|
||||||
consumer2.stop()
|
consumer2.stop()
|
||||||
|
|
||||||
|
def consumer(self, **kwargs):
|
||||||
|
if os.environ['KAFKA_VERSION'] == "0.8.0":
|
||||||
|
# Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
|
||||||
|
kwargs['auto_commit'] = False
|
||||||
|
else:
|
||||||
|
kwargs.setdefault('auto_commit', True)
|
||||||
|
|
||||||
|
consumer_class = kwargs.pop('consumer', SimpleConsumer)
|
||||||
|
group = kwargs.pop('group', self.id())
|
||||||
|
topic = kwargs.pop('topic', self.topic)
|
||||||
|
|
||||||
|
if consumer_class == SimpleConsumer:
|
||||||
|
kwargs.setdefault('iter_timeout', 0)
|
||||||
|
|
||||||
|
return consumer_class(self.client, group, topic, **kwargs)
|
||||||
|
|||||||
Reference in New Issue
Block a user