Merge "Kafka healthcheck"
This commit is contained in:
commit
f3cbb73f70
0
monasca_common/healthcheck/__init__.py
Normal file
0
monasca_common/healthcheck/__init__.py
Normal file
0
monasca_common/healthcheck/checks/__init__.py
Normal file
0
monasca_common/healthcheck/checks/__init__.py
Normal file
103
monasca_common/healthcheck/checks/kafka.py
Normal file
103
monasca_common/healthcheck/checks/kafka.py
Normal file
@ -0,0 +1,103 @@
|
||||
# Copyright 2016 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kafka import client
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from monasca_common.healthcheck import result
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
kafka_check_opts = [
|
||||
cfg.StrOpt('kafka_url',
|
||||
required=True,
|
||||
help='Url to kafka server'),
|
||||
cfg.ListOpt('kafka_topics',
|
||||
required=True,
|
||||
default=['logs'],
|
||||
help='Verify existence of configured topics')
|
||||
]
|
||||
kafka_check_group = cfg.OptGroup(name='kafka_healthcheck',
|
||||
title='kafka_healthcheck')
|
||||
|
||||
cfg.CONF.register_group(kafka_check_group)
|
||||
cfg.CONF.register_opts(kafka_check_opts, kafka_check_group)
|
||||
|
||||
|
||||
class KafkaHealthCheck(object):
|
||||
"""Evaluates kafka health
|
||||
|
||||
Healthcheck verifies if:
|
||||
|
||||
* kafka server is up and running
|
||||
* there is a configured topic in kafka
|
||||
|
||||
If following conditions are met healthcheck returns healthy status.
|
||||
Otherwise unhealthy status is returned with explanation.
|
||||
|
||||
Example of middleware configuration:
|
||||
|
||||
.. code-block:: ini
|
||||
|
||||
[kafka_healthcheck]
|
||||
kafka_url = localhost:8900
|
||||
kafka_topics = log
|
||||
|
||||
Note:
|
||||
It is possible to specify multiple topics if necessary.
|
||||
Just separate them with ,
|
||||
|
||||
"""
|
||||
|
||||
def healthcheck(self):
|
||||
url = CONF.kafka_healthcheck.kafka_url
|
||||
|
||||
try:
|
||||
kafka_client = client.KafkaClient(hosts=url)
|
||||
except client.KafkaUnavailableError as ex:
|
||||
LOG.error(repr(ex))
|
||||
error_str = 'Could not connect to kafka at %s' % url
|
||||
return result.HealthCheckResult(healthy=False, message=error_str)
|
||||
|
||||
self._disconnect_gracefully(kafka_client)
|
||||
|
||||
return self._verify_topics(kafka_client)
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
def _verify_topics(self, kafka_client):
|
||||
topics = CONF.kafka_healthcheck.kafka_topics
|
||||
|
||||
for t in topics:
|
||||
# kafka client loads metadata for topics as fast
|
||||
# as possible (happens in __init__), therefore this
|
||||
# topic_partitions is sure to be filled
|
||||
for_topic = t in kafka_client.topic_partitions
|
||||
if not for_topic:
|
||||
error_str = 'Kafka: Topic %s not found' % t
|
||||
LOG.error(error_str)
|
||||
return result.HealthCheckResult(healthy=False, message=error_str)
|
||||
|
||||
return result.HealthCheckResult(healthy=True, message='OK')
|
||||
|
||||
# noinspection PyMethodMayBeStatic
|
||||
def _disconnect_gracefully(self, kafka_client):
|
||||
# at this point, client is connected so it must be closed
|
||||
# regardless of topic existence
|
||||
try:
|
||||
kafka_client.close()
|
||||
except Exception as ex:
|
||||
# log that something went wrong and move on
|
||||
LOG.error(repr(ex))
|
19
monasca_common/healthcheck/result.py
Normal file
19
monasca_common/healthcheck/result.py
Normal file
@ -0,0 +1,19 @@
|
||||
# Copyright 2016 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
|
||||
HealthCheckResult = collections.namedtuple('CheckResult',
|
||||
['healthy', 'message'])
|
||||
"""Result from the healthcheck, contains healthy(boolean) and message"""
|
Loading…
Reference in New Issue
Block a user