From cf7ed7f25e89eea667b9d3632ebbc615a32b4a2e Mon Sep 17 00:00:00 2001 From: Grigoriy Roghkov Date: Wed, 10 Aug 2016 17:05:44 +0300 Subject: [PATCH] Add custom health check for MapR plugin Change-Id: I3aaf85f99a81461b1f40e59b522a165fcade6edf Implements: blueprint mapr-custom-health-check --- .../mapr-health-check-2eba3d742a2b853f.yaml | 3 + .../plugins/mapr/abstract/health_checker.py | 24 ++++ .../plugins/mapr/base/base_cluster_context.py | 14 ++ .../plugins/mapr/base/base_health_checker.py | 120 ++++++++++++++++++ .../plugins/mapr/base/base_version_handler.py | 6 + sahara/plugins/mapr/plugin.py | 4 + 6 files changed, 171 insertions(+) create mode 100644 releasenotes/notes/mapr-health-check-2eba3d742a2b853f.yaml create mode 100644 sahara/plugins/mapr/abstract/health_checker.py create mode 100644 sahara/plugins/mapr/base/base_health_checker.py diff --git a/releasenotes/notes/mapr-health-check-2eba3d742a2b853f.yaml b/releasenotes/notes/mapr-health-check-2eba3d742a2b853f.yaml new file mode 100644 index 00000000..09534e69 --- /dev/null +++ b/releasenotes/notes/mapr-health-check-2eba3d742a2b853f.yaml @@ -0,0 +1,3 @@ +--- +features: + - Custom health check is added to MapR plugin \ No newline at end of file diff --git a/sahara/plugins/mapr/abstract/health_checker.py b/sahara/plugins/mapr/abstract/health_checker.py new file mode 100644 index 00000000..7050fed9 --- /dev/null +++ b/sahara/plugins/mapr/abstract/health_checker.py @@ -0,0 +1,24 @@ +# Copyright (c) 2015, MapR Technologies +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +import six + + +@six.add_metaclass(abc.ABCMeta) +class AbstractHealthChecker(object): + @abc.abstractmethod + def get_checks(self, cluster_context, instances=None): + pass diff --git a/sahara/plugins/mapr/base/base_cluster_context.py b/sahara/plugins/mapr/base/base_cluster_context.py index 3d8f656c..092a1d16 100644 --- a/sahara/plugins/mapr/base/base_cluster_context.py +++ b/sahara/plugins/mapr/base/base_cluster_context.py @@ -207,6 +207,20 @@ class BaseClusterContext(cc.AbstractClusterContext): else: return config.default_value + def get_node_processes(self): + node_processes = [] + for ng in self.cluster.node_groups: + for np in ng.node_processes: + if np not in node_processes: + node_processes.append(self.get_node_process_by_name(np)) + return node_processes + + def get_node_process_by_name(self, name): + for service in self.cluster_services: + for node_process in service.node_processes: + if node_process.ui_name == name: + return node_process + def get_instances(self, node_process=None): if node_process is not None: node_process = su.get_node_process_name(node_process) diff --git a/sahara/plugins/mapr/base/base_health_checker.py b/sahara/plugins/mapr/base/base_health_checker.py new file mode 100644 index 00000000..ec8a2c98 --- /dev/null +++ b/sahara/plugins/mapr/base/base_health_checker.py @@ -0,0 +1,120 @@ +# Copyright (c) 2015, MapR Technologies +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import functools + +from sahara.i18n import _ +import sahara.plugins.mapr.abstract.health_checker as hc +from sahara.plugins.mapr.domain import node_process as np +from sahara.plugins.mapr.services.management import management +from sahara.plugins.mapr.services.spark import spark +from sahara.service.health import health_check_base + + +class BaseHealthChecker(hc.AbstractHealthChecker): + def _is_avaliable(self, process): + return process.open_ports and process not in spark.SparkOnYarn().\ + node_processes + + def get_checks(self, cluster_context, instances=None): + checks = [ + functools.partial(ZookeeperCheck, cluster_context=cluster_context)] + for node_process in cluster_context.get_node_processes(): + if self._is_avaliable( + node_process) and node_process.ui_name != 'ZooKeeper': + checks.append(functools.partial + (MapRNodeProcessCheck, + cluster_context=cluster_context, + process=node_process)) + return checks + + +class ZookeeperCheck(health_check_base.BasicHealthCheck): + def __init__(self, cluster, cluster_context): + super(ZookeeperCheck, self).__init__(cluster) + self.cluster_context = cluster_context + + def get_health_check_name(self): + return 'MapR ZooKeeper check' + + def is_available(self): + return self.cluster_context.cluster.plugin_name == 'mapr' + + def _is_zookeeper_running(self, instance): + cmd = 'service mapr-zookeeper status' + with instance.remote() as r: + __, out = r.execute_command(cmd, run_as_root=True) + return 'zookeeper running as process' in out + + def check_health(self): + instances = self.cluster_context.get_instances( + node_process=management.ZOOKEEPER) + active_count = 0 + for instance in instances: + if self._is_zookeeper_running(instance): + active_count += 1 + + if active_count == 0: + raise health_check_base.RedHealthError(_( + "ZooKeeper is not in running state")) + + if active_count < len(instances): + raise health_check_base.YellowHealthError(_( + "Some ZooKeeper processes are not in running state")) + return _("ZooKeeper is in running state") + + +class MapRNodeProcessCheck(health_check_base.BasicHealthCheck): + IMPORTANT_PROCESSES = [ + 'CLDB', + 'FileServer', + 'NodeManager', + 'ResourceManager' + ] + + def __init__(self, cluster, cluster_context, process): + super(MapRNodeProcessCheck, self).__init__(cluster) + self.process = process + self.cluster_context = cluster_context + + def get_health_check_name(self): + return 'MapR %s check' % self.process.ui_name + + def is_available(self): + return self.cluster_context.cluster.plugin_name == 'mapr' + + def check_health(self): + instances = self.cluster_context.get_instances( + node_process=self.process) + active_count = 0 + for instance in instances: + status = self.process.status(instance) + if status == np.Status.RUNNING: + active_count += 1 + + if active_count == 0: + if self.process.ui_name in self.IMPORTANT_PROCESSES: + raise health_check_base.RedHealthError(_( + "%s is not in running state") % self.process.ui_name) + else: + raise health_check_base.YellowHealthError(_( + "%s is not in running state") % self.process.ui_name) + + if active_count < len(instances): + if self.process.ui_name in self.IMPORTANT_PROCESSES: + raise health_check_base.YellowHealthError(_( + "Some %s processes are not in running state") + % self.process.ui_name) + return _("%s is in running state") % self.process.ui_name diff --git a/sahara/plugins/mapr/base/base_version_handler.py b/sahara/plugins/mapr/base/base_version_handler.py index 24d88789..8e39bd16 100644 --- a/sahara/plugins/mapr/base/base_version_handler.py +++ b/sahara/plugins/mapr/base/base_version_handler.py @@ -19,6 +19,7 @@ import sahara.plugins.mapr.abstract.version_handler as vh import sahara.plugins.mapr.base.base_cluster_configurer as base_conf import sahara.plugins.mapr.base.base_cluster_validator as bv import sahara.plugins.mapr.base.base_edp_engine as edp +import sahara.plugins.mapr.base.base_health_checker as health import sahara.plugins.mapr.base.base_node_manager as bs import sahara.plugins.mapr.util.general as util import sahara.plugins.utils as u @@ -28,6 +29,7 @@ class BaseVersionHandler(vh.AbstractVersionHandler): def __init__(self): self._validator = bv.BaseValidator() self._configurer = base_conf.BaseConfigurer() + self._health_checker = health.BaseHealthChecker() self._node_manager = bs.BaseNodeManager() self._version = None self._required_services = [] @@ -130,3 +132,7 @@ class BaseVersionHandler(vh.AbstractVersionHandler): if node_process.ui_name in node_group.node_processes: result += node_process.open_ports return util.unique_list(result) + + def get_cluster_checks(self, cluster): + cluster_context = self.get_context(cluster) + return self._health_checker.get_checks(cluster_context) diff --git a/sahara/plugins/mapr/plugin.py b/sahara/plugins/mapr/plugin.py index d1e2ff34..098acba3 100644 --- a/sahara/plugins/mapr/plugin.py +++ b/sahara/plugins/mapr/plugin.py @@ -93,3 +93,7 @@ class MapRPlugin(p.ProvisioningPluginBase): def get_open_ports(self, node_group): v_handler = self._get_handler(node_group.cluster.hadoop_version) return v_handler.get_open_ports(node_group) + + def get_health_checks(self, cluster): + v_handler = self._get_handler(cluster.hadoop_version) + return v_handler.get_cluster_checks(cluster)