Merge "split cloudera health checks"

This commit is contained in:
Jenkins 2016-02-29 15:13:33 +00:00 committed by Gerrit Code Review
commit 78a5a099c5
7 changed files with 233 additions and 165 deletions

View File

@ -20,6 +20,7 @@ import six
from sahara import conductor
from sahara import context
from sahara.plugins.cdh import db_helper as dh
from sahara.plugins.cdh import health
@six.add_metaclass(abc.ABCMeta)
@ -149,5 +150,5 @@ class BaseVersionHandler(AbstractVersionHandler):
self.plugin_utils.recommend_configs(
cluster, self.get_plugin_configs(), scaling)
def get_health_checks(self):
return []
def get_health_checks(self, cluster):
return health.get_health_checks(cluster, self.cloudera_utils)

View File

@ -13,62 +13,135 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import functools
from oslo_log import log as logging
import six
from sahara.i18n import _
from sahara.service.health import health_check_base
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class CDHHealthCheck(health_check_base.BasicHealthCheck):
def get_health_check_name(self):
return _("CDH cluster health check")
def is_available(self):
return (self.cluster.plugin_name == 'cdh' and
self.cluster.hadoop_version >= '5.4.0')
class HealthStatusProvider(object):
def __init__(self, cluster, cloudera_tools):
self.cluster = cluster
self.cloudera_tools = cloudera_tools
self._data = None
self._cluster_services = None
self._exception_store = None
self.get_health_status()
@abc.abstractmethod
def get_cloudera_tools(self):
# should return correct ClouderaUtils class instance
pass
def get_cluster_services(self):
return self._cluster_services
def is_cloudera_active(self):
if self._exception_store:
raise health_check_base.RedHealthError(self._exception_store)
return _("Cloudera Manager is Active")
def get_cloudera_health(self):
cu = self.cloudera_tools
api = cu.get_api_client(self.cluster)
return api.get_service_health_status(self.cluster.name)
def get_important_services(self):
# will be overridable in future
cu = self.get_cloudera_tools()
cu = self.cloudera_tools
return [
cu.HDFS_SERVICE_NAME,
cu.YARN_SERVICE_NAME,
cu.OOZIE_SERVICE_NAME
]
def get_cloudera_health(self):
cu = self.get_cloudera_tools()
api = cu.get_api_client(self.cluster)
return api.get_service_health_status(self.cluster.name)
def get_health_status(self, service=None):
if self._data is not None:
return self._data.get(service, []) if service else self._data
self._data = {}
self._cluster_services = []
try:
# all data already grouped by services
self._data = self.get_cloudera_health()
self._cluster_services = self._data.keys()
except Exception as e:
msg = _("Can't get response from Cloudera "
"Manager")
LOG.exception(msg)
self._exception_store = _(
"%(problem)s, reason: %(reason)s") % {
'problem': msg, 'reason': six.text_type(e)}
class ClouderaManagerHealthCheck(health_check_base.BasicHealthCheck):
def __init__(self, cluster, provider):
self.provider = provider
super(ClouderaManagerHealthCheck, self).__init__(cluster)
def get_health_check_name(self):
return _("Cloudera Manager health check")
def is_available(self):
return (self.cluster.plugin_name == 'cdh' and
self.cluster.hadoop_version >= '5.4.0')
def check_health(self):
important_services = self.get_important_services()
observed_data = self.get_cloudera_health()
states = {'green': [], 'yellow': [], 'red': []}
return self.provider.is_cloudera_active()
class ServiceHealthCheck(health_check_base.BasicHealthCheck):
def __init__(self, cluster, provider, service):
self.provider = provider
self.service = service
super(ServiceHealthCheck, self).__init__(cluster)
def get_health_check_name(self):
return _("CDH %s health check") % self.service
def is_available(self):
return (self.cluster.plugin_name == 'cdh' and
self.cluster.hadoop_version >= '5.4.0')
def check_health(self):
important_services = self.provider.get_important_services()
observed_data = self.provider.get_health_status(self.service)
imp_map = {'BAD': 'red', 'CONCERNING': 'yellow', 'GOOD': 'green'}
for el in list(observed_data.keys()):
summary_for_service = observed_data[el]['summary']
if el in important_services:
target = imp_map.get(summary_for_service, 'red')
else:
target = 'green'
if summary_for_service != 'GOOD':
target = 'yellow'
states[target].append(el)
if len(states['red']) > 0:
raise health_check_base.RedHealthError(
_("The following services are in red state: %s")
% states['red'])
if len(states['yellow']) > 0:
raise health_check_base.YellowHealthError(
_("The following services are in yellow state: %s")
% states['yellow'])
return _("All services are healthy")
summary = observed_data['summary']
checks = observed_data.get('checks', [])
failed_checks = []
for check in checks:
if check['summary'] != 'GOOD':
failed_checks.append('%(name)s - %(summary)s state' % {
'name': check['name'], 'summary': check['summary']
})
additional_info = None
if failed_checks:
additional_info = _(
"The following checks did not pass: %s") % ",".join(
failed_checks)
if self.service in important_services:
overall = imp_map.get(summary, 'red')
else:
overall = 'green'
if summary != 'GOOD':
overall = 'yellow'
msg = _("Cloudera Manager has responded that service is in "
"the %s state") % summary
if additional_info:
msg = _("%(problem)s. %(description)s") % {
'problem': msg, 'description': additional_info}
if overall == 'red':
raise health_check_base.RedHealthError(msg)
elif overall == 'yellow':
raise health_check_base.YellowHealthError(msg)
return msg
def get_health_checks(cluster, cloudera_utils):
provider = HealthStatusProvider(cluster, cloudera_utils)
checks = [functools.partial(
ClouderaManagerHealthCheck, provider=provider)]
for service in provider.get_cluster_services():
checks.append(functools.partial(
ServiceHealthCheck, provider=provider, service=service))
return checks

