split cloudera health checks
This change splits the single Cloudera health check into several health checks, one for each service. Partially-implements blueprint: cluster-verification Change-Id: I39012379d075b05aaec2764642af0befcec3db67
This commit is contained in:
parent
941fac83a1
commit
18cb9d2c77
@ -20,6 +20,7 @@ import six
|
||||
from sahara import conductor
|
||||
from sahara import context
|
||||
from sahara.plugins.cdh import db_helper as dh
|
||||
from sahara.plugins.cdh import health
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
@ -149,5 +150,5 @@ class BaseVersionHandler(AbstractVersionHandler):
|
||||
self.plugin_utils.recommend_configs(
|
||||
cluster, self.get_plugin_configs(), scaling)
|
||||
|
||||
def get_health_checks(self):
|
||||
return []
|
||||
def get_health_checks(self, cluster):
|
||||
return health.get_health_checks(cluster, self.cloudera_utils)
|
||||
|
@ -13,62 +13,135 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import abc
|
||||
import functools
|
||||
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from sahara.i18n import _
|
||||
from sahara.service.health import health_check_base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class CDHHealthCheck(health_check_base.BasicHealthCheck):
|
||||
def get_health_check_name(self):
|
||||
return _("CDH cluster health check")
|
||||
|
||||
def is_available(self):
|
||||
return (self.cluster.plugin_name == 'cdh' and
|
||||
self.cluster.hadoop_version >= '5.4.0')
|
||||
class HealthStatusProvider(object):
|
||||
def __init__(self, cluster, cloudera_tools):
|
||||
self.cluster = cluster
|
||||
self.cloudera_tools = cloudera_tools
|
||||
self._data = None
|
||||
self._cluster_services = None
|
||||
self._exception_store = None
|
||||
self.get_health_status()
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_cloudera_tools(self):
|
||||
# should return correct ClouderaUtils class instance
|
||||
pass
|
||||
def get_cluster_services(self):
|
||||
return self._cluster_services
|
||||
|
||||
def is_cloudera_active(self):
|
||||
if self._exception_store:
|
||||
raise health_check_base.RedHealthError(self._exception_store)
|
||||
return _("Cloudera Manager is Active")
|
||||
|
||||
def get_cloudera_health(self):
|
||||
cu = self.cloudera_tools
|
||||
api = cu.get_api_client(self.cluster)
|
||||
return api.get_service_health_status(self.cluster.name)
|
||||
|
||||
def get_important_services(self):
|
||||
# will be overridable in future
|
||||
cu = self.get_cloudera_tools()
|
||||
cu = self.cloudera_tools
|
||||
return [
|
||||
cu.HDFS_SERVICE_NAME,
|
||||
cu.YARN_SERVICE_NAME,
|
||||
cu.OOZIE_SERVICE_NAME
|
||||
]
|
||||
|
||||
def get_cloudera_health(self):
|
||||
cu = self.get_cloudera_tools()
|
||||
api = cu.get_api_client(self.cluster)
|
||||
return api.get_service_health_status(self.cluster.name)
|
||||
def get_health_status(self, service=None):
|
||||
if self._data is not None:
|
||||
return self._data.get(service, []) if service else self._data
|
||||
self._data = {}
|
||||
self._cluster_services = []
|
||||
try:
|
||||
# all data already grouped by services
|
||||
self._data = self.get_cloudera_health()
|
||||
self._cluster_services = self._data.keys()
|
||||
except Exception as e:
|
||||
msg = _("Can't get response from Cloudera "
|
||||
"Manager")
|
||||
LOG.exception(msg)
|
||||
self._exception_store = _(
|
||||
"%(problem)s, reason: %(reason)s") % {
|
||||
'problem': msg, 'reason': six.text_type(e)}
|
||||
|
||||
|
||||
class ClouderaManagerHealthCheck(health_check_base.BasicHealthCheck):
|
||||
def __init__(self, cluster, provider):
|
||||
self.provider = provider
|
||||
super(ClouderaManagerHealthCheck, self).__init__(cluster)
|
||||
|
||||
def get_health_check_name(self):
|
||||
return _("Cloudera Manager health check")
|
||||
|
||||
def is_available(self):
|
||||
return (self.cluster.plugin_name == 'cdh' and
|
||||
self.cluster.hadoop_version >= '5.4.0')
|
||||
|
||||
def check_health(self):
|
||||
important_services = self.get_important_services()
|
||||
observed_data = self.get_cloudera_health()
|
||||
states = {'green': [], 'yellow': [], 'red': []}
|
||||
return self.provider.is_cloudera_active()
|
||||
|
||||
|
||||
class ServiceHealthCheck(health_check_base.BasicHealthCheck):
|
||||
def __init__(self, cluster, provider, service):
|
||||
self.provider = provider
|
||||
self.service = service
|
||||
super(ServiceHealthCheck, self).__init__(cluster)
|
||||
|
||||
def get_health_check_name(self):
|
||||
return _("CDH %s health check") % self.service
|
||||
|
||||
def is_available(self):
|
||||
return (self.cluster.plugin_name == 'cdh' and
|
||||
self.cluster.hadoop_version >= '5.4.0')
|
||||
|
||||
def check_health(self):
|
||||
important_services = self.provider.get_important_services()
|
||||
observed_data = self.provider.get_health_status(self.service)
|
||||
imp_map = {'BAD': 'red', 'CONCERNING': 'yellow', 'GOOD': 'green'}
|
||||
for el in list(observed_data.keys()):
|
||||
summary_for_service = observed_data[el]['summary']
|
||||
if el in important_services:
|
||||
target = imp_map.get(summary_for_service, 'red')
|
||||
else:
|
||||
target = 'green'
|
||||
if summary_for_service != 'GOOD':
|
||||
target = 'yellow'
|
||||
states[target].append(el)
|
||||
if len(states['red']) > 0:
|
||||
raise health_check_base.RedHealthError(
|
||||
_("The following services are in red state: %s")
|
||||
% states['red'])
|
||||
if len(states['yellow']) > 0:
|
||||
raise health_check_base.YellowHealthError(
|
||||
_("The following services are in yellow state: %s")
|
||||
% states['yellow'])
|
||||
return _("All services are healthy")
|
||||
summary = observed_data['summary']
|
||||
checks = observed_data.get('checks', [])
|
||||
failed_checks = []
|
||||
for check in checks:
|
||||
if check['summary'] != 'GOOD':
|
||||
failed_checks.append('%(name)s - %(summary)s state' % {
|
||||
'name': check['name'], 'summary': check['summary']
|
||||
})
|
||||
additional_info = None
|
||||
if failed_checks:
|
||||
additional_info = _(
|
||||
"The following checks did not pass: %s") % ",".join(
|
||||
failed_checks)
|
||||
if self.service in important_services:
|
||||
overall = imp_map.get(summary, 'red')
|
||||
else:
|
||||
overall = 'green'
|
||||
if summary != 'GOOD':
|
||||
overall = 'yellow'
|
||||
msg = _("Cloudera Manager has responded that service is in "
|
||||
"the %s state") % summary
|
||||
if additional_info:
|
||||
msg = _("%(problem)s. %(description)s") % {
|
||||
'problem': msg, 'description': additional_info}
|
||||
if overall == 'red':
|
||||
raise health_check_base.RedHealthError(msg)
|
||||
elif overall == 'yellow':
|
||||
raise health_check_base.YellowHealthError(msg)
|
||||
return msg
|
||||
|
||||
|
||||
def get_health_checks(cluster, cloudera_utils):
    """Return the health check factories for a CDH cluster.

    One ``HealthStatusProvider`` is built from ``cluster`` and
    ``cloudera_utils`` and shared by every factory in the result.

    :param cluster: cluster the checks are created for
    :param cloudera_utils: Cloudera utilities object handed to the provider
    :returns: list of ``functools.partial`` objects; the first wraps
        ``ClouderaManagerHealthCheck`` and the rest wrap
        ``ServiceHealthCheck``, one per service reported by
        ``provider.get_cluster_services()``. Each partial already carries
        the ``provider`` (and ``service``) keyword arguments, so only the
        remaining constructor argument has to be supplied by the caller.
    """
    status_provider = HealthStatusProvider(cluster, cloudera_utils)

    manager_check = functools.partial(
        ClouderaManagerHealthCheck, provider=status_provider)
    service_checks = [
        functools.partial(
            ServiceHealthCheck, provider=status_provider, service=name)
        for name in status_provider.get_cluster_services()
    ]
    return [manager_check] + service_checks
