Merge "Fixed service restart in MapR plugin"
This commit is contained in:
commit
4634ce1884
@ -16,35 +16,24 @@
|
|||||||
import collections
|
import collections
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
import six
|
|
||||||
|
|
||||||
import sahara.exceptions as e
|
import sahara.exceptions as e
|
||||||
from sahara.i18n import _
|
from sahara.i18n import _
|
||||||
import sahara.plugins.mapr.abstract.cluster_context as cc
|
import sahara.plugins.mapr.abstract.cluster_context as cc
|
||||||
import sahara.plugins.mapr.domain.distro as distro
|
import sahara.plugins.mapr.domain.distro as distro
|
||||||
import sahara.plugins.mapr.domain.node_process as np
|
|
||||||
import sahara.plugins.mapr.services.management.management as mng
|
import sahara.plugins.mapr.services.management.management as mng
|
||||||
import sahara.plugins.mapr.services.maprfs.maprfs as mfs
|
import sahara.plugins.mapr.services.maprfs.maprfs as mfs
|
||||||
import sahara.plugins.mapr.services.oozie.oozie as oozie
|
import sahara.plugins.mapr.services.oozie.oozie as oozie
|
||||||
from sahara.plugins.mapr.services.swift import swift
|
from sahara.plugins.mapr.services.swift import swift
|
||||||
import sahara.plugins.mapr.services.yarn.yarn as yarn
|
import sahara.plugins.mapr.services.yarn.yarn as yarn
|
||||||
import sahara.plugins.mapr.util.general as g
|
import sahara.plugins.mapr.util.general as g
|
||||||
|
import sahara.plugins.mapr.util.service_utils as su
|
||||||
import sahara.plugins.utils as u
|
import sahara.plugins.utils as u
|
||||||
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")
|
CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")
|
||||||
|
|
||||||
|
|
||||||
def _get_node_process_name(node_process):
|
|
||||||
name = None
|
|
||||||
if isinstance(node_process, np.NodeProcess):
|
|
||||||
name = node_process.ui_name
|
|
||||||
elif isinstance(node_process, six.string_types):
|
|
||||||
name = node_process
|
|
||||||
return name
|
|
||||||
|
|
||||||
|
|
||||||
class BaseClusterContext(cc.AbstractClusterContext):
|
class BaseClusterContext(cc.AbstractClusterContext):
|
||||||
ubuntu_base = 'http://package.mapr.com/releases/v%s/ubuntu/ mapr optional'
|
ubuntu_base = 'http://package.mapr.com/releases/v%s/ubuntu/ mapr optional'
|
||||||
centos_base = 'http://package.mapr.com/releases/v%s/redhat/'
|
centos_base = 'http://package.mapr.com/releases/v%s/redhat/'
|
||||||
@ -209,13 +198,14 @@ class BaseClusterContext(cc.AbstractClusterContext):
|
|||||||
return config.default_value
|
return config.default_value
|
||||||
|
|
||||||
def get_instances(self, node_process=None):
|
def get_instances(self, node_process=None):
|
||||||
name = _get_node_process_name(node_process)
|
if node_process is not None:
|
||||||
return u.get_instances(self.cluster, name)
|
node_process = su.get_node_process_name(node_process)
|
||||||
|
return u.get_instances(self.cluster, node_process)
|
||||||
|
|
||||||
def get_instance(self, node_process):
|
def get_instance(self, node_process):
|
||||||
name = _get_node_process_name(node_process)
|
node_process_name = su.get_node_process_name(node_process)
|
||||||
i = u.get_instances(self.cluster, name)
|
instances = u.get_instances(self.cluster, node_process_name)
|
||||||
return i[0] if i else None
|
return instances[0] if instances else None
|
||||||
|
|
||||||
def get_instances_ip(self, node_process):
|
def get_instances_ip(self, node_process):
|
||||||
return [i.internal_ip for i in self.get_instances(node_process)]
|
return [i.internal_ip for i in self.get_instances(node_process)]
|
||||||
@ -229,9 +219,7 @@ class BaseClusterContext(cc.AbstractClusterContext):
|
|||||||
for ip in self.get_instances_ip(mng.ZOOKEEPER)])
|
for ip in self.get_instances_ip(mng.ZOOKEEPER)])
|
||||||
|
|
||||||
def check_for_process(self, instance, process):
|
def check_for_process(self, instance, process):
|
||||||
processes = instance.node_group.node_processes
|
return su.has_node_process(instance, process)
|
||||||
name = _get_node_process_name(process)
|
|
||||||
return name in processes
|
|
||||||
|
|
||||||
def get_services_configs_dict(self, services=None):
|
def get_services_configs_dict(self, services=None):
|
||||||
if not services:
|
if not services:
|
||||||
@ -279,19 +267,22 @@ class BaseClusterContext(cc.AbstractClusterContext):
|
|||||||
return service
|
return service
|
||||||
|
|
||||||
def get_service_name_by_node_process(self, node_process):
|
def get_service_name_by_node_process(self, node_process):
|
||||||
node_process = _get_node_process_name(node_process)
|
node_process_name = su.get_node_process_name(node_process)
|
||||||
for service in self.all_services:
|
for service in self.all_services:
|
||||||
node_processes = [np.ui_name for np in service.node_processes]
|
service_node_processes = [np.ui_name
|
||||||
if node_process in node_processes:
|
for np in service.node_processes]
|
||||||
|
if node_process_name in service_node_processes:
|
||||||
return service.ui_name
|
return service.ui_name
|
||||||
|
|
||||||
def get_instances_count(self, node_process=None):
|
def get_instances_count(self, node_process=None):
|
||||||
name = _get_node_process_name(node_process)
|
if node_process is not None:
|
||||||
return u.get_instances_count(self.cluster, name)
|
node_process = su.get_node_process_name(node_process)
|
||||||
|
return u.get_instances_count(self.cluster, node_process)
|
||||||
|
|
||||||
def get_node_groups(self, node_process=None):
|
def get_node_groups(self, node_process=None):
|
||||||
name = _get_node_process_name(node_process)
|
if node_process is not None:
|
||||||
return u.get_node_groups(self.cluster, name)
|
node_process = su.get_node_process_name(node_process)
|
||||||
|
return u.get_node_groups(self.cluster, node_process)
|
||||||
|
|
||||||
def get_cldb_nodes_ip(self, separator=','):
|
def get_cldb_nodes_ip(self, separator=','):
|
||||||
return separator.join(self.get_instances_ip(mfs.CLDB))
|
return separator.join(self.get_instances_ip(mfs.CLDB))
|
||||||
@ -320,16 +311,9 @@ class BaseClusterContext(cc.AbstractClusterContext):
|
|||||||
|
|
||||||
def filter_instances(self, instances, node_process=None, service=None):
|
def filter_instances(self, instances, node_process=None, service=None):
|
||||||
if node_process:
|
if node_process:
|
||||||
return list(filter(
|
return su.filter_by_node_process(instances, node_process)
|
||||||
lambda i: self.check_for_process(i, node_process), instances))
|
|
||||||
if service:
|
if service:
|
||||||
result = []
|
return su.filter_by_service(instances, service)
|
||||||
for instance in instances:
|
|
||||||
for node_process in service.node_processes:
|
|
||||||
if self.check_for_process(instance, node_process):
|
|
||||||
result += [instance]
|
|
||||||
break
|
|
||||||
return result
|
|
||||||
return list(instances)
|
return list(instances)
|
||||||
|
|
||||||
def removed_instances(self, node_process=None, service=None):
|
def removed_instances(self, node_process=None, service=None):
|
||||||
|
@ -19,10 +19,10 @@ from sahara import context
|
|||||||
import sahara.exceptions as e
|
import sahara.exceptions as e
|
||||||
from sahara.i18n import _
|
from sahara.i18n import _
|
||||||
import sahara.plugins.exceptions as ex
|
import sahara.plugins.exceptions as ex
|
||||||
|
from sahara.plugins.mapr.util import service_utils as su
|
||||||
import sahara.plugins.provisioning as p
|
import sahara.plugins.provisioning as p
|
||||||
from sahara.utils import files as files
|
from sahara.utils import files as files
|
||||||
|
|
||||||
|
|
||||||
_INSTALL_PACKAGES_TIMEOUT = 3600
|
_INSTALL_PACKAGES_TIMEOUT = 3600
|
||||||
|
|
||||||
|
|
||||||
@ -197,7 +197,10 @@ class Service(object):
|
|||||||
|
|
||||||
def restart(self, instances):
|
def restart(self, instances):
|
||||||
for node_process in self.node_processes:
|
for node_process in self.node_processes:
|
||||||
node_process.restart(instances)
|
filtered_instances = su.filter_by_node_process(instances,
|
||||||
|
node_process)
|
||||||
|
if filtered_instances:
|
||||||
|
node_process.restart(filtered_instances)
|
||||||
|
|
||||||
def service_dir(self, cluster_context):
|
def service_dir(self, cluster_context):
|
||||||
args = {'mapr_home': cluster_context.mapr_home, 'name': self.name}
|
args = {'mapr_home': cluster_context.mapr_home, 'name': self.name}
|
||||||
|
@ -36,7 +36,6 @@ import sahara.plugins.mapr.util.general as g
|
|||||||
import sahara.plugins.mapr.util.validation_utils as vu
|
import sahara.plugins.mapr.util.validation_utils as vu
|
||||||
import sahara.utils.files as files
|
import sahara.utils.files as files
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
HUE = np.NodeProcess(
|
HUE = np.NodeProcess(
|
||||||
|
51
sahara/plugins/mapr/util/service_utils.py
Normal file
51
sahara/plugins/mapr/util/service_utils.py
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
# Copyright (c) 2015, MapR Technologies
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
|
from sahara.i18n import _
|
||||||
|
|
||||||
|
|
||||||
|
def get_node_process_name(node_process):
|
||||||
|
# This import is placed here to avoid circular imports
|
||||||
|
from sahara.plugins.mapr.domain import node_process as np # noqa
|
||||||
|
|
||||||
|
if isinstance(node_process, np.NodeProcess):
|
||||||
|
return node_process.ui_name
|
||||||
|
if isinstance(node_process, six.string_types):
|
||||||
|
return node_process
|
||||||
|
|
||||||
|
raise TypeError(_("Invalid argument type %s") % type(node_process))
|
||||||
|
|
||||||
|
|
||||||
|
def has_node_process(instance, node_process):
|
||||||
|
node_process_name = get_node_process_name(node_process)
|
||||||
|
instance_node_processes = instance.node_group.node_processes
|
||||||
|
return node_process_name in instance_node_processes
|
||||||
|
|
||||||
|
|
||||||
|
def has_service(instance, service):
|
||||||
|
return any(has_node_process(instance, node_process)
|
||||||
|
for node_process in service.node_processes)
|
||||||
|
|
||||||
|
|
||||||
|
def filter_by_node_process(instances, node_process):
|
||||||
|
return [instance for instance in instances
|
||||||
|
if has_node_process(instance, node_process)]
|
||||||
|
|
||||||
|
|
||||||
|
def filter_by_service(instances, service):
|
||||||
|
return [instance for instance in instances
|
||||||
|
if has_service(instance, service)]
|
Loading…
Reference in New Issue
Block a user