Reduced Kafka metrics in the default case

Previously, metrics were reported per partition per consumer group, which can quickly
become a very large number of metrics. Now they default to per topic rather than per partition.

Change-Id: I6fe2bd57ddcfbb89c907dc7c2f9781bdd23db6ce
Tim Kuhlman 2015-06-03 16:57:14 -06:00
parent e8a96ca152
commit e07b08e98d
4 changed files with 107 additions and 235 deletions
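
In effect, the default output changes from one kafka.consumer_lag gauge per (consumer_group, topic, partition) to a single summed gauge per (consumer_group, topic). A toy sketch of the aggregation, with made-up offset numbers:

# Toy numbers; offsets are keyed the way the check keys them.
broker_offsets = {('events', 0): 120, ('events', 1): 95}    # (topic, partition) -> head offset
consumer_offsets = {('events', 0): 110, ('events', 1): 90}  # (topic, partition) -> committed offset

# Old default: one lag value per partition.
per_partition_lag = dict((key, broker_offsets[key] - consumer_offsets[key])
                         for key in consumer_offsets)       # {('events', 0): 10, ('events', 1): 5}

# New default: lag summed across partitions, one value per topic.
per_topic_lag = {}
for (topic, _partition), lag in per_partition_lag.items():
    per_topic_lag[topic] = per_topic_lag.get(topic, 0) + lag  # {'events': 15}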

File 1 of 4: example configuration for the JMX-based kafka check (file deleted)

@@ -1,156 +0,0 @@
-instances:
-#    - host: localhost
-#      port: 9999
-#      name: jmx_instance
-#      user: username
-#      password: password
-#      #java_bin_path: /path/to/java #Optional, should be set if the agent cannot find your java executable
-#      #trust_store_path: /path/to/trustStore.jks # Optional, should be set if ssl is enabled
-#      #trust_store_password: password
-#      dimensions:
-#          env: stage
-#          newTag: test
-
-init_config:
-    is_jmx: true
-
-    # Metrics collected by this check. You should not have to modify this.
-    conf:
-        #
-        # Aggregate cluster stats
-        #
-        - include:
-            domain: '"kafka.server"'
-            bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesOutPerSec"'
-            attribute:
-                MeanRate:
-                    metric_type: counter
-                    alias: kafka.net.bytes_out
-        - include:
-            domain: '"kafka.server"'
-            bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsBytesInPerSec"'
-            attribute:
-                MeanRate:
-                    metric_type: counter
-                    alias: kafka.net.bytes_in
-        - include:
-            domain: '"kafka.server"'
-            bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsMessagesInPerSec"'
-            attribute:
-                MeanRate:
-                    metric_type: gauge
-                    alias: kafka.messages_in
-        #
-        # Request timings
-        #
-        - include:
-            domain: '"kafka.server"'
-            bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsFailedFetchRequestsPerSec"'
-            attribute:
-                MeanRate:
-                    metric_type: gauge
-                    alias: kafka.request.fetch.failed
-        - include:
-            domain: '"kafka.server"'
-            bean: '"kafka.server":type="BrokerTopicMetrics",name="AllTopicsFailedProduceRequestsPerSec"'
-            attribute:
-                MeanRate:
-                    metric_type: gauge
-                    alias: kafka.request.produce.failed
-        - include:
-            domain: '"kafka.network"'
-            bean: '"kafka.network":type="RequestMetrics",name="Produce-TotalTimeMs"'
-            attribute:
-                Mean:
-                    metric_type: counter
-                    alias: kafka.request.produce.time.avg
-                99thPercentile:
-                    metric_type: counter
-                    alias: kafka.request.produce.time.99percentile
-        - include:
-            domain: '"kafka.network"'
-            bean: '"kafka.network":type="RequestMetrics",name="Fetch-TotalTimeMs"'
-            attribute:
-                Mean:
-                    metric_type: counter
-                    alias: kafka.request.fetch.time.avg
-                99thPercentile:
-                    metric_type: counter
-                    alias: kafka.request.fetch.time.99percentile
-        - include:
-            domain: '"kafka.network"'
-            bean: '"kafka.network":type="RequestMetrics",name="UpdateMetadata-TotalTimeMs"'
-            attribute:
-                Mean:
-                    metric_type: counter
-                    alias: kafka.request.update_metadata.time.avg
-                99thPercentile:
-                    metric_type: counter
-                    alias: kafka.request.update_metadata.time.99percentile
-        - include:
-            domain: '"kafka.network"'
-            bean: '"kafka.network":type="RequestMetrics",name="Metadata-TotalTimeMs"'
-            attribute:
-                Mean:
-                    metric_type: counter
-                    alias: kafka.request.metadata.time.avg
-                99thPercentile:
-                    metric_type: counter
-                    alias: kafka.request.metadata.time.99percentile
-        - include:
-            domain: '"kafka.network"'
-            bean: '"kafka.network":type="RequestMetrics",name="Offsets-TotalTimeMs"'
-            attribute:
-                Mean:
-                    metric_type: counter
-                    alias: kafka.request.offsets.time.avg
-                99thPercentile:
-                    metric_type: counter
-                    alias: kafka.request.offsets.time.99percentile
-        #
-        # Replication stats
-        #
-        - include:
-            domain: '"kafka.server"'
-            bean: '"kafka.server":type="ReplicaManager",name="ISRShrinksPerSec"'
-            attribute:
-                MeanRate:
-                    metric_type: counter
-                    alias: kafka.replication.isr_shrinks
-        - include:
-            domain: '"kafka.server"'
-            bean: '"kafka.server":type="ReplicaManager",name="ISRExpandsPerSec"'
-            attribute:
-                MeanRate:
-                    metric_type: counter
-                    alias: kafka.replication.isr_expands
-        - include:
-            domain: '"kafka.server"'
-            bean: '"kafka.server":type="ControllerStats",name="LeaderElectionRateAndTimeMs"'
-            attribute:
-                MeanRate:
-                    metric_type: counter
-                    alias: kafka.replication.leader_elections
-        - include:
-            domain: '"kafka.server"'
-            bean: '"kafka.server":type="ControllerStats",name="UncleanLeaderElectionsPerSec"'
-            attribute:
-                MeanRate:
-                    metric_type: counter
-                    alias: kafka.replication.unclean_leader_elections
-        #
-        # Log flush stats
-        #
-        - include:
-            domain: '"kafka.log"'
-            bean: '"kafka.log":type="LogFlushStats",name="LogFlushRateAndTimeMs"'
-            attribute:
-                MeanRate:
-                    metric_type: counter
-                    alias: kafka.log.flush_rate

File 2 of 4: example configuration for the kafka_consumer check

@@ -4,8 +4,10 @@ instances:
 #    - kafka_connect_str: localhost:19092
 #      zk_connect_str: localhost:2181
 #      zk_prefix: /0.8
-#      By default shows the lag for the various topics, full includes additional metrics with actual offsets
+#      It is possible to get the lag for each partition; the default is to sum the lag across all partitions.
+#      per_partition: False
+#      full_output includes additional metrics with the actual offsets per partition; it cannot be set when per_partition is False.
 #      full_output: False
 #      consumer_groups:
 #        my_consumer:
-#          my_topic: [0, 1, 4, 12]
+#          my_topic: [0, 1, 4, 12] # If the partition list is empty the plugin will use all existing partitions
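
For illustration only, an uncommented instance using the new options could look like this (the connection values are the example placeholders from above):

instances:
    - kafka_connect_str: localhost:19092
      zk_connect_str: localhost:2181
      zk_prefix: /0.8
      per_partition: False   # default: sum the lag across partitions, one metric per topic
      full_output: False     # per-partition offsets; only meaningful when per_partition is True
      consumer_groups:
        my_consumer:
          my_topic: []       # empty partition list: use all existing partitions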

File 3 of 4: kafka_consumer collector check (Python)

@@ -1,12 +1,5 @@
-import sys
-
-if sys.version_info < (2, 6):
-    # Normally we'd write our checks to be compatible with >= python 2.4 but
-    # the dependencies of this check are not compatible with 2.4 and would
-    # be too much work to rewrite, so raise an exception here.
-    raise Exception('kafka_consumer check requires at least Python 2.6')
-
 import collections
 import logging
 
 import monasca_agent.collector.checks as checks
 
 try:
@@ -17,6 +10,26 @@ except ImportError:
     raise Exception('Missing python dependency: kafka (https://github.com/mumrah/kafka-python)')
 
 log = logging.getLogger(__name__)
 
 
+class KafkaConnection(object):
+    """ A simple context manager for kafka connections """
+
+    def __init__(self, connect_str):
+        self.connect_str = connect_str
+
+    def __enter__(self):
+        self.kafka_conn = client.KafkaClient(self.connect_str)
+        return self.kafka_conn
+
+    def __exit__(self, type, value, traceback):
+        try:
+            self.kafka_conn.close()
+        except Exception:
+            log.exception('Error cleaning up Kafka connection')
+
+
 class KafkaCheck(checks.AgentCheck):
     """ Checks the configured kafka instance reporting the consumption lag
     for each partition per topic in each consumer group. If full_output
@@ -44,78 +57,94 @@ consumer_groups:
     mytopic1: [10, 12]
 ''')
 
+    def _get_kafka_offsets(self, kafka_conn, consumer_groups):
+        # Query Kafka for consumer offsets
+        consumer_offsets = {}
+        topics = collections.defaultdict(set)
+        for consumer_group, topic_partitions in consumer_groups.iteritems():
+            for topic, partitions in topic_partitions.iteritems():
+                kafka_consumer = consumer.SimpleConsumer(kafka_conn, consumer_group, topic)
+                if len(partitions) == 0:  # Use all found partitions if none are specified.
+                    try:
+                        partitions = kafka_consumer.offsets.keys()
+                    except Exception:
+                        kafka_consumer.stop()
+                        self.log.error('Error fetching partition list for topic {0}'.format(topic))
+                        continue
+
+                # Remember the topic partitions encountered so that we can look up their broker
+                # offsets later
+                topics[topic].update(set(partitions))
+
+                consumer_offsets[(consumer_group, topic)] = {}
+                for partition in partitions:
+                    try:
+                        consumer_offsets[(consumer_group, topic)][partition] = kafka_consumer.offsets[partition]
+                    except KeyError:
+                        kafka_consumer.stop()
+                        self.log.error('Error fetching consumer offset for {0} partition {1}'.format(topic, partition))
+                kafka_consumer.stop()
+
+        # Query Kafka for the broker offsets, done in a separate loop so only one query is done
+        # per topic/partition even if multiple consumer groups watch the same topic
+        broker_offsets = {}
+        for topic, partitions in topics.iteritems():
+            offset_responses = []
+            for p in partitions:
+                try:
+                    response = kafka_conn.send_offset_request([common.OffsetRequest(topic, p, -1, 1)])
+                    offset_responses.append(response[0])
+                except common.KafkaError as e:
+                    self.log.error("Error fetching broker offset: {0}".format(e))
+
+            for resp in offset_responses:
+                broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
+
+        return consumer_offsets, broker_offsets
+
     def check(self, instance):
         consumer_groups = self.read_config(instance, 'consumer_groups',
                                            cast=self._validate_consumer_groups)
         kafka_host_ports = self.read_config(instance, 'kafka_connect_str')
-        full_output = self.read_config(instance, 'full_output', cast=bool)
+        full_output = self.read_config(instance, 'full_output', cast=bool, optional=True)
+        per_partition = self.read_config(instance, 'per_partition', cast=bool, optional=True)
+        if not per_partition:
+            full_output = False
 
         dimensions = {'component': 'kafka', 'service': 'kafka'}
 
-        try:
-            # Connect to Kafka
-            kafka_conn = client.KafkaClient(kafka_host_ports)
-
-            # Query Kafka for consumer offsets
-            consumer_offsets = {}
-            topics = collections.defaultdict(set)
-            for consumer_group, topic_partitions in consumer_groups.iteritems():
-                for topic, partitions in topic_partitions.iteritems():
-                    kafka_consumer = consumer.SimpleConsumer(kafka_conn, consumer_group, topic)
-
-                    # Remember the topic partitions that we've seen so that we can
-                    # look up their broker offsets later
-                    topics[topic].update(set(partitions))
-
-                    for partition in partitions:
-                        try:
-                            consumer_offsets[(consumer_group, topic, partition)] = kafka_consumer.offsets[partition]
-                        except KeyError:
-                            kafka_consumer.stop()
-                            self.log.error('Error fetching consumer offset for {0} partition {1}'.format(topic, partition))
-                    kafka_consumer.stop()
-
-            # Query Kafka for the broker offsets, done in a separate loop so only one query is done
-            # per topic even if multiple consumer groups watch the same topic
-            broker_offsets = {}
-            for topic, partitions in topics.items():
-                offset_responses = []
-                for p in partitions:
-                    try:
-                        response = kafka_conn.send_offset_request([common.OffsetRequest(topic, p, -1, 1)])
-                        offset_responses.append(response[0])
-                    except common.KafkaError as e:
-                        self.log.error("Error fetching broker offset: {0}".format(e))
-
-                for resp in offset_responses:
-                    broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
-        finally:
-            try:
-                kafka_conn.close()
-            except Exception:
-                self.log.exception('Error cleaning up Kafka connection')
+        # Connect to Kafka and pull information
+        with KafkaConnection(kafka_host_ports) as kafka_conn:
+            consumer_offsets, broker_offsets = self._get_kafka_offsets(kafka_conn, consumer_groups)
 
-        # Report the broker data
+        # Report the broker data if full output
         if full_output:
             broker_dimensions = dimensions.copy()
-            for (topic, partition), broker_offset in broker_offsets.items():
-                broker_dimensions.update({'topic': topic,
-                                          'partition': partition})
-                self.gauge('kafka.broker_offset',
-                           broker_offset,
+            for (topic, partition), broker_offset in broker_offsets.iteritems():
+                broker_dimensions.update({'topic': topic, 'partition': partition})
+                broker_offset = broker_offsets.get((topic, partition))
+                self.gauge('kafka.broker_offset', broker_offset,
                            dimensions=self._set_dimensions(broker_dimensions, instance))
 
         # Report the consumer data
-        consumer_dimensions=dimensions.copy()
-        for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items():
-            # Get the broker offset
-            broker_offset = broker_offsets.get((topic, partition))
-
-            # Report the consumer offset and lag
-            consumer_dimensions.update({'topic': topic,
-                                        'partition': partition,
-                                        'consumer_group': consumer_group})
-            if full_output:
-                self.gauge('kafka.consumer_offset',
-                           consumer_offset,
-                           dimensions=self._set_dimensions(consumer_dimensions, instance))
-
-            self.gauge('kafka.consumer_lag',
-                       broker_offset - consumer_offset,
-                       dimensions=self._set_dimensions(consumer_dimensions, instance))
+        consumer_dimensions = dimensions.copy()
+        for (consumer_group, topic), offsets in consumer_offsets.iteritems():
+            if per_partition:
+                for partition, consumer_offset in offsets.iteritems():
+                    # Get the broker offset
+                    broker_offset = broker_offsets.get((topic, partition))
+
+                    # Report the consumer offset and lag
+                    consumer_dimensions.update({'topic': topic, 'partition': partition,
+                                                'consumer_group': consumer_group})
+                    if full_output:
+                        self.gauge('kafka.consumer_offset', consumer_offset,
+                                   dimensions=self._set_dimensions(consumer_dimensions, instance))
+                    self.gauge('kafka.consumer_lag', broker_offset - consumer_offset,
+                               dimensions=self._set_dimensions(consumer_dimensions, instance))
+            else:
+                consumer_dimensions.update({'topic': topic, 'consumer_group': consumer_group})
+                total_lag = 0
+                for partition, consumer_offset in offsets.iteritems():
+                    broker_offset = broker_offsets.get((topic, partition))
+                    total_lag += broker_offset - consumer_offset
+
+                self.gauge('kafka.consumer_lag', total_lag,
+                           dimensions=self._set_dimensions(consumer_dimensions, instance))
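
Taken together, the refactored flow is: open a connection that is guaranteed to be closed, fetch both offset maps in one pass, then report. A standalone sketch under stated assumptions ('check' is a KafkaCheck instance, the group map and connect string are invented; KafkaConnection and _get_kafka_offsets are the ones added above):

# Sketch only, not part of the commit.
def summed_lag_by_topic(check, consumer_groups, connect_str='localhost:19092'):
    # The context manager replaces the old try/finally: the client is closed
    # even if an offset query raises.
    with KafkaConnection(connect_str) as kafka_conn:
        consumer_offsets, broker_offsets = check._get_kafka_offsets(
            kafka_conn, consumer_groups)

    # consumer_offsets: {(consumer_group, topic): {partition: offset}}
    # broker_offsets:   {(topic, partition): offset}
    lag = {}
    for (group, topic), offsets in consumer_offsets.items():
        lag[(group, topic)] = sum(broker_offsets[(topic, p)] - offset
                                  for p, offset in offsets.items())
    return lag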

File 4 of 4: kafka consumer detection plugin (Python)

@@ -32,31 +32,28 @@ class Kafka(Plugin):
         self.available = True
 
     def _detect_consumers(self):
-        """ Using zookeeper and a kafka connection find the consumers, associated topics and partitions.
-        """
+        """Using zookeeper and a kafka connection find the consumers and associated topics. """
         try:
             # The kafka api provides no way to discover existing consumer groups so a query to
             # zookeeper must be made. This is unfortunately fragile as kafka is moving away from
             # zookeeper. Tested with kafka 0.8.1.1
             from kafka.client import KafkaClient
             kafka_connect_str = self._find_kafka_connection()
             kafka = KafkaClient(kafka_connect_str)
 
-            # {'consumer_group_name': { 'topic1': [ 0, 1, 2] # partitions }}
             consumers = {}
 
             # Find consumers and topics
             for consumer in self._ls_zookeeper('/consumers'):
-                consumers[consumer] = dict((topic, kafka.topic_partitions[topic])
-                                           for topic in self._ls_zookeeper('/consumers/%s/offsets' % consumer))
+                consumers[consumer] = dict((topic, [])
+                                           for topic in self._ls_zookeeper('/consumers/%s/offsets' % consumer))
 
             log.info("\tInstalling kafka_consumer plugin.")
             self.config['kafka_consumer'] = {'init_config': None,
                                              'instances': [{'name': kafka_connect_str,
                                                             'kafka_connect_str': kafka_connect_str,
-                                                            'full_output': True,
+                                                            'per_partition': False,
                                                             'consumer_groups': dict(consumers)}]}
         except Exception:
-            log.error('Error Detecting Kafka consumers/topics/partitions')
+            log.error('Error Detecting Kafka consumers/topics')
 
     def _find_kafka_connection(self):
         listen_ip = find_addr_listening_on_port(self.port)
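
For reference, the instance config the detection plugin now writes would look roughly like this for a single discovered consumer group (the group, topic, and address below are invented for illustration):

# Hypothetical self.config after _detect_consumers() finds one group under
# /consumers in zookeeper; all names and the address are assumptions.
config = {
    'kafka_consumer': {
        'init_config': None,
        'instances': [{
            'name': 'localhost:9092',
            'kafka_connect_str': 'localhost:9092',
            'per_partition': False,               # new default: summed per-topic lag
            'consumer_groups': {
                'my_consumer': {'my_topic': []},  # empty list -> all partitions
            },
        }],
    },
}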