|
||||
|
@ -94,4 +94,4 @@ class CDHPluginProvider(p.ProvisioningPluginBase):
|
||||
|
||||
def get_health_checks(self, cluster):
|
||||
return self._get_version_handler(
|
||||
cluster.hadoop_version).get_health_checks()
|
||||
cluster.hadoop_version).get_health_checks(cluster)
|
||||
|
@ -1,24 +0,0 @@
|
||||
# Copyright (c) 2016 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from sahara.plugins.cdh import health
|
||||
from sahara.plugins.cdh.v5_4_0 import cloudera_utils
|
||||
|
||||
CU = cloudera_utils.ClouderaUtilsV540()
|
||||
|
||||
|
||||
class CDH540HealthCheck(health.CDHHealthCheck):
    """``CDHHealthCheck`` specialisation for the v5_4_0 plugin version."""

    def get_cloudera_tools(self):
        """Return ``CU``, the module-level ``ClouderaUtilsV540`` instance."""
        return CU
|
@ -19,7 +19,6 @@ from sahara.plugins.cdh.v5_4_0 import cloudera_utils
|
||||
from sahara.plugins.cdh.v5_4_0 import config_helper
|
||||
from sahara.plugins.cdh.v5_4_0 import deploy
|
||||
from sahara.plugins.cdh.v5_4_0 import edp_engine
|
||||
from sahara.plugins.cdh.v5_4_0 import health
|
||||
from sahara.plugins.cdh.v5_4_0 import plugin_utils
|
||||
from sahara.plugins.cdh.v5_4_0 import validation
|
||||
|
||||
@ -71,6 +70,3 @@ class VersionHandler(avm.BaseVersionHandler):
|
||||
def get_edp_job_types(self):
    """Return the job types supported by the Oozie and Spark EDP engines.

    The Oozie engine's job types come first, followed by the Spark
    engine's, joined with ``+``.
    """
    oozie_job_types = edp_engine.EdpOozieEngine.get_supported_job_types()
    spark_job_types = edp_engine.EdpSparkEngine.get_supported_job_types()
    return oozie_job_types + spark_job_types
