From dccce05b8685b340e511a5a5942cc6af0334d02c Mon Sep 17 00:00:00 2001 From: Vitaly Gridnev Date: Mon, 15 Feb 2016 16:10:15 +0300 Subject: [PATCH] ambari health check implementation this change implements ambari alerts health checks. ambari service checks seems to be harmful in case of unhealthy clusters, so implementation of that will be postponed. Partially-implements blueprint: cluster-verification Change-Id: I4898d44e392944cdddbd14154738b8935a9b6da4 --- sahara/plugins/ambari/client.py | 6 + sahara/plugins/ambari/health.py | 149 ++++++++++++++++++ sahara/plugins/ambari/plugin.py | 4 + .../tests/unit/plugins/ambari/test_health.py | 119 ++++++++++++++ 4 files changed, 278 insertions(+) create mode 100644 sahara/plugins/ambari/health.py create mode 100644 sahara/tests/unit/plugins/ambari/test_health.py diff --git a/sahara/plugins/ambari/client.py b/sahara/plugins/ambari/client.py index 436b4d65..ec787e14 100644 --- a/sahara/plugins/ambari/client.py +++ b/sahara/plugins/ambari/client.py @@ -58,6 +58,12 @@ class AmbariClient(object): kwargs.update(self._default_client_args) return self._http_client.delete(*args, **kwargs) + def get_alerts_data(self, cluster): + url = self._base_url + "/clusters/%s/alerts?fields=*" % cluster.name + resp = self.get(url) + data = self.check_response(resp) + return data.get('items', []) + @staticmethod def check_response(resp): resp.raise_for_status() diff --git a/sahara/plugins/ambari/health.py b/sahara/plugins/ambari/health.py new file mode 100644 index 00000000..7ecc5448 --- /dev/null +++ b/sahara/plugins/ambari/health.py @@ -0,0 +1,149 @@ +# Copyright (c) 2016 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import functools + +from oslo_log import log as logging +import six + +from sahara.i18n import _ +from sahara.plugins.ambari import client +from sahara.plugins.ambari import common as p_common +from sahara.plugins import utils as plugin_utils +from sahara.service.health import health_check_base + + +LOG = logging.getLogger(__name__) + + +class AlertsProvider(object): + def __init__(self, cluster): + self._data = None + self._cluster_services = None + self._exception_store = None + self.cluster = cluster + # calling to cache all data + self.get_alerts_data() + + def get_cluster_services(self): + return self._cluster_services + + def is_ambari_active(self): + if self._exception_store: + raise health_check_base.RedHealthError(self._exception_store) + return _("Ambari Monitor healthy") + + def get_alerts_data(self, service=None): + if self._data is not None: + # return cached data + return self._data.get(service, []) if service else self._data + self._data = {} + self._cluster_services = [] + try: + ambari = plugin_utils.get_instance( + self.cluster, p_common.AMBARI_SERVER) + password = self.cluster.extra.get("ambari_password") + with client.AmbariClient(ambari, password=password) as ambari: + resp = ambari.get_alerts_data(self.cluster) + for alert in resp: + alert = alert.get('Alert', {}) + service = alert.get('service_name').lower() + if service not in self._data: + self._data[service] = [] + self._cluster_services.append(service) + self._data[service].append(alert) + except Exception as e: + prefix = _("Can't get response from Ambari Monitor") + msg = _("%(problem)s: %(description)s") % { + 'problem': prefix, 'description': six.text_type(e)} + # don't put in exception to logs, it will be done by log.exception + LOG.exception(prefix) + self._exception_store = msg + + +class AmbariHealthCheck(health_check_base.BasicHealthCheck): + def __init__(self, cluster, provider): + self.provider = provider + super(AmbariHealthCheck, self).__init__(cluster) + + def get_health_check_name(self): + return "Ambari alerts health check" + + def is_available(self): + return self.cluster.plugin_name == 'ambari' + + def check_health(self): + return self.provider.is_ambari_active() + + +class AmbariServiceHealthCheck(health_check_base.BasicHealthCheck): + def __init__(self, cluster, provider, service): + self.provider = provider + self.service = service.lower() + super(AmbariServiceHealthCheck, self).__init__(cluster) + + def get_health_check_name(self): + return "Ambari alerts for %s Service" % self.service + + def is_available(self): + return self.cluster.plugin_name == 'ambari' + + def get_important_services(self): + return [ + p_common.HDFS_SERVICE.lower(), + p_common.YARN_SERVICE.lower(), + p_common.OOZIE_SERVICE.lower(), + p_common.ZOOKEEPER_SERVICE.lower() + ] + + def check_health(self): + imp_map = {'OK': 'GREEN', 'WARNING': 'YELLOW', 'CRITICAL': 'RED'} + other_map = {'OK': 'GREEN'} + color_counter = collections.Counter() + important_services = self.get_important_services() + for alert in self.provider.get_alerts_data(self.service): + alert_summary = alert.get('state', 'UNKNOWN') + if self.service in important_services: + target = imp_map.get(alert_summary, 'RED') + else: + target = other_map.get(alert_summary, 'YELLOW') + color_counter[target] += 1 + if color_counter['RED'] > 0 and color_counter['YELLOW'] > 0: + raise health_check_base.RedHealthError( + _("Ambari Monitor has responded that cluster has " + "%(red)d critical and %(yellow)d warning alert(s)") + % {'red': color_counter['RED'], + 'yellow': color_counter['YELLOW']}) + elif color_counter['RED'] > 0: + raise health_check_base.RedHealthError( + _("Ambari Monitor has responded that cluster has " + "%(red)d critical alert(s)") + % {'red': color_counter['RED']}) + elif color_counter['YELLOW'] > 0: + raise health_check_base.YellowHealthError( + _("Ambari Monitor has responded that cluster " + "has %d warning alert(s)") + % color_counter['YELLOW']) + return _("No alerts found") + + +def get_health_checks(cluster): + provider = AlertsProvider(cluster) + checks = [functools.partial(AmbariHealthCheck, provider=provider)] + for service in provider.get_cluster_services(): + checks.append(functools.partial( + AmbariServiceHealthCheck, provider=provider, service=service)) + return checks diff --git a/sahara/plugins/ambari/plugin.py b/sahara/plugins/ambari/plugin.py index 764a82e8..7e1a51c3 100644 --- a/sahara/plugins/ambari/plugin.py +++ b/sahara/plugins/ambari/plugin.py @@ -21,6 +21,7 @@ from sahara.plugins.ambari import common as p_common from sahara.plugins.ambari import configs from sahara.plugins.ambari import deploy from sahara.plugins.ambari import edp_engine +from sahara.plugins.ambari import health from sahara.plugins.ambari import validation from sahara.plugins import provisioning as p from sahara.plugins import utils as plugin_utils @@ -227,3 +228,6 @@ class AmbariPluginProvider(p.ProvisioningPluginBase): for service in node_group.node_processes: ports.extend(ports_map.get(service, [])) return ports + + def get_health_checks(self, cluster): + return health.get_health_checks(cluster) diff --git a/sahara/tests/unit/plugins/ambari/test_health.py b/sahara/tests/unit/plugins/ambari/test_health.py new file mode 100644 index 00000000..557f81b0 --- /dev/null +++ b/sahara/tests/unit/plugins/ambari/test_health.py @@ -0,0 +1,119 @@ +# Copyright (c) 2016 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import six +import testtools + +from sahara.plugins.ambari import health +from sahara.service.health import health_check_base +from sahara.tests.unit import base + + +class TestAmbariHealthCheck(base.SaharaTestCase): + def _standard_negative_test(self, mockclient, return_value, col, count): + mockclient.return_value = return_value + pr = health.AlertsProvider(mock.Mock()) + service = return_value[0].get('Alert').get('service_name') + expected_exc = health_check_base.YellowHealthError + if col == 'RED': + expected_exc = health_check_base.RedHealthError + with testtools.ExpectedException(expected_exc): + try: + health.AmbariServiceHealthCheck( + mock.Mock(extra={}), pr, service).check_health() + except Exception as e: + self.assertEqual( + "Cluster health is %s. Reason: " + "Ambari Monitor has responded that cluster " + "has %s alert(s)" % (col, count), six.text_type(e)) + raise + + @mock.patch('sahara.plugins.ambari.client.AmbariClient.__init__') + @mock.patch('sahara.plugins.ambari.client.AmbariClient.close') + @mock.patch('sahara.plugins.ambari.client.AmbariClient.get_alerts_data') + @mock.patch('sahara.plugins.utils.get_instance') + def test_check_health(self, get_instance, alerts_response, close, init): + init.return_value = None + alerts_response.return_value = [ + { + 'Alert': { + 'state': 'OK', + 'service_name': 'ZOOKEEPER' + } + } + ] + result = health.AmbariServiceHealthCheck( + mock.Mock(extra={}), health.AlertsProvider(mock.Mock()), + 'ZOOKEEPER').check_health() + + self.assertEqual('No alerts found', result) + + self._standard_negative_test(alerts_response, [ + { + 'Alert': { + 'state': 'WARNING', + 'service_name': 'ZOOKEEPER' + } + } + ], 'YELLOW', "1 warning") + + self._standard_negative_test(alerts_response, [ + { + 'Alert': { + 'state': 'CRITICAL', + 'service_name': 'ZOOKEEPER' + } + } + ], 'RED', "1 critical") + + # not important service: only contribute as yellow + self._standard_negative_test(alerts_response, [ + { + 'Alert': { + 'state': 'CRITICAL', + 'service_name': 'Kafka' + } + } + ], 'YELLOW', "1 warning") + + self._standard_negative_test(alerts_response, [ + { + 'Alert': { + 'state': 'CRITICAL', + 'service_name': 'ZOOKEEPER' + }, + }, + { + 'Alert': { + 'state': 'WARNING', + 'service_name': 'ZOOKEEPER' + } + } + ], 'RED', "1 critical and 1 warning") + + alerts_response.side_effect = [ValueError( + "OOUCH!")] + with testtools.ExpectedException(health_check_base.RedHealthError): + try: + health.AmbariHealthCheck( + mock.Mock(extra={}), health.AlertsProvider(mock.Mock()) + ).check_health() + except Exception as e: + self.assertEqual( + "Cluster health is RED. Reason: " + "Can't get response from Ambari Monitor: OOUCH!", + six.text_type(e)) + raise