From 9ddfb38da06346c50ec80f696fefe4f07cbfc64a Mon Sep 17 00:00:00 2001 From: Artem Osadchyi Date: Wed, 16 Sep 2015 13:03:49 +0300 Subject: [PATCH] Fixed service restart in MapR plugin MapR plugin made attempts to restart node processes which are not present in cluster. Closes-Bug: #1496513 Change-Id: Ib8279c2034ccf73a03456e577d59d625211d388d --- .../plugins/mapr/base/base_cluster_context.py | 56 +++++++------------ sahara/plugins/mapr/domain/service.py | 7 ++- sahara/plugins/mapr/services/hue/hue.py | 1 - sahara/plugins/mapr/util/service_utils.py | 51 +++++++++++++++++ 4 files changed, 76 insertions(+), 39 deletions(-) create mode 100644 sahara/plugins/mapr/util/service_utils.py diff --git a/sahara/plugins/mapr/base/base_cluster_context.py b/sahara/plugins/mapr/base/base_cluster_context.py index 5861e760..f8493789 100644 --- a/sahara/plugins/mapr/base/base_cluster_context.py +++ b/sahara/plugins/mapr/base/base_cluster_context.py @@ -16,35 +16,24 @@ import collections from oslo_config import cfg -import six import sahara.exceptions as e from sahara.i18n import _ import sahara.plugins.mapr.abstract.cluster_context as cc import sahara.plugins.mapr.domain.distro as distro -import sahara.plugins.mapr.domain.node_process as np import sahara.plugins.mapr.services.management.management as mng import sahara.plugins.mapr.services.maprfs.maprfs as mfs import sahara.plugins.mapr.services.oozie.oozie as oozie from sahara.plugins.mapr.services.swift import swift import sahara.plugins.mapr.services.yarn.yarn as yarn import sahara.plugins.mapr.util.general as g +import sahara.plugins.mapr.util.service_utils as su import sahara.plugins.utils as u - CONF = cfg.CONF CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper") -def _get_node_process_name(node_process): - name = None - if isinstance(node_process, np.NodeProcess): - name = node_process.ui_name - elif isinstance(node_process, six.string_types): - name = node_process - return name - - class BaseClusterContext(cc.AbstractClusterContext): ubuntu_base = 'http://package.mapr.com/releases/v%s/ubuntu/ mapr optional' centos_base = 'http://package.mapr.com/releases/v%s/redhat/' @@ -209,13 +198,14 @@ class BaseClusterContext(cc.AbstractClusterContext): return config.default_value def get_instances(self, node_process=None): - name = _get_node_process_name(node_process) - return u.get_instances(self.cluster, name) + if node_process is not None: + node_process = su.get_node_process_name(node_process) + return u.get_instances(self.cluster, node_process) def get_instance(self, node_process): - name = _get_node_process_name(node_process) - i = u.get_instances(self.cluster, name) - return i[0] if i else None + node_process_name = su.get_node_process_name(node_process) + instances = u.get_instances(self.cluster, node_process_name) + return instances[0] if instances else None def get_instances_ip(self, node_process): return [i.internal_ip for i in self.get_instances(node_process)] @@ -229,9 +219,7 @@ class BaseClusterContext(cc.AbstractClusterContext): for ip in self.get_instances_ip(mng.ZOOKEEPER)]) def check_for_process(self, instance, process): - processes = instance.node_group.node_processes - name = _get_node_process_name(process) - return name in processes + return su.has_node_process(instance, process) def get_services_configs_dict(self, services=None): if not services: @@ -279,19 +267,22 @@ class BaseClusterContext(cc.AbstractClusterContext): return service def get_service_name_by_node_process(self, node_process): - node_process = _get_node_process_name(node_process) + node_process_name = su.get_node_process_name(node_process) for service in self.all_services: - node_processes = [np.ui_name for np in service.node_processes] - if node_process in node_processes: + service_node_processes = [np.ui_name + for np in service.node_processes] + if node_process_name in service_node_processes: return service.ui_name def get_instances_count(self, node_process=None): - name = _get_node_process_name(node_process) - return u.get_instances_count(self.cluster, name) + if node_process is not None: + node_process = su.get_node_process_name(node_process) + return u.get_instances_count(self.cluster, node_process) def get_node_groups(self, node_process=None): - name = _get_node_process_name(node_process) - return u.get_node_groups(self.cluster, name) + if node_process is not None: + node_process = su.get_node_process_name(node_process) + return u.get_node_groups(self.cluster, node_process) def get_cldb_nodes_ip(self, separator=','): return separator.join(self.get_instances_ip(mfs.CLDB)) @@ -320,16 +311,9 @@ class BaseClusterContext(cc.AbstractClusterContext): def filter_instances(self, instances, node_process=None, service=None): if node_process: - return list(filter( - lambda i: self.check_for_process(i, node_process), instances)) + return su.filter_by_node_process(instances, node_process) if service: - result = [] - for instance in instances: - for node_process in service.node_processes: - if self.check_for_process(instance, node_process): - result += [instance] - break - return result + return su.filter_by_service(instances, service) return list(instances) def removed_instances(self, node_process=None, service=None): diff --git a/sahara/plugins/mapr/domain/service.py b/sahara/plugins/mapr/domain/service.py index 4e286d57..6e1abd59 100644 --- a/sahara/plugins/mapr/domain/service.py +++ b/sahara/plugins/mapr/domain/service.py @@ -19,10 +19,10 @@ from sahara import context import sahara.exceptions as e from sahara.i18n import _ import sahara.plugins.exceptions as ex +from sahara.plugins.mapr.util import service_utils as su import sahara.plugins.provisioning as p from sahara.utils import files as files - _INSTALL_PACKAGES_TIMEOUT = 3600 @@ -197,7 +197,10 @@ class Service(object): def restart(self, instances): for node_process in self.node_processes: - node_process.restart(instances) + filtered_instances = su.filter_by_node_process(instances, + node_process) + if filtered_instances: + node_process.restart(filtered_instances) def service_dir(self, cluster_context): args = {'mapr_home': cluster_context.mapr_home, 'name': self.name} diff --git a/sahara/plugins/mapr/services/hue/hue.py b/sahara/plugins/mapr/services/hue/hue.py index 2c1742c4..450ecf4b 100755 --- a/sahara/plugins/mapr/services/hue/hue.py +++ b/sahara/plugins/mapr/services/hue/hue.py @@ -36,7 +36,6 @@ import sahara.plugins.mapr.util.general as g import sahara.plugins.mapr.util.validation_utils as vu import sahara.utils.files as files - LOG = logging.getLogger(__name__) HUE = np.NodeProcess( diff --git a/sahara/plugins/mapr/util/service_utils.py b/sahara/plugins/mapr/util/service_utils.py new file mode 100644 index 00000000..15528b64 --- /dev/null +++ b/sahara/plugins/mapr/util/service_utils.py @@ -0,0 +1,51 @@ +# Copyright (c) 2015, MapR Technologies +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +import six + +from sahara.i18n import _ + + +def get_node_process_name(node_process): + # This import is placed here to avoid circular imports + from sahara.plugins.mapr.domain import node_process as np # noqa + + if isinstance(node_process, np.NodeProcess): + return node_process.ui_name + if isinstance(node_process, six.string_types): + return node_process + + raise TypeError(_("Invalid argument type %s") % type(node_process)) + + +def has_node_process(instance, node_process): + node_process_name = get_node_process_name(node_process) + instance_node_processes = instance.node_group.node_processes + return node_process_name in instance_node_processes + + +def has_service(instance, service): + return any(has_node_process(instance, node_process) + for node_process in service.node_processes) + + +def filter_by_node_process(instances, node_process): + return [instance for instance in instances + if has_node_process(instance, node_process)] + + +def filter_by_service(instances, service): + return [instance for instance in instances + if has_service(instance, service)]