View File

@ -94,4 +94,4 @@ class CDHPluginProvider(p.ProvisioningPluginBase):
def get_health_checks(self, cluster):
return self._get_version_handler(
cluster.hadoop_version).get_health_checks()
cluster.hadoop_version).get_health_checks(cluster)

View File

@ -1,24 +0,0 @@
# Copyright (c) 2016 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins.cdh import health
from sahara.plugins.cdh.v5_4_0 import cloudera_utils
CU = cloudera_utils.ClouderaUtilsV540()
class CDH540HealthCheck(health.CDHHealthCheck):
def get_cloudera_tools(self):
return CU

View File

@ -19,7 +19,6 @@ from sahara.plugins.cdh.v5_4_0 import cloudera_utils
from sahara.plugins.cdh.v5_4_0 import config_helper
from sahara.plugins.cdh.v5_4_0 import deploy
from sahara.plugins.cdh.v5_4_0 import edp_engine
from sahara.plugins.cdh.v5_4_0 import health
from sahara.plugins.cdh.v5_4_0 import plugin_utils
from sahara.plugins.cdh.v5_4_0 import validation
@ -71,6 +70,3 @@ class VersionHandler(avm.BaseVersionHandler):
def get_edp_job_types(self):
return (edp_engine.EdpOozieEngine.get_supported_job_types() +
edp_engine.EdpSparkEngine.get_supported_job_types())
def get_health_checks(self):
return [health.CDH540HealthCheck]

View File

