Adding healthcheck

Healthcheck allows to verify if:
 - API is up and running
 - Kafka, that monasca-log-api sends data to, is up and running
 and an expected topic can be found there.

- added documentation entries

Change-Id: I316c1d9518cfed37119f11c326c071bfbfc7658e
This commit is contained in:
Tomasz Trębski 2015-11-25 11:04:29 +01:00
parent cb54d3e496
commit 412892aed2
14 changed files with 527 additions and 4 deletions

View File

@ -0,0 +1,21 @@
monasca_log_api.healthcheck package
monasca_log_api.healthcheck.kafka_check module
.. automodule:: monasca_log_api.healthcheck.kafka_check
monasca_log_api.healthcheck.keystone_protocol module
.. automodule:: monasca_log_api.healthcheck.keystone_protocol

View File

@ -9,6 +9,7 @@ Subpackages

View File

@ -9,6 +9,7 @@ debug=True
logs = monasca_log_api.v2.reference.logs:Logs
versions = monasca_log_api.v2.reference.versions:Versions
healthchecks = monasca_log_api.v2.reference.healthchecks:HealthChecks
max_log_size = 1048576
@ -29,7 +30,11 @@ certfile =
keyfile =
insecure = false
kafka_url = localhost:8900
kafka_topics = log
path = /v2.0/log
default_roles = monasca-user
agent_roles = monasca-log-agent
agent_roles = monasca-log-agent

View File

@ -8,7 +8,7 @@ pipeline = auth roles api
paste.app_factory = monasca_log_api.server:launch
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
paste.filter_factory = monasca_log_api.healthcheck.keystone_protocol:filter_factory
paste.filter_factory = monasca_log_api.middleware.role_middleware:RoleMiddleware.factory

View File

@ -0,0 +1,60 @@
# Copyright 2016 FUJITSU LIMITED
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import falcon
from oslo_log import log
LOG = log.getLogger(__name__)
HealthCheckResult = collections.namedtuple('HealthCheckResult',
['status', 'details'])
# TODO(feature) monasca-common candidate
class HealthChecksApi(object):
"""HealthChecks Api
HealthChecksApi server information regarding health of the API.
def __init__(self):
super(HealthChecksApi, self).__init__()'Initializing HealthChecksApi!')
def on_get(self, req, res):
"""Complex healthcheck report on GET.
Returns complex report regarding API well being
and all dependent services.
:param falcon.Request req: current request
:param falcon.Response res: current response
res.status = falcon.HTTP_501
def on_head(self, req, res):
"""Simple healthcheck report on HEAD.
In opposite to :py:meth:`.HealthChecksApi.on_get`, this
method is supposed to execute ASAP to inform user that
API is up and running.
:param falcon.Request req: current request
:param falcon.Response res: current response
res.status = falcon.HTTP_501

View File

@ -0,0 +1 @@
"""Base package for monasca-log-api healthcheck"""

View File

@ -0,0 +1,109 @@
# Copyright 2015 FUJITSU LIMITED
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import kafka.client as client
from oslo_config import cfg
from oslo_log import log
LOG = log.getLogger(__name__)
kafka_check_opts = [
help='Url to kafka server'),
help='Verify existence of configured topics')
kafka_check_group = cfg.OptGroup(name='kafka_healthcheck',
cfg.CONF.register_opts(kafka_check_opts, kafka_check_group)
CheckResult = collections.namedtuple('CheckResult', ['healthy', 'message'])
"""Result from the healthcheck, contains healthy(boolean) and message"""
# TODO(feature) monasca-common candidate
class KafkaHealthCheck(object):
"""Evaluates kafka health
Healthcheck verifies if:
* kafka server is up and running
* there is a configured topic in kafka
If following conditions are met healthcheck returns healthy status.
Otherwise unhealthy status is returned with explanation.
Example of middleware configuration:
.. code-block:: ini
kafka_url = localhost:8900
kafka_topics = log
It is possible to specify multiple topics if necessary.
Just separate them with ,
def healthcheck(self):
url = CONF.kafka_healthcheck.kafka_url
kafka_client = client.KafkaClient(hosts=url)
except client.KafkaUnavailableError as ex:
error_str = 'Could not connect to kafka at %s' % url
return CheckResult(healthy=False, message=error_str)
result = self._verify_topics(kafka_client)
return result
# noinspection PyMethodMayBeStatic
def _verify_topics(self, kafka_client):
topics = CONF.kafka_healthcheck.kafka_topics
for t in topics:
# kafka client loads metadata for topics as fast
# as possible (happens in __init__), therefore this
# topic_partitions is sure to be filled
for_topic = t in kafka_client.topic_partitions
if not for_topic:
error_str = 'Kafka: Topic %s not found' % t
return CheckResult(healthy=False, message=error_str)
return CheckResult(healthy=True, message='OK')
# noinspection PyMethodMayBeStatic
def _disconnect_gracefully(self, kafka_client):
# at this point, client is connected so it must be closed
# regardless of topic existence
except Exception as ex:
# log that something went wrong and move on

