ambari health check implementation

this change implements ambari alerts health checks.
ambari service checks seems to be harmful in case of unhealthy
clusters, so implementation of that will be postponed.

Partially-implements blueprint: cluster-verification

Change-Id: I4898d44e392944cdddbd14154738b8935a9b6da4
This commit is contained in:
Vitaly Gridnev 2016-02-15 16:10:15 +03:00
parent 941fac83a1
commit dccce05b86
4 changed files with 278 additions and 0 deletions

View File

@ -58,6 +58,12 @@ class AmbariClient(object):
kwargs.update(self._default_client_args)
return self._http_client.delete(*args, **kwargs)
def get_alerts_data(self, cluster):
url = self._base_url + "/clusters/%s/alerts?fields=*" % cluster.name
resp = self.get(url)
data = self.check_response(resp)
return data.get('items', [])
@staticmethod
def check_response(resp):
resp.raise_for_status()

View File

@ -0,0 +1,149 @@
# Copyright (c) 2016 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import functools
from oslo_log import log as logging
import six
from sahara.i18n import _
from sahara.plugins.ambari import client
from sahara.plugins.ambari import common as p_common
from sahara.plugins import utils as plugin_utils
from sahara.service.health import health_check_base
LOG = logging.getLogger(__name__)
class AlertsProvider(object):
def __init__(self, cluster):
self._data = None
self._cluster_services = None
self._exception_store = None
self.cluster = cluster
# calling to cache all data
self.get_alerts_data()
def get_cluster_services(self):
return self._cluster_services
def is_ambari_active(self):
if self._exception_store:
raise health_check_base.RedHealthError(self._exception_store)
return _("Ambari Monitor healthy")
def get_alerts_data(self, service=None):
if self._data is not None:
# return cached data
return self._data.get(service, []) if service else self._data
self._data = {}
self._cluster_services = []
try:
ambari = plugin_utils.get_instance(
self.cluster, p_common.AMBARI_SERVER)
password = self.cluster.extra.get("ambari_password")
with client.AmbariClient(ambari, password=password) as ambari:
resp = ambari.get_alerts_data(self.cluster)
for alert in resp:
alert = alert.get('Alert', {})
service = alert.get('service_name').lower()
if service not in self._data:
self._data[service] = []
self._cluster_services.append(service)
self._data[service].append(alert)
except Exception as e:
prefix = _("Can't get response from Ambari Monitor")
msg = _("%(problem)s: %(description)s") % {
'problem': prefix, 'description': six.text_type(e)}
# don't put in exception to logs, it will be done by log.exception
LOG.exception(prefix)
self._exception_store = msg
class AmbariHealthCheck(health_check_base.BasicHealthCheck):
def __init__(self, cluster, provider):
self.provider = provider
super(AmbariHealthCheck, self).__init__(cluster)
def get_health_check_name(self):
return "Ambari alerts health check"
def is_available(self):
return self.cluster.plugin_name == 'ambari'
def check_health(self):
return self.provider.is_ambari_active()
class AmbariServiceHealthCheck(health_check_base.BasicHealthCheck):
def __init__(self, cluster, provider, service):
self.provider = provider
self.service = service.lower()
super(AmbariServiceHealthCheck, self).__init__(cluster)
def get_health_check_name(self):
return "Ambari alerts for %s Service" % self.service
def is_available(self):
return self.cluster.plugin_name == 'ambari'
def get_important_services(self):
return [
p_common.HDFS_SERVICE.lower(),
p_common.YARN_SERVICE.lower(),
p_common.OOZIE_SERVICE.lower(),
p_common.ZOOKEEPER_SERVICE.lower()
]
def check_health(self):
imp_map = {'OK': 'GREEN', 'WARNING': 'YELLOW', 'CRITICAL': 'RED'}
other_map = {'OK': 'GREEN'}
color_counter = collections.Counter()
important_services = self.get_important_services()
for alert in self.provider.get_alerts_data(self.service):
alert_summary = alert.get('state', 'UNKNOWN')
if self.service in important_services:
target = imp_map.get(alert_summary, 'RED')
else:
target = other_map.get(alert_summary, 'YELLOW')
color_counter[target] += 1
if color_counter['RED'] > 0 and color_counter['YELLOW'] > 0:
raise health_check_base.RedHealthError(
_("Ambari Monitor has responded that cluster has "
"%(red)d critical and %(yellow)d warning alert(s)")
% {'red': color_counter['RED'],
'yellow': color_counter['YELLOW']})
elif color_counter['RED'] > 0:
raise health_check_base.RedHealthError(
_("Ambari Monitor has responded that cluster has "
"%(red)d critical alert(s)")
% {'red': color_counter['RED']})
elif color_counter['YELLOW'] > 0:
raise health_check_base.YellowHealthError(
_("Ambari Monitor has responded that cluster "
"has %d warning alert(s)")
% color_counter['YELLOW'])
return _("No alerts found")
def get_health_checks(cluster):
provider = AlertsProvider(cluster)
checks = [functools.partial(AmbariHealthCheck, provider=provider)]
for service in provider.get_cluster_services():
checks.append(functools.partial(
AmbariServiceHealthCheck, provider=provider, service=service))
return checks