|
||||
|
||||
def get_health_checks(self):
|
||||
return [health.CDH540HealthCheck]
|
||||
|
118
sahara/tests/unit/plugins/cdh/test_health.py
Normal file
118
sahara/tests/unit/plugins/cdh/test_health.py
Normal file
@ -0,0 +1,118 @@
|
||||
# Copyright (c) 2016 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
import six
|
||||
import testtools
|
||||
|
||||
from sahara.plugins.cdh import health
|
||||
from sahara.service.health import health_check_base as base_health
|
||||
from sahara.tests.unit import base as unit_base
|
||||
|
||||
|
||||
class TestHealthCheck(unit_base.SaharaTestCase):
    """Unit tests for the per-service CDH health checks in ``health``."""

    def test_check_health_availability(self):
        """Both check classes are available from version 5.4.0 onwards."""
        def manager_check(cluster):
            return health.ClouderaManagerHealthCheck(cluster, mock.Mock())

        def service_check(cluster):
            return health.ServiceHealthCheck(
                cluster, mock.Mock(), mock.Mock())

        expectations = (
            ('5.3.0', self.assertFalse),
            ('5.4.0', self.assertTrue),
            ('5.5.0', self.assertTrue),
        )
        for build_check in (manager_check, service_check):
            for version, verify in expectations:
                cluster = mock.Mock(
                    plugin_name='cdh', hadoop_version=version)
                verify(build_check(cluster).is_available())

    def _base_negative_testcase(self, cdh_response_mock, return_value, msg,
                                col, service, postfix=None):
        """Assert that checking ``service`` raises the expected error.

        :param cdh_response_mock: mock whose ``return_value`` is set to
            ``return_value`` before the check runs
        :param return_value: health data the mock should report
        :param msg: expected reason text, without ``postfix``
        :param col: ``'YELLOW'`` expects ``YellowHealthError``; any other
            value expects ``RedHealthError``
        :param service: service name passed to ``ServiceHealthCheck``
        :param postfix: optional text appended to ``msg`` in the expected
            error message
        """
        cdh_response_mock.return_value = return_value

        if col == 'YELLOW':
            expected_error = base_health.YellowHealthError
        else:
            expected_error = base_health.RedHealthError
        expected_text = "Cluster health is %(col)s. Reason: %(msg)s" % {
            'col': col, 'msg': "%s%s" % (msg, postfix or "")}

        with testtools.ExpectedException(expected_error):
            try:
                status = health.HealthStatusProvider(mock.Mock(), mock.Mock())
                check = health.ServiceHealthCheck(
                    mock.Mock(), provider=status, service=service)
                check.check_health()
            except Exception as err:
                # Verify the message, then re-raise so ExpectedException
                # can verify the exception type as well.
                self.assertEqual(expected_text, six.text_type(err))
                raise

    @mock.patch('sahara.plugins.cdh.health.HealthStatusProvider.'
                'get_cloudera_health')
    @mock.patch('sahara.plugins.cdh.health.HealthStatusProvider.'
                'get_important_services')
    def test_health_calculation(self, important_stuff, cdh_response):
        """Exercise one healthy service and the red/yellow outcomes."""
        important_stuff.return_value = ['yarn01', 'hdfs01', 'oozie01']
        msg = ("Cloudera Manager has responded that service is in "
               "the %s state")

        # Positive case: the check returns the message instead of raising.
        cdh_response.return_value = {
            'yarn01': {'summary': 'GOOD'},
            'hdfs01': {'summary': 'GOOD'},
        }
        status = health.HealthStatusProvider(mock.Mock(), mock.Mock())
        healthy_check = health.ServiceHealthCheck(
            mock.Mock(), provider=status, service='hdfs01')
        self.assertEqual(msg % 'GOOD', healthy_check.check_health())

        failed_check_note = (". The following checks did not "
                             "pass: SUPER_HEALTH_CHECK - BAD state")

        # Each entry: (reported health data, summary expected in the
        # message, expected colour, service under test, message postfix).
        negative_cases = [
            ({'yarn01': {'summary': 'GOOD'},
              'hdfs01': {'summary': 'BAD'}},
             'BAD', 'RED', 'hdfs01', None),
            ({'yarn01': {'summary': 'CONCERNING'},
              'hdfs01': {'summary': 'BAD'}},
             'CONCERNING', 'YELLOW', 'yarn01', None),
            # not important service, only yellow health
            ({'yarn01': {'summary': 'CONCERNING'},
              'hdfs01': {'summary': 'BAD'},
              'some_service01': {'summary': 'BAD'}},
             'BAD', 'YELLOW', 'some_service01', None),
            ({'yarn01': {'summary': 'UNKNOWN_STATE'},
              'hdfs01': {'summary': 'BAD'},
              'some_service01': {'summary': 'BAD'}},
             'UNKNOWN_STATE', 'RED', 'yarn01', None),
            # test additional info
            ({'yarn01': {'summary': 'UNKNOWN_STATE'},
              'hdfs01': {'summary': 'BAD', 'checks': [
                  {'name': 'SUPER_HEALTH_CHECK', 'summary': 'BAD'}]},
              'some_service01': {'summary': 'BAD'}},
             'BAD', 'RED', 'hdfs01', failed_check_note),
        ]
        for response, summary, colour, service, postfix in negative_cases:
            self._base_negative_testcase(
                cdh_response, response, msg % summary, colour,
                service=service, postfix=postfix)
