Fix detection and operation of Kafka plugin

Change-Id: I6d14ddcf3e103b8bc66d3113466c0685fcb92bdd
Tim Kuhlman 2014-07-28 14:11:46 -06:00
parent 43ac20853b
commit 1043c7e242
6 changed files with 201 additions and 198 deletions

View File

@@ -4,6 +4,8 @@ instances:
# - kafka_connect_str: localhost:19092
# zk_connect_str: localhost:2181
# zk_prefix: /0.8
# By default only the consumer lag is reported for each topic; setting full_output also reports the actual broker and consumer offsets
# full_output: False
# consumer_groups:
# my_consumer:
# my_topic: [0, 1, 4, 12]

View File

@@ -1,109 +1,31 @@
from collections import defaultdict
import sys
import random
if sys.version_info < (2, 6):
# Normally we'd write our checks to be compatible with >= python 2.4 but
# the dependencies of this check are not compatible with 2.4 and would
# be too much work to rewrite, so raise an exception here.
raise Exception('kafka_consumer check requires at least Python 2.6')
from collections import defaultdict
from monagent.collector.checks import AgentCheck
try:
from kafka.client import KafkaClient
from kafka.common import OffsetRequest
from kafka.consumer import SimpleConsumer
except ImportError:
raise Exception('Missing python dependency: kafka (https://github.com/mumrah/kafka-python)')
try:
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
except ImportError:
raise Exception('Missing python dependency: kazoo (https://github.com/python-zk/kazoo)')
from monagent.collector.checks import AgentCheck
class KafkaCheck(AgentCheck):
def check(self, instance):
consumer_groups = self.read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)
zk_connect_str = self.read_config(instance, 'zk_connect_str')
kafka_host_ports = self.read_config(instance, 'kafka_connect_str',
cast=self._parse_connect_str)
# Construct the Zookeeper path pattern
zk_prefix = instance.get('zk_prefix', '')
zk_path_tmpl = zk_prefix + '/consumers/%s/offsets/%s/%s'
# Connect to Zookeeper
zk_conn = KazooClient(zk_connect_str)
zk_conn.start()
try:
# Query Zookeeper for consumer offsets
consumer_offsets = {}
topics = defaultdict(set)
for consumer_group, topic_partitions in consumer_groups.iteritems():
for topic, partitions in topic_partitions.iteritems():
# Remember the topic partitions that we've seen so that we can
# look up their broker offsets later
topics[topic].update(set(partitions))
for partition in partitions:
zk_path = zk_path_tmpl % (consumer_group, topic, partition)
try:
consumer_offset = int(zk_conn.get(zk_path)[0])
key = (consumer_group, topic, partition)
consumer_offsets[key] = consumer_offset
except NoNodeError:
self.log.warn('No zookeeper node at %s' % zk_path)
except Exception:
self.log.exception('Could not read consumer offset from %s' % zk_path)
finally:
try:
zk_conn.stop()
zk_conn.close()
except Exception:
self.log.exception('Error cleaning up Zookeeper connection')
# Connect to Kafka
kafka_host, kafka_port = random.choice(kafka_host_ports)
kafka_conn = KafkaClient(kafka_host, kafka_port)
try:
# Query Kafka for the broker offsets
broker_offsets = {}
for topic, partitions in topics.items():
offset_responses = kafka_conn.send_offset_request([
OffsetRequest(topic, p, -1, 1) for p in partitions])
for resp in offset_responses:
broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
finally:
try:
kafka_conn.close()
except Exception:
self.log.exception('Error cleaning up Kafka connection')
# Report the broker data
for (topic, partition), broker_offset in broker_offsets.items():
broker_dimensions = {'topic': topic, 'partition': partition}
broker_offset = broker_offsets.get((topic, partition))
self.gauge('kafka.broker_offset', broker_offset, dimensions=broker_dimensions)
# Report the consumer
for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items():
# Get the broker offset
broker_offset = broker_offsets.get((topic, partition))
# Report the consumer offset and lag
dimensions = {'topic': topic, 'partition': partition, 'consumer_group': consumer_group}
self.gauge('kafka.consumer_offset', consumer_offset, dimensions=dimensions)
self.gauge('kafka.consumer_lag', broker_offset - consumer_offset,
dimensions=dimensions)
# Private config validation/marshalling functions
""" Checks the configured kafka instance reporting the consumption lag
for each partition per topic in each consumer group. If full_output
is set also reports broker offsets and the current consumer offset.
Works on Kafka version >= 0.8.1.1
"""
def _validate_consumer_groups(self, val):
""" Private config validation/marshalling functions
"""
try:
consumer_group, topic_partitions = val.items()[0]
assert isinstance(consumer_group, (str, unicode))
@@ -122,16 +44,58 @@ consumer_groups:
mytopic1: [10, 12]
''')
def _parse_connect_str(self, val):
def check(self, instance):
consumer_groups = self.read_config(instance, 'consumer_groups',
cast=self._validate_consumer_groups)
kafka_host_ports = self.read_config(instance, 'kafka_connect_str')
full_output = self.read_config(instance, 'full_output', cast=bool)
try:
host_port_strs = val.split(',')
host_ports = []
for hp in host_port_strs:
host, port = hp.strip().split(':')
host_ports.append((host, int(port)))
return host_ports
except Exception as e:
self.log.exception(e)
raise Exception(
'Could not parse %s. Must be in the form of `host0:port0,host1:port1,host2:port2`' %
val)
# Connect to Kafka
kafka_conn = KafkaClient(kafka_host_ports)
# Query Kafka for consumer offsets
consumer_offsets = {}
topics = defaultdict(set)
for consumer_group, topic_partitions in consumer_groups.iteritems():
for topic, partitions in topic_partitions.iteritems():
consumer = SimpleConsumer(kafka_conn, consumer_group, topic)
# Remember the topic partitions that we've seen so that we can
# look up their broker offsets later
topics[topic].update(set(partitions))
for partition in partitions:
consumer_offsets[(consumer_group, topic, partition)] = consumer.offsets[partition]
# Query Kafka for the broker offsets, done in a separate loop so only one query is done
# per topic even if multiple consumer groups watch the same topic
broker_offsets = {}
for topic, partitions in topics.items():
offset_responses = kafka_conn.send_offset_request([
OffsetRequest(topic, p, -1, 1) for p in partitions])
for resp in offset_responses:
broker_offsets[(resp.topic, resp.partition)] = resp.offsets[0]
finally:
try:
kafka_conn.close()
except Exception:
self.log.exception('Error cleaning up Kafka connection')
# Report the broker data
if full_output:
for (topic, partition), broker_offset in broker_offsets.items():
broker_dimensions = {'topic': topic, 'partition': partition}
broker_offset = broker_offsets.get((topic, partition))
self.gauge('kafka.broker_offset', broker_offset, dimensions=broker_dimensions)
# Report the consumer data
for (consumer_group, topic, partition), consumer_offset in consumer_offsets.items():
# Get the broker offset
broker_offset = broker_offsets.get((topic, partition))
# Report the consumer offset and lag
dimensions = {'topic': topic, 'partition': partition, 'consumer_group': consumer_group}
if full_output:
self.gauge('kafka.consumer_offset', consumer_offset, dimensions=dimensions)
self.gauge('kafka.consumer_lag', broker_offset - consumer_offset,
dimensions=dimensions)
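
For context, the lag computation the rewritten check performs reduces to the standalone sketch below. It only uses calls that already appear in this diff (KafkaClient, SimpleConsumer.offsets, send_offset_request); the connect string, consumer group, topic and partition list are illustrative:

from kafka.client import KafkaClient
from kafka.common import OffsetRequest
from kafka.consumer import SimpleConsumer

# Illustrative values; in the check these come from the instance configuration.
kafka_connect_str = 'localhost:9092'
consumer_group, topic, partitions = 'my_consumer', 'my_topic', [0, 1]

kafka_conn = KafkaClient(kafka_connect_str)
try:
    # Committed offsets for the consumer group, as tracked by SimpleConsumer.
    consumer = SimpleConsumer(kafka_conn, consumer_group, topic)
    consumer_offsets = dict((p, consumer.offsets[p]) for p in partitions)

    # Latest offsets on the broker for the same partitions.
    responses = kafka_conn.send_offset_request(
        [OffsetRequest(topic, p, -1, 1) for p in partitions])
    broker_offsets = dict((r.partition, r.offsets[0]) for r in responses)

    # Lag is the per-partition difference between the two.
    for p in partitions:
        print('partition %d lag: %d' % (p, broker_offsets[p] - consumer_offsets[p]))
finally:
    kafka_conn.close()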

View File

@@ -1,73 +0,0 @@
import collections
import logging
import monsetup.agent_config
import monsetup.detection
log = logging.getLogger(__name__)
class Kafka(monsetup.detection.Plugin):
"""Detect Kafka daemons and sets up configuration to monitor them.
This plugin configures the kafka_consumer plugin and does not configure any jmx based checks against kafka.
Note this plugin will pull the same information from kafka on each node in the cluster it runs on.
"""
def _detect(self):
"""Run detection, set self.available True if the service is detected.
"""
if monsetup.detection.find_process_cmdline('kafka') is not None:
self.available = True
def build_config(self):
"""Build the config as a Plugins object and return.
"""
config = monsetup.agent_config.Plugins()
# First watch the process
config.merge(monsetup.detection.watch_process(['kafka']))
log.info("\tWatching the kafka process.")
if self.dependencies_installed():
# todo this naively assumes zookeeper is also available on localhost
import kazoo
from kazoo.client import KazooClient
# kazoo fills up the console without this
logging.getLogger('kazoo').setLevel(logging.WARN)
zk = KazooClient(hosts='127.0.0.1:2181', read_only=True)
zk.start()
topics = {}
for topic in zk.get_children('/brokers/topics'):
topics[topic] = zk.get_children('/brokers/topics/%s/partitions' % topic)
# {'consumer_group_name': { 'topic1': [ 0, 1, 2] # partitions }}
consumers = collections.defaultdict(dict)
for consumer in zk.get_children('/consumers'):
try:
for topic in zk.get_children('/consumers/%s/offsets' % consumer):
if topic in topics:
consumers[consumer][topic] = topics[topic]
except kazoo.exceptions.NoNodeError:
continue
log.info("\tInstalling kafka_consumer plugin.")
config['kafka_consumer'] = {'init_config': None,
'instances': [{'kafka_connect_str': 'localhost:9092',
'zk_connect_str': 'localhost:2181',
'consumer_groups': dict(consumers)}]}
return config
def dependencies_installed(self):
try:
import kafka
import kazoo
except ImportError:
return False
return True

View File

@@ -0,0 +1,118 @@
import logging
import re
from subprocess import CalledProcessError, check_output
from monsetup.detection import Plugin, find_process_cmdline, watch_process
from monsetup.detection.utils import find_addr_listening_on_port
from monsetup import agent_config
log = logging.getLogger(__name__)
class Kafka(Plugin):
"""Detect Kafka daemons and sets up configuration to monitor them.
This plugin configures the kafka_consumer plugin and does not configure any jmx based checks against kafka.
Note this plugin will pull the same information from kafka on each node in the cluster it runs on.
For more information see:
- https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
-
"""
def __init__(self, template_dir, overwrite=True, alarms=None, port=9092):
Plugin.__init__(self, template_dir, overwrite, alarms)
self.port = port
self.zk_url = self._find_zookeeper_url()
self.config = agent_config.Plugins()
def _detect(self):
"""Run detection, set self.available True if the service is detected."""
if find_process_cmdline('kafka') is not None:
self.available = True
def _detect_consumers(self):
""" Using zookeeper and a kafka connection find the consumers, associated topics and partitions.
"""
try:
# The kafka api provides no way to discover existing consumer groups so a query to
# zookeeper must be made. This is unfortunately fragile as kafka is moving away from
# zookeeper. Tested with kafka 0.8.1.1
from kafka.client import KafkaClient
kafka_connect_str = self._find_kafka_connection()
kafka = KafkaClient(kafka_connect_str)
# {'consumer_group_name': { 'topic1': [ 0, 1, 2] # partitions }}
consumers = {}
# Find consumers and topics
for consumer in self._ls_zookeeper('/consumers'):
consumers[consumer] = {topic: kafka.topic_partitions[topic]
for topic in self._ls_zookeeper('/consumers/%s/offsets' % consumer)}
log.info("\tInstalling kafka_consumer plugin.")
self.config['kafka_consumer'] = {'init_config': None,
'instances': [{'kafka_connect_str': kafka_connect_str, 'full_output': False,
'consumer_groups': dict(consumers)}]}
except Exception:
log.exception('Error Detecting Kafka consumers/topics/partitions')
def _find_kafka_connection(self):
listen_ip = find_addr_listening_on_port(self.port)
if listen_ip is None:
log.error("\tKafka not found on port {:d}, using 'localhost'".format(self.port))
listen_ip = 'localhost'
else:
log.info("\tKafka found listening on {:s}:{:d}".format(listen_ip, self.port))
return "{:s}:{:d}".format(listen_ip, self.port)
@staticmethod
def _find_zookeeper_url():
""" Pull the zookeeper url the kafka config.
:return: Zookeeper url
"""
zk_connect = re.compile('zookeeper.connect=(.*)')
with open('/etc/kafka/server.properties') as settings:
match = zk_connect.search(settings.read())
if match is None:
log.error('No zookeeper url found in the kafka server properties.')
return None
return match.group(1).split(',')[0] # Only use the first zk url
def _ls_zookeeper(self, path):
""" Do a ls on the given zookeeper path.
I am using the local command line kafka rather than kazoo because it doesn't make sense to
have kazoo as a dependency only for detection.
"""
zk_shell = ['/opt/kafka/bin/zookeeper-shell.sh', self.zk_url, 'ls', path]
try:
output = check_output(zk_shell)
except CalledProcessError:
log.exception('Error running the zookeeper shell to list path %s' % path)
raise
# The last line is like '[item1, item2, item3]'
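# e.g. an output whose last line is '[group1, group2]' yields ['group1', 'group2']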
return [entry.strip() for entry in output.splitlines()[-1].strip('[]').split(',')]
def build_config(self):
"""Build the config as a Plugins object and return.
Config includes: consumer_groups (including topics) and kafka_connect_str
"""
# First watch the process
self.config.merge(watch_process(['kafka']))
log.info("\tWatching the kafka process.")
if self.dependencies_installed() and self.zk_url is not None:
self._detect_consumers()
else:
log.warning("Dependencies not installed, skipping plugin configuration.")
return self.config
def dependencies_installed(self):
try:
import kafka
except ImportError:
return False
return True
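
For reference, a successful detection run builds a kafka_consumer entry shaped like the sketch below; the broker address is illustrative, and the consumer group, topic and partitions mirror the sample config at the top of this change:

# Illustrative shape of the config built by the detection plugin (values are examples).
example_config = {
    'kafka_consumer': {
        'init_config': None,
        'instances': [{
            'kafka_connect_str': '10.0.0.5:9092',  # result of _find_kafka_connection(); address is an example
            'full_output': False,
            'consumer_groups': {'my_consumer': {'my_topic': [0, 1, 4, 12]}},
        }],
    },
}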

View File

@@ -7,7 +7,6 @@ from monsetup import agent_config
def find_process_cmdline(search_string):
"""Simple function to search running process for one with cmdline containing.
"""
for process in psutil.process_iter():
for arg in process.cmdline():
@@ -19,7 +18,6 @@ def find_process_cmdline(search_string):
def find_process_name(pname):
"""Simple function to search running process for one with pname.
"""
for process in psutil.process_iter():
if pname == process.name():
@@ -28,9 +26,15 @@ def find_process_name(pname):
return None
def find_addr_listening_on_port(port):
"""Return the IP address which is listening on the specified TCP port."""
for conn in psutil.net_connections(kind='tcp'):
if conn.laddr[1] == port and conn.status == psutil.CONN_LISTEN:
return conn.laddr[0].lstrip("::ffff:")
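# Example (illustrative): find_addr_listening_on_port(9092) returns the address bound to
# Kafka's default port, e.g. '10.0.0.5', or None when nothing is listening on that port.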
def watch_process(search_strings, service=None, component=None):
"""Takes a list of process search strings and returns a Plugins object with the config set.
This was built as a helper, as many plugins set up process watching.
"""
config = agent_config.Plugins()
@@ -48,7 +52,6 @@ def watch_process(search_strings, service=None, component=None):
def service_api_check(name, url, pattern, service=None, component=None):
"""Setup a service api to be watched by the http_check plugin.
"""
config = agent_config.Plugins()
parameters = {'name': name,

View File

@@ -9,30 +9,19 @@ import pwd
import socket
import subprocess
import sys
import yaml
import platform
import yaml
import agent_config
from detection.plugins import kafka
from detection.plugins import mon
from detection.plugins import mysql
from detection.plugins import rabbitmq
from detection.plugins import network
from detection.plugins import zookeeper
from detection.plugins import nova
from detection.plugins import glance
from detection.plugins import cinder
from detection.plugins import neutron
from detection.plugins import swift
from detection.plugins import keystone
from detection.plugins import ceilometer
from detection.plugins import kafka_consumer, mon, mysql, network, zookeeper
from detection.plugins import nova, glance, cinder, neutron, swift
from detection.plugins import keystone, ceilometer
from service import sysv
# List of all detection plugins to run
DETECTION_PLUGINS = [kafka.Kafka, mon.MonAPI, mon.MonPersister, mon.MonThresh, mysql.MySQL,
rabbitmq.RabbitMQ, network.Network, nova.Nova, cinder.Cinder, swift.Swift,
glance.Glance, ceilometer.Ceilometer, neutron.Neutron, keystone.Keystone,
zookeeper.Zookeeper]
DETECTION_PLUGINS = [kafka_consumer.Kafka, mon.MonAPI, mon.MonPersister, mon.MonThresh, mysql.MySQL,
network.Network, nova.Nova, cinder.Cinder, swift.Swift, glance.Glance,
ceilometer.Ceilometer, neutron.Neutron, keystone.Keystone, zookeeper.Zookeeper]
# Map OS to service type
OS_SERVICE_MAP = {'Linux': sysv.SysV}
@@ -134,7 +123,7 @@ def main(argv=None):
with open(config_path, 'w') as config_file:
os.chmod(config_path, 0o640)
os.chown(config_path, 0, gid)
config_file.write(yaml.dump(value))
config_file.write(yaml.safe_dump(value, encoding='utf-8', allow_unicode=True))
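# Note (not part of this change): yaml.safe_dump only serializes plain Python types;
# allow_unicode=True keeps non-ASCII values unescaped and encoding='utf-8' makes it
# return an encoded string suitable for writing directly to the file.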
# Now that the config is built, start the service
try: