Add set_topic_partitions method to configure topics/partitions to consume
This commit is contained in:
@@ -4,12 +4,15 @@ import logging
|
||||
import sys
|
||||
import time
|
||||
|
||||
import six
|
||||
|
||||
from kafka.client import KafkaClient
|
||||
from kafka.common import (
|
||||
OffsetFetchRequest, OffsetCommitRequest, OffsetRequest, FetchRequest,
|
||||
check_error, NotLeaderForPartitionError, UnknownTopicOrPartitionError,
|
||||
OffsetOutOfRangeError, RequestTimedOutError, KafkaMessage, ConsumerTimeout,
|
||||
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError
|
||||
FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError,
|
||||
TopicAndPartition
|
||||
)
|
||||
|
||||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||||
@@ -118,15 +121,9 @@ class KafkaConsumer(object):
|
||||
"""
|
||||
|
||||
def __init__(self, *topics, **configs):
|
||||
self.topics = topics
|
||||
self.configure(**configs)
|
||||
|
||||
# Get initial topic metadata
|
||||
self.client.load_metadata_for_topics()
|
||||
for topic in self.topics:
|
||||
if topic not in self.client.topic_partitions:
|
||||
raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
|
||||
logger.info("Configuring consumer to fetch topic '%s'", topic)
|
||||
self.set_topic_partitions(*topics)
|
||||
|
||||
# Setup offsets
|
||||
self._offsets = OffsetsStruct(fetch=defaultdict(dict),
|
||||
@@ -333,6 +330,60 @@ class KafkaConsumer(object):
|
||||
client_id=self._config['client_id'],
|
||||
timeout=(self._config['socket_timeout_ms'] / 1000.0))
|
||||
|
||||
def set_topic_partitions(self, *topics):
    """
    Set the topic/partitions to consume.

    Refreshes the client's topic metadata, then replaces ``self.topics``
    with a flat list of TopicAndPartition entries built from the
    positional arguments.

    Accepts types:
        str - topic name, will consume all available partitions
        TopicAndPartition namedtuple - will consume topic/partition
        tuple - will consume (topic, partition)
        dict - will consume { topic: partition }
                            { topic: [partition list] }
                            { topic: (partition tuple,) }

    Raises:
        KafkaConfigurationError: if an argument, or a dict value, is of
            an unsupported type.
        UnknownTopicOrPartitionError: if a requested topic is missing
            from the client's topic metadata.

    Ex:
        kafka = KafkaConsumer()

        # Consume topic1-all; topic2-partition2; topic3-partition0
        kafka.set_topic_partitions("topic1", ("topic2", 2), {"topic3": 0})
    """
    self.topics = []
    self.client.load_metadata_for_topics()

    for arg in topics:
        if isinstance(arg, six.string_types):
            # Bare topic name: expand to every partition the client reports.
            for partition in self.client.get_partition_ids_for_topic(arg):
                self.topics.append(TopicAndPartition(arg, partition))

        elif isinstance(arg, TopicAndPartition):
            # Must be tested before the generic tuple branch below, since
            # the docstring describes TopicAndPartition as a namedtuple.
            self.topics.append(arg)

        elif isinstance(arg, tuple):
            self.topics.append(TopicAndPartition(*arg))

        elif isinstance(arg, dict):
            for topic, partitions in arg.items():
                if isinstance(partitions, int):
                    self.topics.append(TopicAndPartition(topic, partitions))
                elif isinstance(partitions, (list, tuple)):
                    for partition in partitions:
                        self.topics.append(TopicAndPartition(topic, partition))
                else:
                    # It is the dict *value* that carries the partition(s).
                    raise KafkaConfigurationError('Unknown topic type (dict value must be '
                                                  'int or list/tuple of ints)')
        else:
            raise KafkaConfigurationError('Unknown topic type (topic must be '
                                          'string, TopicAndPartition, '
                                          '(topic,partition) tuple, or {topic: '
                                          'partitions} dict)')

    # Verify that every requested topic exists in the broker metadata.
    for topic_partition in self.topics:
        # Entries are built as TopicAndPartition(topic, partition), so the
        # topic name is the first element. Reading it from the loop
        # variable avoids relying on a stale or undefined `topic` name.
        topic = topic_partition[0]
        if topic not in self.client.topic_partitions:
            raise UnknownTopicOrPartitionError("Topic %s not found in broker metadata" % topic)
        logger.info("Configuring consumer to fetch topic '%s'", topic)
|
||||
|
||||
def fetch_messages(self):
|
||||
|
||||
max_bytes = self._config['fetch_message_max_bytes']
|
||||
|
||||
Reference in New Issue
Block a user