Speed up kafka plugin

Construct one kafka consumer per consumer_group/topic combination.
This should significantly reduce the collection time.

Additionally, this removes the ability to specify which partitions to
collect. The plugin will accept both the old and new config formats,
but will always collect info from all partitions for a topic.

Change-Id: Ib8e1b24ce515a0635e7e6cc4f6537e80f66b161c
Ryan Brandt 2015-09-29 15:05:23 -06:00
parent 696bf6a26b
commit 0a9028ce89
2 changed files with 41 additions and 37 deletions


@@ -4,10 +4,10 @@ instances:
 #  - kafka_connect_str: localhost:19092
 #    zk_connect_str: localhost:2181
 #    zk_prefix: /0.8
-#    It is possible to get the lag for each partition the default is to sum lag for all partitions
+#    It is possible to get the lag for each partition, the default is to sum lag for all partitions
 #    per_partition: False
 #    Full includes additional metrics with actual offsets per partition, can't be set if per_partition is False
 #    full_output: False
 #    consumer_groups:
 #      my_consumer:
-#        my_topic: [0, 1, 4, 12] # If the partitions is an empty list the plugin will use all existing partitions
+#        - my_topic # the plugin will automatically collect from all existing partitions
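
For reference, a hedged sketch of the two shapes `consumer_groups` may take
once the YAML above is loaded; the values mirror the example config, and the
old format's partition list is ignored either way:

# The two consumer_groups shapes the plugin now accepts, shown as the
# Python values they become after YAML loading (illustrative values only).
old_format = {
    'my_consumer': {
        'my_topic': [0, 1, 4, 12],  # partition list is discarded on parse
    },
}
new_format = {
    'my_consumer': [
        'my_topic',  # all existing partitions are collected
    ],
}
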


@@ -36,60 +36,63 @@ class KafkaCheck(checks.AgentCheck):
     is set also reports broker offsets and the current consumer offset.
     Works on Kafka version >= 0.8.1.1
     """
-    def _validate_consumer_groups(self, val):
-        """Private config validation/marshalling functions
+    def _parse_consumer_groups(self, raw_val):
+        """Parses and validates the config
+        Expected format is:
+            consumer_groups:
+                myconsumer0: # consumer group name
+                    - mytopic0
+                myconsumer1:
+                    - mytopic0
+                    - mytopic1
         """
+        consumer_groups = dict()
         try:
-            consumer_group, topic_partitions = val.items()[0]
-            assert isinstance(consumer_group, (str, unicode))
-            topic, partitions = topic_partitions.items()[0]
-            assert isinstance(topic, (str, unicode))
-            assert isinstance(partitions, (list, tuple))
-            return val
+            for group, topics in raw_val.iteritems():
+                assert isinstance(group, basestring)
+                if isinstance(topics, dict):
+                    self.log.info("Found old config format, discarding partition list")
+                    topics = topics.keys()
+                assert isinstance(topics, list)
+                assert isinstance(topics[0], basestring)
+                consumer_groups[group] = topics
+            return consumer_groups
         except Exception as e:
             self.log.exception(e)
-            raise Exception('''The `consumer_groups` value must be a mapping of mappings, like this:
-consumer_groups:
-  myconsumer0: # consumer group name
-    mytopic0: [0, 1] # topic: list of partitions
-  myconsumer1:
-    mytopic0: [0, 1, 2]
-    mytopic1: [10, 12]
-''')
+            raise Exception("Invalid `consumer_groups` value. Must be a mapping of lists")
     def _get_kafka_offsets(self, kafka_conn, consumer_groups):
         # Query Kafka for consumer offsets
         consumer_offsets = {}
-        topics = collections.defaultdict(set)
-        for consumer_group, topic_partitions in consumer_groups.iteritems():
-            for topic, partitions in topic_partitions.iteritems():
-                if len(partitions) == 0:  # Use all found partitions if none are specified.
-                    try:
-                        kafka_consumer = consumer.SimpleConsumer(kafka_conn, consumer_group, topic)
-                        partitions = kafka_consumer.offsets.keys()
-                    except Exception:
-                        self.log.error('Error fetching partition list for topic {0}'.format(topic))
-                        continue
-                    finally:
-                        kafka_consumer.stop()
+        topic_partitions = collections.defaultdict(set)
+        for consumer_group, topics in consumer_groups.iteritems():
+            for topic in topics:
+                kafka_consumer = None
+                try:
+                    kafka_consumer = consumer.SimpleConsumer(kafka_conn, consumer_group, topic)
+                    partitions = kafka_consumer.offsets.keys()
+                except Exception:
+                    self.log.error('Error fetching partition list for topic {0}'.format(topic))
+                    if kafka_consumer is not None:
+                        kafka_consumer.stop()
+                    continue

                 # Remember the topic partitions encountered so that we can look up their broker offsets later
-                topics[topic].update(set(partitions))
+                topic_partitions[topic].update(set(partitions))
                 consumer_offsets[(consumer_group, topic)] = {}
                 for partition in partitions:
                     try:
-                        kafka_consumer = consumer.SimpleConsumer(kafka_conn, consumer_group, topic)
                         consumer_offsets[(consumer_group, topic)][partition] = kafka_consumer.offsets[partition]
                     except KeyError:
                         self.log.error('Error fetching consumer offset for {0} partition {1}'.format(topic, partition))
-                    finally:
-                        kafka_consumer.stop()
+                kafka_consumer.stop()
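
The speedup comes from hoisting the consumer construction out of the
partition loop. A rough sketch of the new control flow, where
`make_consumer` is a hypothetical stand-in for
`consumer.SimpleConsumer(kafka_conn, group, topic)` and `.offsets` maps
partition to current consumer offset, as in the code above:

def collect_consumer_offsets(consumer_groups, make_consumer):
    consumer_offsets = {}
    for group, topics in consumer_groups.iteritems():
        for topic in topics:
            # One consumer per consumer_group/topic combination...
            kafka_consumer = make_consumer(group, topic)
            try:
                # ...reused for every partition of that topic...
                consumer_offsets[(group, topic)] = dict(kafka_consumer.offsets)
            finally:
                # ...and stopped once, instead of once per partition.
                kafka_consumer.stop()
    return consumer_offsets
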
         # Query Kafka for the broker offsets, done in a separate loop so only one query is done
         # per topic/partition even if multiple consumer groups watch the same topic
         broker_offsets = {}
-        for topic, partitions in topics.iteritems():
+        for topic, partitions in topic_partitions.iteritems():
             offset_responses = []
             for p in partitions:
                 try:
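
A small illustration of why this separate broker loop stays cheap:
`topic_partitions` accumulates partitions into per-topic sets, so a topic
watched by several consumer groups is still queried only once (the group
names and partitions here are made up):

import collections

topic_partitions = collections.defaultdict(set)
for group, topic, partitions in [('group_a', 'my_topic', [0, 1]),
                                 ('group_b', 'my_topic', [0, 1])]:
    topic_partitions[topic].update(set(partitions))

print dict(topic_partitions)  # {'my_topic': set([0, 1])} -- one broker query
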
@@ -104,8 +107,9 @@ consumer_groups:
         return consumer_offsets, broker_offsets

     def check(self, instance):
-        consumer_groups = self.read_config(instance, 'consumer_groups',
-                                           cast=self._validate_consumer_groups)
+        raw_consumer_groups = self.read_config(instance, 'consumer_groups')
+        consumer_groups = self._parse_consumer_groups(raw_consumer_groups)
         kafka_host_ports = self.read_config(instance, 'kafka_connect_str')
         full_output = self.read_config(instance, 'full_output', cast=bool, optional=True)
         per_partition = self.read_config(instance, 'per_partition', cast=bool, optional=True)
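
Put together, a hypothetical `instance` dict as `check()` would receive it
after YAML loading, using the new `consumer_groups` format; the values
mirror the example config above, not a real deployment:

instance = {
    'kafka_connect_str': 'localhost:19092',
    'zk_connect_str': 'localhost:2181',
    'zk_prefix': '/0.8',
    'per_partition': False,
    'full_output': False,
    'consumer_groups': {
        'my_consumer': ['my_topic'],  # new format: plain list of topics
    },
}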