Fetch previously committed offsets in the base consumer class so long as
a group is configured (but document that group must be None for old servers). This fixes a MultiProcessConsumer issue that prevented access to committed offsets if auto_commit is disabled. Also refactor fetch_last_known_offsets based on KafkaConsumer, while still setting unknown offsets to 0.
This commit is contained in:
@@ -7,7 +7,7 @@ from threading import Lock
|
||||
import kafka.common
|
||||
from kafka.common import (
|
||||
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
|
||||
UnknownTopicOrPartitionError
|
||||
UnknownTopicOrPartitionError, check_error
|
||||
)
|
||||
|
||||
from kafka.util import ReentrantTimer
|
||||
@@ -68,29 +68,42 @@ class Consumer(object):
|
||||
self.commit)
|
||||
self.commit_timer.start()
|
||||
|
||||
if auto_commit:
|
||||
# Set initial offsets
|
||||
if self.group is not None:
|
||||
self.fetch_last_known_offsets(partitions)
|
||||
else:
|
||||
for partition in partitions:
|
||||
self.offsets[partition] = 0
|
||||
|
||||
|
||||
def fetch_last_known_offsets(self, partitions=None):
|
||||
if self.group is None:
|
||||
raise ValueError('KafkaClient.group must not be None')
|
||||
|
||||
if not partitions:
|
||||
partitions = self.client.get_partition_ids_for_topic(self.topic)
|
||||
|
||||
def get_or_init_offset(resp):
|
||||
try:
|
||||
kafka.common.check_error(resp)
|
||||
return resp.offset
|
||||
except UnknownTopicOrPartitionError:
|
||||
return 0
|
||||
|
||||
for partition in partitions:
|
||||
req = OffsetFetchRequest(self.topic, partition)
|
||||
(resp,) = self.client.send_offset_fetch_request(self.group, [req],
|
||||
fail_on_error=False)
|
||||
self.offsets[partition] = get_or_init_offset(resp)
|
||||
self.fetch_offsets = self.offsets.copy()
|
||||
(resp,) = self.client.send_offset_fetch_request(
|
||||
self.group,
|
||||
[OffsetFetchRequest(self.topic, partition)],
|
||||
fail_on_error=False
|
||||
)
|
||||
try:
|
||||
check_error(resp)
|
||||
# API spec says server won't set an error here
|
||||
# but 0.8.1.1 does actually...
|
||||
except UnknownTopicOrPartitionError:
|
||||
pass
|
||||
|
||||
# -1 offset signals no commit is currently stored
|
||||
if resp.offset == -1:
|
||||
self.offsets[partition] = 0
|
||||
|
||||
# Otherwise we committed the stored offset
|
||||
# and need to fetch the next one
|
||||
else:
|
||||
self.offsets[partition] = resp.offset
|
||||
|
||||
def commit(self, partitions=None):
|
||||
"""
|
||||
|
@@ -93,6 +93,8 @@ class MultiProcessConsumer(Consumer):
|
||||
Arguments:
|
||||
client: a connected KafkaClient
|
||||
group: a name for this consumer, used for offset storage and must be unique
|
||||
If you are connecting to a server that does not support offset
|
||||
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
|
||||
topic: the topic to consume
|
||||
|
||||
Keyword Arguments:
|
||||
|
@@ -73,6 +73,8 @@ class SimpleConsumer(Consumer):
|
||||
Arguments:
|
||||
client: a connected KafkaClient
|
||||
group: a name for this consumer, used for offset storage and must be unique
|
||||
If you are connecting to a server that does not support offset
|
||||
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
|
||||
topic: the topic to consume
|
||||
|
||||
Keyword Arguments:
|
||||
|
@@ -53,6 +53,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
def consumer(self, **kwargs):
|
||||
if os.environ['KAFKA_VERSION'] == "0.8.0":
|
||||
# Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
|
||||
kwargs['group'] = None
|
||||
kwargs['auto_commit'] = False
|
||||
else:
|
||||
kwargs.setdefault('auto_commit', True)
|
||||
@@ -260,7 +261,9 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
|
||||
self.send_messages(0, range(0, 10))
|
||||
self.send_messages(1, range(10, 20))
|
||||
|
||||
consumer = MultiProcessConsumer(self.client, "group1", self.topic,
|
||||
# set group to None and auto_commit to False to avoid interactions w/
|
||||
# offset commit/fetch apis
|
||||
consumer = MultiProcessConsumer(self.client, None, self.topic,
|
||||
auto_commit=False, iter_timeout=0)
|
||||
|
||||
self.assertEqual(consumer.pending(), 20)
|
||||
|
@@ -183,7 +183,7 @@ class TestFailover(KafkaIntegrationTestCase):
|
||||
|
||||
client = KafkaClient(hosts)
|
||||
group = random_string(10)
|
||||
consumer = SimpleConsumer(client, group, topic,
|
||||
consumer = SimpleConsumer(client, None, topic,
|
||||
partitions=partitions,
|
||||
auto_commit=False,
|
||||
iter_timeout=timeout)
|
||||
|
Reference in New Issue
Block a user