View File

@ -0,0 +1,60 @@
# Copyright 2015 FUJITSU LIMITED
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystonemiddleware import auth_token
from oslo_log import log
LOG = log.getLogger(__name__)
class SkippingAuthProtocol(auth_token.AuthProtocol):
"""SkippingAuthProtocol to reach healthcheck endpoint
Because healthcheck endpoints exists as endpoint, it
is hidden behind keystone filter thus a request
needs to authenticated before it is reached.
SkippingAuthProtocol is lean customization
of :py:class:`keystonemiddleware.auth_token.AuthProtocol`
that disables keystone communication if request
is meant to reach healthcheck
def process_request(self, request):
path = request.path
if path == '/healthcheck':
LOG.debug(('Request path is %s and it does not require keystone '
'communication'), path)
return None # return NONE to reach actual logic
return super(SkippingAuthProtocol, self).process_request(request)
def filter_factory(global_conf, **local_conf): # pragma: no cover
"""Return factory function for :py:class:`.SkippingAuthProtocol`
:param global_conf: global configuration
:param local_conf: local configuration
:return: factory function
:rtype: function
conf = global_conf.copy()
def auth_filter(app):
return SkippingAuthProtocol(app, conf)
return auth_filter

View File

@ -28,10 +28,16 @@ CONF = cfg.CONF
dispatcher_opts = [
help='Versions endpoint'),
help='Logs endpoint'),
help='Healthchecks endpoint')
dispatcher_group = cfg.OptGroup(name='dispatcher', title='dispatcher')
@ -53,12 +59,18 @@ def launch(conf, config_file='/etc/monasca/log-api-config.conf'):
LOG.debug('Dispatcher drivers have been added to the routes!')
return app
def load_healthcheck_resource(app):
healthchecks = simport.load(CONF.dispatcher.healthchecks)()
app.add_route('/healthcheck', healthchecks)
def load_logs_resource(app):
logs = simport.load(CONF.dispatcher.logs)()
app.add_route('/v2.0/log/single', logs)

View File

