Fixed service restart in MapR plugin

MapR plugin made attempts to restart node
processes which are not present in cluster.

Closes-Bug: #1496513
Change-Id: Ib8279c2034ccf73a03456e577d59d625211d388d
This commit is contained in:
Artem Osadchyi 2015-09-16 13:03:49 +03:00
parent ea993f12c2
commit 9ddfb38da0
4 changed files with 76 additions and 39 deletions

View File

@ -16,35 +16,24 @@
import collections import collections
from oslo_config import cfg from oslo_config import cfg
import six
import sahara.exceptions as e import sahara.exceptions as e
from sahara.i18n import _ from sahara.i18n import _
import sahara.plugins.mapr.abstract.cluster_context as cc import sahara.plugins.mapr.abstract.cluster_context as cc
import sahara.plugins.mapr.domain.distro as distro import sahara.plugins.mapr.domain.distro as distro
import sahara.plugins.mapr.domain.node_process as np
import sahara.plugins.mapr.services.management.management as mng import sahara.plugins.mapr.services.management.management as mng
import sahara.plugins.mapr.services.maprfs.maprfs as mfs import sahara.plugins.mapr.services.maprfs.maprfs as mfs
import sahara.plugins.mapr.services.oozie.oozie as oozie import sahara.plugins.mapr.services.oozie.oozie as oozie
from sahara.plugins.mapr.services.swift import swift from sahara.plugins.mapr.services.swift import swift
import sahara.plugins.mapr.services.yarn.yarn as yarn import sahara.plugins.mapr.services.yarn.yarn as yarn
import sahara.plugins.mapr.util.general as g import sahara.plugins.mapr.util.general as g
import sahara.plugins.mapr.util.service_utils as su
import sahara.plugins.utils as u import sahara.plugins.utils as u
CONF = cfg.CONF CONF = cfg.CONF
CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper") CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")
def _get_node_process_name(node_process):
name = None
if isinstance(node_process, np.NodeProcess):
name = node_process.ui_name
elif isinstance(node_process, six.string_types):
name = node_process
return name
class BaseClusterContext(cc.AbstractClusterContext): class BaseClusterContext(cc.AbstractClusterContext):
ubuntu_base = 'http://package.mapr.com/releases/v%s/ubuntu/ mapr optional' ubuntu_base = 'http://package.mapr.com/releases/v%s/ubuntu/ mapr optional'
centos_base = 'http://package.mapr.com/releases/v%s/redhat/' centos_base = 'http://package.mapr.com/releases/v%s/redhat/'
@ -209,13 +198,14 @@ class BaseClusterContext(cc.AbstractClusterContext):
return config.default_value return config.default_value
def get_instances(self, node_process=None): def get_instances(self, node_process=None):
name = _get_node_process_name(node_process) if node_process is not None:
return u.get_instances(self.cluster, name) node_process = su.get_node_process_name(node_process)
return u.get_instances(self.cluster, node_process)
def get_instance(self, node_process): def get_instance(self, node_process):
name = _get_node_process_name(node_process) node_process_name = su.get_node_process_name(node_process)
i = u.get_instances(self.cluster, name) instances = u.get_instances(self.cluster, node_process_name)
return i[0] if i else None return instances[0] if instances else None
def get_instances_ip(self, node_process): def get_instances_ip(self, node_process):
return [i.internal_ip for i in self.get_instances(node_process)] return [i.internal_ip for i in self.get_instances(node_process)]
@ -229,9 +219,7 @@ class BaseClusterContext(cc.AbstractClusterContext):
for ip in self.get_instances_ip(mng.ZOOKEEPER)]) for ip in self.get_instances_ip(mng.ZOOKEEPER)])
def check_for_process(self, instance, process): def check_for_process(self, instance, process):
processes = instance.node_group.node_processes return su.has_node_process(instance, process)
name = _get_node_process_name(process)
return name in processes
def get_services_configs_dict(self, services=None): def get_services_configs_dict(self, services=None):
if not services: if not services:
@ -279,19 +267,22 @@ class BaseClusterContext(cc.AbstractClusterContext):
return service return service
def get_service_name_by_node_process(self, node_process): def get_service_name_by_node_process(self, node_process):
node_process = _get_node_process_name(node_process) node_process_name = su.get_node_process_name(node_process)
for service in self.all_services: for service in self.all_services:
node_processes = [np.ui_name for np in service.node_processes] service_node_processes = [np.ui_name
if node_process in node_processes: for np in service.node_processes]
if node_process_name in service_node_processes:
return service.ui_name return service.ui_name
def get_instances_count(self, node_process=None): def get_instances_count(self, node_process=None):
name = _get_node_process_name(node_process) if node_process is not None:
return u.get_instances_count(self.cluster, name) node_process = su.get_node_process_name(node_process)
return u.get_instances_count(self.cluster, node_process)
def get_node_groups(self, node_process=None): def get_node_groups(self, node_process=None):
name = _get_node_process_name(node_process) if node_process is not None:
return u.get_node_groups(self.cluster, name) node_process = su.get_node_process_name(node_process)
return u.get_node_groups(self.cluster, node_process)
def get_cldb_nodes_ip(self, separator=','): def get_cldb_nodes_ip(self, separator=','):
return separator.join(self.get_instances_ip(mfs.CLDB)) return separator.join(self.get_instances_ip(mfs.CLDB))
@ -320,16 +311,9 @@ class BaseClusterContext(cc.AbstractClusterContext):
def filter_instances(self, instances, node_process=None, service=None): def filter_instances(self, instances, node_process=None, service=None):
if node_process: if node_process:
return list(filter( return su.filter_by_node_process(instances, node_process)
lambda i: self.check_for_process(i, node_process), instances))
if service: if service:
result = [] return su.filter_by_service(instances, service)
for instance in instances:
for node_process in service.node_processes:
if self.check_for_process(instance, node_process):
result += [instance]
break
return result
return list(instances) return list(instances)
def removed_instances(self, node_process=None, service=None): def removed_instances(self, node_process=None, service=None):