View File

@ -21,6 +21,7 @@ from sahara.plugins.ambari import common as p_common
from sahara.plugins.ambari import configs
from sahara.plugins.ambari import deploy
from sahara.plugins.ambari import edp_engine
from sahara.plugins.ambari import health
from sahara.plugins.ambari import validation
from sahara.plugins import provisioning as p
from sahara.plugins import utils as plugin_utils
@ -227,3 +228,6 @@ class AmbariPluginProvider(p.ProvisioningPluginBase):
for service in node_group.node_processes:
ports.extend(ports_map.get(service, []))
return ports
def get_health_checks(self, cluster):
return health.get_health_checks(cluster)

View File

@ -0,0 +1,119 @@
# Copyright (c) 2016 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import six
import testtools
from sahara.plugins.ambari import health
from sahara.service.health import health_check_base
from sahara.tests.unit import base
class TestAmbariHealthCheck(base.SaharaTestCase):
def _standard_negative_test(self, mockclient, return_value, col, count):
mockclient.return_value = return_value
pr = health.AlertsProvider(mock.Mock())
service = return_value[0].get('Alert').get('service_name')
expected_exc = health_check_base.YellowHealthError
if col == 'RED':
expected_exc = health_check_base.RedHealthError
with testtools.ExpectedException(expected_exc):
try:
health.AmbariServiceHealthCheck(
mock.Mock(extra={}), pr, service).check_health()
except Exception as e:
self.assertEqual(
"Cluster health is %s. Reason: "
"Ambari Monitor has responded that cluster "
"has %s alert(s)" % (col, count), six.text_type(e))
raise
@mock.patch('sahara.plugins.ambari.client.AmbariClient.__init__')
@mock.patch('sahara.plugins.ambari.client.AmbariClient.close')
@mock.patch('sahara.plugins.ambari.client.AmbariClient.get_alerts_data')
@mock.patch('sahara.plugins.utils.get_instance')
def test_check_health(self, get_instance, alerts_response, close, init):
init.return_value = None
alerts_response.return_value = [
{
'Alert': {
'state': 'OK',
'service_name': 'ZOOKEEPER'
}
}
]
result = health.AmbariServiceHealthCheck(
mock.Mock(extra={}), health.AlertsProvider(mock.Mock()),
'ZOOKEEPER').check_health()
self.assertEqual('No alerts found', result)
self._standard_negative_test(alerts_response, [
{
'Alert': {
'state': 'WARNING',
'service_name': 'ZOOKEEPER'
}
}
], 'YELLOW', "1 warning")
self._standard_negative_test(alerts_response, [
{
'Alert': {
'state': 'CRITICAL',
'service_name': 'ZOOKEEPER'
}
}
], 'RED', "1 critical")
# not important service: only contribute as yellow
self._standard_negative_test(alerts_response, [
{
'Alert': {
'state': 'CRITICAL',
'service_name': 'Kafka'
}
}
], 'YELLOW', "1 warning")
self._standard_negative_test(alerts_response, [
{
'Alert': {
'state': 'CRITICAL',
'service_name': 'ZOOKEEPER'
},
},
{
'Alert': {
'state': 'WARNING',
'service_name': 'ZOOKEEPER'
}
}
], 'RED', "1 critical and 1 warning")
alerts_response.side_effect = [ValueError(
"OOUCH!")]
with testtools.ExpectedException(health_check_base.RedHealthError):
try:
health.AmbariHealthCheck(
mock.Mock(extra={}), health.AlertsProvider(mock.Mock())
).check_health()
except Exception as e:
self.assertEqual(
"Cluster health is RED. Reason: "
"Can't get response from Ambari Monitor: OOUCH!",
six.text_type(e))
raise