@ -0,0 +1,76 @@
# Copyright 2015 FUJITSU LIMITED
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from falcon import testing
import mock
import simplejson as json
from monasca_log_api.healthcheck import kafka_check as healthcheck
from monasca_log_api.tests import base
from monasca_log_api.v2.reference import healthchecks
ENDPOINT = '/healthcheck'
class TestHealthChecks(testing.TestBase):
def before(self):
self.conf = base.mock_config(self)
self.resource = healthchecks.HealthChecks()
def test_should_return_200_for_head(self):
self.simulate_request(ENDPOINT, method='HEAD')
self.assertEqual(falcon.HTTP_NO_CONTENT, self.srmock.status)
def test_should_report_healthy_if_kafka_healthy(self, kafka_check):
kafka_check.healthcheck.return_value = healthcheck.CheckResult(True,
self.resource._kafka_check = kafka_check
ret = self.simulate_request(ENDPOINT,
'Content-Type': 'application/json'
self.assertEqual(falcon.HTTP_OK, self.srmock.status)
ret = json.loads(ret)
self.assertIn('kafka', ret)
self.assertEqual('OK', ret.get('kafka'))
def test_should_report_unhealthy_if_kafka_healthy(self, kafka_check):
url = 'localhost:8200'
err_str = 'Could not connect to kafka at %s' % url
kafka_check.healthcheck.return_value = healthcheck.CheckResult(False,
self.resource._kafka_check = kafka_check
ret = self.simulate_request(ENDPOINT,
'Content-Type': 'application/json'
self.assertEqual(falcon.HTTP_SERVICE_UNAVAILABLE, self.srmock.status)
ret = json.loads(ret)
self.assertIn('kafka', ret)
self.assertEqual(err_str, ret.get('kafka'))

View File

@ -0,0 +1,75 @@
# Copyright 2015 FUJITSU LIMITED
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from falcon import testing
import kafka.client as client
import mock
from monasca_log_api.healthcheck import kafka_check as kc
from monasca_log_api.tests import base
class KafkaCheckLogicTest(testing.TestBase):
mock_kafka_url = 'localhost:1234'
mocked_topics = ['test_1', 'test_2']
mock_config = {
'kafka_url': mock_kafka_url,
'kafka_topics': mocked_topics
def __init__(self, *args, **kwargs):
super(KafkaCheckLogicTest, self).__init__(*args, **kwargs)
self._conf = None
def setUp(self):
super(KafkaCheckLogicTest, self).setUp()
self._conf = base.mock_config(self)
self._conf.config(group='kafka_healthcheck', **self.mock_config)
def test_should_fail_kafka_unavailable(self, kafka_client):
kafka_client.side_effect = client.KafkaUnavailableError()
kafka_health = kc.KafkaHealthCheck()
result = kafka_health.healthcheck()
def test_should_fail_topic_missing(self, kafka_client):
kafka = mock.Mock()
kafka.topic_partitions = [self.mocked_topics[0]]
kafka_client.return_value = kafka
kafka_health = kc.KafkaHealthCheck()
result = kafka_health.healthcheck()
# verify result
# ensure client was closed
def test_should_pass(self, kafka_client):
kafka = mock.Mock()
kafka.topic_partitions = self.mocked_topics
kafka_client.return_value = kafka
kafka_health = kc.KafkaHealthCheck()
result = kafka_health.healthcheck()
# ensure client was closed

View File

@ -0,0 +1,43 @@
# Copyright 2015 FUJITSU LIMITED
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
import mock
from monasca_log_api.healthcheck import keystone_protocol
_APP = mock.Mock()
_CONF = {}
class TestKeystoneProtocol(unittest.TestCase):
def test_should_return_none_if_healthcheck(self):
instance = keystone_protocol.SkippingAuthProtocol(_APP, _CONF)
request = mock.Mock()
request.path = '/healthcheck'
ret_val = instance.process_request(request)
def test_should_enter_keystone_auth_if_not_healthcheck(self, proc_request):
instance = keystone_protocol.SkippingAuthProtocol(_APP, _CONF)
request = mock.Mock()
request.path = '/v2.0/logs/single'

View File

@ -0,0 +1,59 @@
# Copyright 2015 FUJITSU LIMITED
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from import utils as rest_utils
from monasca_log_api.api import healthcheck_api
from monasca_log_api.healthcheck import kafka_check
class HealthChecks(healthcheck_api.HealthChecksApi):
# response configuration
CACHE_CONTROL = ['must-revalidate', 'no-cache', 'no-store']
# response codes
def __init__(self):
self._kafka_check = kafka_check.KafkaHealthCheck()
super(HealthChecks, self).__init__()
def on_head(self, req, res):
res.status = self.HEALTHY_CODE_HEAD
res.cache_control = self.CACHE_CONTROL
def on_get(self, req, res):
# at this point we know API is alive, so
# keep up good work and verify kafka status
kafka_result = self._kafka_check.healthcheck()
# in case it'd be unhealthy,
# message will contain error string
status_data = {
'kafka': kafka_result.message
# Really simple approach, ideally that should be
# part of monasca-common with some sort of registration of
# healthchecks concept
res.status = (self.HEALTHY_CODE_GET
if kafka_result.healthy else self.NOT_HEALTHY_CODE)
res.cache_control = self.CACHE_CONTROL
res.body = rest_utils.as_json(status_data)

View File

@ -1,4 +1,5 @@
# TODO(trebskit) Add pypy to envlist ?
envlist = py27,py3,pep8
skipsdist = True