Add custom health check for MapR plugin

Change-Id: I3aaf85f99a81461b1f40e59b522a165fcade6edf
Implements: blueprint mapr-custom-health-check
This commit is contained in:
Grigoriy Roghkov 2016-08-10 17:05:44 +03:00 committed by Vitaly Gridnev
parent a2cf9813f7
commit cf7ed7f25e
6 changed files with 171 additions and 0 deletions

View File

@ -0,0 +1,3 @@
---
features:
- Custom health check is added to MapR plugin

View File

@ -0,0 +1,24 @@
# Copyright (c) 2015, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class AbstractHealthChecker(object):
@abc.abstractmethod
def get_checks(self, cluster_context, instances=None):
pass

View File

@ -207,6 +207,20 @@ class BaseClusterContext(cc.AbstractClusterContext):
else:
return config.default_value
def get_node_processes(self):
node_processes = []
for ng in self.cluster.node_groups:
for np in ng.node_processes:
if np not in node_processes:
node_processes.append(self.get_node_process_by_name(np))
return node_processes
def get_node_process_by_name(self, name):
for service in self.cluster_services:
for node_process in service.node_processes:
if node_process.ui_name == name:
return node_process
def get_instances(self, node_process=None):
if node_process is not None:
node_process = su.get_node_process_name(node_process)

View File

@ -0,0 +1,120 @@
# Copyright (c) 2015, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
from sahara.i18n import _
import sahara.plugins.mapr.abstract.health_checker as hc
from sahara.plugins.mapr.domain import node_process as np
from sahara.plugins.mapr.services.management import management
from sahara.plugins.mapr.services.spark import spark
from sahara.service.health import health_check_base
class BaseHealthChecker(hc.AbstractHealthChecker):
def _is_avaliable(self, process):
return process.open_ports and process not in spark.SparkOnYarn().\
node_processes
def get_checks(self, cluster_context, instances=None):
checks = [
functools.partial(ZookeeperCheck, cluster_context=cluster_context)]
for node_process in cluster_context.get_node_processes():
if self._is_avaliable(
node_process) and node_process.ui_name != 'ZooKeeper':
checks.append(functools.partial
(MapRNodeProcessCheck,
cluster_context=cluster_context,
process=node_process))
return checks
class ZookeeperCheck(health_check_base.BasicHealthCheck):
def __init__(self, cluster, cluster_context):
super(ZookeeperCheck, self).__init__(cluster)
self.cluster_context = cluster_context
def get_health_check_name(self):
return 'MapR ZooKeeper check'
def is_available(self):
return self.cluster_context.cluster.plugin_name == 'mapr'
def _is_zookeeper_running(self, instance):
cmd = 'service mapr-zookeeper status'
with instance.remote() as r:
__, out = r.execute_command(cmd, run_as_root=True)
return 'zookeeper running as process' in out
def check_health(self):
instances = self.cluster_context.get_instances(
node_process=management.ZOOKEEPER)
active_count = 0
for instance in instances:
if self._is_zookeeper_running(instance):
active_count += 1
if active_count == 0:
raise health_check_base.RedHealthError(_(
"ZooKeeper is not in running state"))
if active_count < len(instances):
raise health_check_base.YellowHealthError(_(
"Some ZooKeeper processes are not in running state"))
return _("ZooKeeper is in running state")
class MapRNodeProcessCheck(health_check_base.BasicHealthCheck):
IMPORTANT_PROCESSES = [
'CLDB',
'FileServer',
'NodeManager',
'ResourceManager'
]
def __init__(self, cluster, cluster_context, process):
super(MapRNodeProcessCheck, self).__init__(cluster)
self.process = process
self.cluster_context = cluster_context
def get_health_check_name(self):
return 'MapR %s check' % self.process.ui_name
def is_available(self):
return self.cluster_context.cluster.plugin_name == 'mapr'
def check_health(self):
instances = self.cluster_context.get_instances(
node_process=self.process)
active_count = 0
for instance in instances:
status = self.process.status(instance)
if status == np.Status.RUNNING:
active_count += 1
if active_count == 0:
if self.process.ui_name in self.IMPORTANT_PROCESSES:
raise health_check_base.RedHealthError(_(
"%s is not in running state") % self.process.ui_name)
else:
raise health_check_base.YellowHealthError(_(
"%s is not in running state") % self.process.ui_name)
if active_count < len(instances):
if self.process.ui_name in self.IMPORTANT_PROCESSES:
raise health_check_base.YellowHealthError(_(
"Some %s processes are not in running state")
% self.process.ui_name)
return _("%s is in running state") % self.process.ui_name

View File

@ -19,6 +19,7 @@ import sahara.plugins.mapr.abstract.version_handler as vh
import sahara.plugins.mapr.base.base_cluster_configurer as base_conf
import sahara.plugins.mapr.base.base_cluster_validator as bv
import sahara.plugins.mapr.base.base_edp_engine as edp
import sahara.plugins.mapr.base.base_health_checker as health
import sahara.plugins.mapr.base.base_node_manager as bs
import sahara.plugins.mapr.util.general as util
import sahara.plugins.utils as u
@ -28,6 +29,7 @@ class BaseVersionHandler(vh.AbstractVersionHandler):
def __init__(self):
self._validator = bv.BaseValidator()
self._configurer = base_conf.BaseConfigurer()
self._health_checker = health.BaseHealthChecker()
self._node_manager = bs.BaseNodeManager()
self._version = None
self._required_services = []
@ -130,3 +132,7 @@ class BaseVersionHandler(vh.AbstractVersionHandler):
if node_process.ui_name in node_group.node_processes:
result += node_process.open_ports
return util.unique_list(result)
def get_cluster_checks(self, cluster):
cluster_context = self.get_context(cluster)
return self._health_checker.get_checks(cluster_context)

View File

@ -93,3 +93,7 @@ class MapRPlugin(p.ProvisioningPluginBase):
def get_open_ports(self, node_group):
v_handler = self._get_handler(node_group.cluster.hadoop_version)
return v_handler.get_open_ports(node_group)
def get_health_checks(self, cluster):
v_handler = self._get_handler(cluster.hadoop_version)
return v_handler.get_cluster_checks(cluster)