@ -0,0 +1,118 @@
# Copyright (c) 2016 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import six
import testtools
from sahara.plugins.cdh import health
from sahara.service.health import health_check_base as base_health
from sahara.tests.unit import base as unit_base
class TestHealthCheck(unit_base.SaharaTestCase):
def test_check_health_availability(self):
cluster = mock.Mock(plugin_name='cdh', hadoop_version='5.3.0')
self.assertFalse(health.ClouderaManagerHealthCheck(
cluster, mock.Mock()).is_available())
cluster = mock.Mock(plugin_name='cdh', hadoop_version='5.4.0')
self.assertTrue(health.ClouderaManagerHealthCheck(
cluster, mock.Mock()).is_available())
cluster = mock.Mock(plugin_name='cdh', hadoop_version='5.5.0')
self.assertTrue(health.ClouderaManagerHealthCheck(
cluster, mock.Mock()).is_available())
cluster = mock.Mock(plugin_name='cdh', hadoop_version='5.3.0')
self.assertFalse(health.ServiceHealthCheck(
cluster, mock.Mock(), mock.Mock()).is_available())
cluster = mock.Mock(plugin_name='cdh', hadoop_version='5.4.0')
self.assertTrue(health.ServiceHealthCheck(
cluster, mock.Mock(), mock.Mock()).is_available())
cluster = mock.Mock(plugin_name='cdh', hadoop_version='5.5.0')
self.assertTrue(health.ServiceHealthCheck(
cluster, mock.Mock(), mock.Mock()).is_available())
def _base_negative_testcase(self, cdh_response_mock, return_value, msg,
col, service, postfix=None):
if not postfix:
postfix = ""
cdh_response_mock.return_value = return_value
exc = (base_health.YellowHealthError
if col == 'YELLOW' else base_health.RedHealthError)
with testtools.ExpectedException(exc):
try:
pr = health.HealthStatusProvider(mock.Mock(), mock.Mock())
health.ServiceHealthCheck(
mock.Mock(), provider=pr, service=service).check_health()
except Exception as e:
msg = "%s%s" % (msg, postfix)
all_message = "Cluster health is %(col)s. Reason: %(msg)s" % {
'col': col, 'msg': msg}
self.assertEqual(all_message, six.text_type(e))
raise
@mock.patch('sahara.plugins.cdh.health.HealthStatusProvider.'
'get_cloudera_health')
@mock.patch('sahara.plugins.cdh.health.HealthStatusProvider.'
'get_important_services')
def test_health_calculation(self, important_stuff, cdh_response):
important_stuff.return_value = ['yarn01', 'hdfs01', 'oozie01']
cdh_response.return_value = {
'yarn01': {
'summary': 'GOOD',
},
'hdfs01': {
'summary': 'GOOD',
}
}
msg = ("Cloudera Manager has responded that service is in "
"the %s state")
pr = health.HealthStatusProvider(mock.Mock(), mock.Mock())
self.assertEqual(
msg % 'GOOD', health.ServiceHealthCheck(
mock.Mock(), provider=pr, service='hdfs01').check_health())
self._base_negative_testcase(cdh_response, {
'yarn01': {'summary': 'GOOD'},
'hdfs01': {'summary': 'BAD'}
}, msg % 'BAD', 'RED', service='hdfs01')
self._base_negative_testcase(cdh_response, {
'yarn01': {'summary': 'CONCERNING'},
'hdfs01': {'summary': 'BAD'}
}, msg % 'CONCERNING', 'YELLOW', service='yarn01')
# not important service, only yellow health
self._base_negative_testcase(cdh_response, {
'yarn01': {'summary': 'CONCERNING'},
'hdfs01': {'summary': 'BAD'},
'some_service01': {'summary': 'BAD'}
}, msg % 'BAD', 'YELLOW', service='some_service01')
self._base_negative_testcase(cdh_response, {
'yarn01': {'summary': 'UNKNOWN_STATE'},
'hdfs01': {'summary': 'BAD'},
'some_service01': {'summary': 'BAD'}
}, msg % 'UNKNOWN_STATE', 'RED', service='yarn01')
# test additional info
postfix = (". The following checks did not "
"pass: SUPER_HEALTH_CHECK - BAD state")
self._base_negative_testcase(cdh_response, {
'yarn01': {'summary': 'UNKNOWN_STATE'},
'hdfs01': {'summary': 'BAD', 'checks': [
{'name': 'SUPER_HEALTH_CHECK', 'summary': 'BAD'}]},
'some_service01': {'summary': 'BAD'}
}, msg % 'BAD', 'RED', service='hdfs01', postfix=postfix)

View File

@ -1,96 +0,0 @@
# Copyright (c) 2016 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import testtools
from sahara.plugins.cdh.v5_4_0 import health
from sahara.service.health import health_check_base as base_health
from sahara.tests.unit import base as unit_base
class TestHealthCheck(unit_base.SaharaTestCase):
def test_check_health_availability(self):
cluster = mock.Mock(plugin_name='cdh', hadoop_version='5.3.0')
self.assertFalse(health.CDH540HealthCheck(cluster).is_available())
cluster = mock.Mock(plugin_name='cdh', hadoop_version='5.4.0')
self.assertTrue(health.CDH540HealthCheck(cluster).is_available())
@mock.patch('sahara.plugins.cdh.health.CDHHealthCheck.get_cloudera_health')
def test_health_calculation(self, cdh_response):
cdh_response.return_value = {
'yarn01': {
'summary': 'GOOD',
},
'hdfs01': {
'summary': 'GOOD',
}
}
cluster = mock.Mock()
self.assertEqual('All services are healthy',
health.CDH540HealthCheck(cluster).check_health())
cdh_response.return_value = {
'yarn01': {
'summary': 'BAD',
},
'hdfs01': {
'summary': 'GOOD',
}
}
with testtools.ExpectedException(base_health.RedHealthError):
health.CDH540HealthCheck(cluster).check_health()
cdh_response.return_value = {
'yarn01': {
'summary': 'CONCERNING',
},
'hdfs01': {
'summary': 'GOOD',
}
}
with testtools.ExpectedException(base_health.YellowHealthError):
health.CDH540HealthCheck(cluster).check_health()
cdh_response.return_value = {
'yarn01': {
'summary': 'GOOD',
},
'hdfs01': {
'summary': 'GOOD',
},
'some_service01': {
'summary': 'BAD'
}
}
with testtools.ExpectedException(base_health.YellowHealthError):
health.CDH540HealthCheck(cluster).check_health()
cdh_response.return_value = {
'yarn01': {
'summary': 'UNEXPECTED_STATE',
},
'hdfs01': {
'summary': 'GOOD',
},
'some_service01': {
'summary': 'BAD'
}
}
with testtools.ExpectedException(base_health.RedHealthError):
health.CDH540HealthCheck(cluster).check_health()