|
@ -1,96 +0,0 @@
|
||||
# Copyright (c) 2016 Mirantis Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
import testtools
|
||||
|
||||
from sahara.plugins.cdh.v5_4_0 import health
|
||||
from sahara.service.health import health_check_base as base_health
|
||||
from sahara.tests.unit import base as unit_base
|
||||
|
||||
|
||||
class TestHealthCheck(unit_base.SaharaTestCase):
    """Unit tests for the single aggregated ``CDH540HealthCheck``."""

    def test_check_health_availability(self):
        """The check is unavailable for 5.3.0 and available for 5.4.0."""
        expectations = (
            ('5.3.0', self.assertFalse),
            ('5.4.0', self.assertTrue),
        )
        for version, verify in expectations:
            cluster = mock.Mock(plugin_name='cdh', hadoop_version=version)
            verify(health.CDH540HealthCheck(cluster).is_available())

    @mock.patch('sahara.plugins.cdh.health.CDHHealthCheck.get_cloudera_health')
    def test_health_calculation(self, cdh_response):
        """Exercise the healthy result and the red/yellow failures."""
        cluster = mock.Mock()

        cdh_response.return_value = {
            'yarn01': {'summary': 'GOOD'},
            'hdfs01': {'summary': 'GOOD'},
        }
        self.assertEqual('All services are healthy',
                         health.CDH540HealthCheck(cluster).check_health())

        # Each entry: (reported health data, exception expected from
        # check_health()).
        failing_cases = [
            ({'yarn01': {'summary': 'BAD'},
              'hdfs01': {'summary': 'GOOD'}},
             base_health.RedHealthError),
            ({'yarn01': {'summary': 'CONCERNING'},
              'hdfs01': {'summary': 'GOOD'}},
             base_health.YellowHealthError),
            ({'yarn01': {'summary': 'GOOD'},
              'hdfs01': {'summary': 'GOOD'},
              'some_service01': {'summary': 'BAD'}},
             base_health.YellowHealthError),
            ({'yarn01': {'summary': 'UNEXPECTED_STATE'},
              'hdfs01': {'summary': 'GOOD'},
              'some_service01': {'summary': 'BAD'}},
             base_health.RedHealthError),
        ]
        for response, expected_error in failing_cases:
            cdh_response.return_value = response
            with testtools.ExpectedException(expected_error):
                health.CDH540HealthCheck(cluster).check_health()
|
Loading…
Reference in New Issue
Block a user