From 4bc30a2ec8665b1faef0e668c12138c3cc52e38c Mon Sep 17 00:00:00 2001 From: Ali-Akber Saifee Date: Wed, 18 Mar 2015 10:27:04 +0800 Subject: [PATCH 1/5] Add test case for MP Consumer auto commit Tweak MP Consumer test to use iterator --- test/test_consumer_integration.py | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 17a8ac9..e5e2148 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -327,6 +327,41 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer1.stop() consumer2.stop() + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") + def test_multi_process_offset_behavior__resuming_behavior(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Start a consumer + consumer1 = self.consumer( + consumer=MultiProcessConsumer, + auto_commit_every_t = None, + auto_commit_every_n = 20, + ) + + # Grab the first 195 messages + output_msgs1 = [] + idx = 0 + for message in consumer1: + output_msgs1.append(message.message.value) + idx += 1 + if idx >= 195: + break + self.assert_message_count(output_msgs1, 195) + + # The total offset across both partitions should be at 180 + consumer2 = self.consumer( + consumer=MultiProcessConsumer, + auto_commit_every_t = None, + auto_commit_every_n = 20, + ) + + # 181-200 + self.assert_message_count([ message for message in consumer2 ], 20) + + consumer1.stop() + consumer2.stop() + # TODO: Make this a unit test -- should not require integration @kafka_versions("all") def test_fetch_buffer_size(self): From 92a3737a6b5267c7b643a9163c81b85ee0b0da58 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Thu, 26 Mar 2015 14:04:38 +0300 Subject: [PATCH 2/5] Added basic tests for load_initial_offsets option --- test/test_consumer_integration.py | 35 +++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index e5e2148..70d5109 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -127,6 +127,23 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(OffsetOutOfRangeError): consumer.get_message() + @kafka_versions('all') + def test_simple_consumer_load_initial_offsets(self): + self.send_messages(0, range(0, 100)) + self.send_messages(1, range(100, 200)) + + # Create 1st consumer and change offsets + consumer = self.consumer() + self.assertEqual(consumer.offsets, {0: 0, 1: 0}) + consumer.offsets.update({0:51, 1:101}) + # Update counter after manual offsets update + consumer.count_since_commit += 1 + consumer.commit() + + # Create 2nd consumer and check initial offsets + consumer = self.consumer(auto_commit=False) + self.assertEqual(consumer.offsets, {0: 51, 1: 101}) + @kafka_versions("all") def test_simple_consumer__seek(self): self.send_messages(0, range(0, 100)) @@ -252,6 +269,24 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() + @kafka_versions("all") + def test_multi_process_consumer_load_initial_offsets(self): + self.send_messages(0, range(0, 10)) + self.send_messages(1, range(10, 20)) + + # Create 1st consumer and change offsets + consumer = self.consumer() + self.assertEqual(consumer.offsets, {0: 0, 1: 0}) + consumer.offsets.update({0:5, 1:15}) + # Update counter after manual offsets update + consumer.count_since_commit += 1 + consumer.commit() + + # Create 2nd consumer and check initial offsets + consumer = self.consumer(consumer = MultiProcessConsumer, + auto_commit=False) + self.assertEqual(consumer.offsets, {0: 5, 1: 15}) + @kafka_versions("all") def test_large_messages(self): # Produce 10 "normal" size messages From 32dd817aac4130a019339afac7ef52f2b9b7acd4 Mon Sep 17 00:00:00 2001 From: Viktor Shlapakov Date: Thu, 26 Mar 2015 15:21:46 +0300 Subject: [PATCH 3/5] Skip these tests: no OffsetCommitRequest for 0.8.0 --- test/test_consumer_integration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 70d5109..fd62d9b 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -127,7 +127,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): with self.assertRaises(OffsetOutOfRangeError): consumer.get_message() - @kafka_versions('all') + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") def test_simple_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 100)) self.send_messages(1, range(100, 200)) @@ -269,7 +269,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer.stop() - @kafka_versions("all") + @kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0") def test_multi_process_consumer_load_initial_offsets(self): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) From b6d032cc3f1b53a6d5b395f9b14de62f547c8f1c Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 29 Mar 2015 17:24:56 -0700 Subject: [PATCH 4/5] Fetch previously committed offsets in base consumer class so long as a group is configured (but document that group must be None for old servers). This fixes multiprocessor consumer issue that prevented access to commit offsets if auto_commit is disabled. Also refactor fetch_last_known_offsets based on KafkaConsumer While still setting unknown offsets to 0 --- kafka/consumer/base.py | 41 ++++++++++++++++++++----------- kafka/consumer/multiprocess.py | 2 ++ kafka/consumer/simple.py | 2 ++ test/test_consumer_integration.py | 5 +++- test/test_failover_integration.py | 2 +- 5 files changed, 36 insertions(+), 16 deletions(-) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index bde3c1a..91ad82f 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -7,7 +7,7 @@ from threading import Lock import kafka.common from kafka.common import ( OffsetRequest, OffsetCommitRequest, OffsetFetchRequest, - UnknownTopicOrPartitionError + UnknownTopicOrPartitionError, check_error ) from kafka.util import ReentrantTimer @@ -68,29 +68,42 @@ class Consumer(object): self.commit) self.commit_timer.start() - if auto_commit: + # Set initial offsets + if self.group is not None: self.fetch_last_known_offsets(partitions) else: for partition in partitions: self.offsets[partition] = 0 + def fetch_last_known_offsets(self, partitions=None): + if self.group is None: + raise ValueError('KafkaClient.group must not be None') + if not partitions: partitions = self.client.get_partition_ids_for_topic(self.topic) - def get_or_init_offset(resp): - try: - kafka.common.check_error(resp) - return resp.offset - except UnknownTopicOrPartitionError: - return 0 - for partition in partitions: - req = OffsetFetchRequest(self.topic, partition) - (resp,) = self.client.send_offset_fetch_request(self.group, [req], - fail_on_error=False) - self.offsets[partition] = get_or_init_offset(resp) - self.fetch_offsets = self.offsets.copy() + (resp,) = self.client.send_offset_fetch_request( + self.group, + [OffsetFetchRequest(self.topic, partition)], + fail_on_error=False + ) + try: + check_error(resp) + # API spec says server wont set an error here + # but 0.8.1.1 does actually... + except UnknownTopicOrPartitionError: + pass + + # -1 offset signals no commit is currently stored + if resp.offset == -1: + self.offsets[partition] = 0 + + # Otherwise we committed the stored offset + # and need to fetch the next one + else: + self.offsets[partition] = resp.offset def commit(self, partitions=None): """ diff --git a/kafka/consumer/multiprocess.py b/kafka/consumer/multiprocess.py index 5ce8b4d..3acd470 100644 --- a/kafka/consumer/multiprocess.py +++ b/kafka/consumer/multiprocess.py @@ -93,6 +93,8 @@ class MultiProcessConsumer(Consumer): Arguments: client: a connected KafkaClient group: a name for this consumer, used for offset storage and must be unique + If you are connecting to a server that does not support offset + commit/fetch (any prior to 0.8.1.1), then you *must* set this to None topic: the topic to consume Keyword Arguments: diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index b50de61..ae00dab 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -73,6 +73,8 @@ class SimpleConsumer(Consumer): Arguments: client: a connected KafkaClient group: a name for this consumer, used for offset storage and must be unique + If you are connecting to a server that does not support offset + commit/fetch (any prior to 0.8.1.1), then you *must* set this to None topic: the topic to consume Keyword Arguments: diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index fd62d9b..403ce0f 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -53,6 +53,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def consumer(self, **kwargs): if os.environ['KAFKA_VERSION'] == "0.8.0": # Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off + kwargs['group'] = None kwargs['auto_commit'] = False else: kwargs.setdefault('auto_commit', True) @@ -260,7 +261,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.send_messages(0, range(0, 10)) self.send_messages(1, range(10, 20)) - consumer = MultiProcessConsumer(self.client, "group1", self.topic, + # set group to None and auto_commit to False to avoid interactions w/ + # offset commit/fetch apis + consumer = MultiProcessConsumer(self.client, None, self.topic, auto_commit=False, iter_timeout=0) self.assertEqual(consumer.pending(), 20) diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 7d27526..15f0338 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -183,7 +183,7 @@ class TestFailover(KafkaIntegrationTestCase): client = KafkaClient(hosts) group = random_string(10) - consumer = SimpleConsumer(client, group, topic, + consumer = SimpleConsumer(client, None, topic, partitions=partitions, auto_commit=False, iter_timeout=timeout) From 1d252bfc20c8b1058dc93a495c3bdb0f4ccdf590 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 30 Mar 2015 16:55:41 -0700 Subject: [PATCH 5/5] Bulk fetch offset partitions in base consumer -- suggested by ecanzonieri --- kafka/consumer/base.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py index 91ad82f..0bbf46c 100644 --- a/kafka/consumer/base.py +++ b/kafka/consumer/base.py @@ -83,12 +83,13 @@ class Consumer(object): if not partitions: partitions = self.client.get_partition_ids_for_topic(self.topic) - for partition in partitions: - (resp,) = self.client.send_offset_fetch_request( - self.group, - [OffsetFetchRequest(self.topic, partition)], - fail_on_error=False - ) + responses = self.client.send_offset_fetch_request( + self.group, + [OffsetFetchRequest(self.topic, p) for p in partitions], + fail_on_error=False + ) + + for resp in responses: try: check_error(resp) # API spec says server wont set an error here @@ -98,12 +99,12 @@ class Consumer(object): # -1 offset signals no commit is currently stored if resp.offset == -1: - self.offsets[partition] = 0 + self.offsets[resp.partition] = 0 # Otherwise we committed the stored offset # and need to fetch the next one else: - self.offsets[partition] = resp.offset + self.offsets[resp.partition] = resp.offset def commit(self, partitions=None): """