View File

@ -19,10 +19,10 @@ from sahara import context
import sahara.exceptions as e import sahara.exceptions as e
from sahara.i18n import _ from sahara.i18n import _
import sahara.plugins.exceptions as ex import sahara.plugins.exceptions as ex
from sahara.plugins.mapr.util import service_utils as su
import sahara.plugins.provisioning as p import sahara.plugins.provisioning as p
from sahara.utils import files as files from sahara.utils import files as files
_INSTALL_PACKAGES_TIMEOUT = 3600 _INSTALL_PACKAGES_TIMEOUT = 3600
@ -197,7 +197,10 @@ class Service(object):
def restart(self, instances): def restart(self, instances):
for node_process in self.node_processes: for node_process in self.node_processes:
node_process.restart(instances) filtered_instances = su.filter_by_node_process(instances,
node_process)
if filtered_instances:
node_process.restart(filtered_instances)
def service_dir(self, cluster_context): def service_dir(self, cluster_context):
args = {'mapr_home': cluster_context.mapr_home, 'name': self.name} args = {'mapr_home': cluster_context.mapr_home, 'name': self.name}

View File

@ -36,7 +36,6 @@ import sahara.plugins.mapr.util.general as g
import sahara.plugins.mapr.util.validation_utils as vu import sahara.plugins.mapr.util.validation_utils as vu
import sahara.utils.files as files import sahara.utils.files as files
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
HUE = np.NodeProcess( HUE = np.NodeProcess(

View File

@ -0,0 +1,51 @@
# Copyright (c) 2015, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from sahara.i18n import _
def get_node_process_name(node_process):
# This import is placed here to avoid circular imports
from sahara.plugins.mapr.domain import node_process as np # noqa
if isinstance(node_process, np.NodeProcess):
return node_process.ui_name
if isinstance(node_process, six.string_types):
return node_process
raise TypeError(_("Invalid argument type %s") % type(node_process))
def has_node_process(instance, node_process):
node_process_name = get_node_process_name(node_process)
instance_node_processes = instance.node_group.node_processes
return node_process_name in instance_node_processes
def has_service(instance, service):
return any(has_node_process(instance, node_process)
for node_process in service.node_processes)
def filter_by_node_process(instances, node_process):
return [instance for instance in instances
if has_node_process(instance, node_process)]
def filter_by_service(instances, service):
return [instance for instance in instances
if has_service(instance, service)]