From 45009845e81adeb8a762c1bdfaae9d7e605a4c97 Mon Sep 17 00:00:00 2001
From: Artem Osadchyi
Date: Mon, 17 Aug 2015 17:17:09 +0300
Subject: [PATCH] Add event logs for MapR plugin

MapR plugin now supports event logs. To complete this task, several
additional changes were made:
1. Move node awareness checking out of the configure_topology method
2. Add an instances parameter to set_cluster_mode
3. Move topology map generation to the cluster context
4. Move config resolution to the cluster context

Change-Id: Ic00b27f57eacf5664980358b7fd154632ef3f31e
Implements: blueprint mapr-event-log
---
 .../mapr/base/base_cluster_configurer.py      | 210 ++++++++++--------
 .../plugins/mapr/base/base_cluster_context.py |  29 +++
 sahara/plugins/mapr/domain/service.py         |  12 +-
 sahara/plugins/mapr/util/event_log.py         |  59 +++++
 4 files changed, 214 insertions(+), 96 deletions(-)
 create mode 100644 sahara/plugins/mapr/util/event_log.py

diff --git a/sahara/plugins/mapr/base/base_cluster_configurer.py b/sahara/plugins/mapr/base/base_cluster_configurer.py
index b9012861..ad6a86ef 100644
--- a/sahara/plugins/mapr/base/base_cluster_configurer.py
+++ b/sahara/plugins/mapr/base/base_cluster_configurer.py
@@ -21,6 +21,7 @@ import six
 
 from sahara import conductor
 from sahara import context
+from sahara.i18n import _
 from sahara.i18n import _LI
 from sahara.i18n import _LW
 import sahara.plugins.mapr.abstract.configurer as ac
@@ -30,9 +31,8 @@ import sahara.plugins.mapr.services.mapreduce.mapreduce as mr
 from sahara.plugins.mapr.services.maprfs import maprfs
 from sahara.plugins.mapr.services.mysql import mysql
 import sahara.plugins.mapr.services.yarn.yarn as yarn
+from sahara.plugins.mapr.util import event_log as el
 import sahara.plugins.mapr.util.general as util
-from sahara.topology import topology_helper as th
-import sahara.utils.configs as sahara_configs
 import sahara.utils.files as files
 
 LOG = logging.getLogger(__name__)
@@ -69,11 +69,12 @@ class BaseConfigurer(ac.AbstractConfigurer):
         if not cluster_context.is_prebuilt:
             self._prepare_bare_image(cluster_context, instances)
         self._install_services(cluster_context, instances)
-        self._configure_topology(cluster_context, instances)
+        if cluster_context.is_node_aware:
+            self._configure_topology(cluster_context, instances)
         self._configure_database(cluster_context, instances)
         self._configure_services(cluster_context, instances)
         self._configure_sh_cluster(cluster_context, instances)
-        self._set_cluster_mode(cluster_context)
+        self._set_cluster_mode(cluster_context, instances)
         self._write_config_files(cluster_context, instances)
         self._configure_environment(cluster_context, instances)
         self._update_cluster_info(cluster_context)
@@ -82,7 +83,8 @@ class BaseConfigurer(ac.AbstractConfigurer):
         LOG.debug('Configuring existing instances')
         instances = instances or cluster_context.get_instances()
         existing = cluster_context.existing_instances()
-        self._configure_topology(cluster_context, existing)
+        if cluster_context.is_node_aware:
+            self._configure_topology(cluster_context, existing)
         if cluster_context.has_control_nodes(instances):
             self._configure_sh_cluster(cluster_context, existing)
             self._post_configure_sh(cluster_context, existing)
@@ -97,8 +99,14 @@ class BaseConfigurer(ac.AbstractConfigurer):
             service.configure(cluster_context, instances)
 
     def _install_services(self, cluster_context, instances):
+        step_name_template = _("Install %s service")
         for service in self._service_install_sequence(cluster_context):
-            service.install(cluster_context, instances)
+            # Add own provision step for each service
+            step_name = step_name_template % service.ui_name
+            decorator = el.provision_step(step_name, 0, 1)
+            install_service = decorator(service.install)
+
+            install_service(cluster_context, instances)
 
     def _service_install_sequence(self, cluster_context):
         def key(service):
@@ -110,82 +118,99 @@ def _prepare_bare_image(self, cluster_context, instances):
         LOG.debug('Preparing bare image')
-        if d.UBUNTU == cluster_context.distro:
-            LOG.debug("Installing security repos")
-            util.execute_on_instances(
-                instances, util.run_script, ADD_SECURITY_REPO_SCRIPT, 'root')
-
-        d_name = cluster_context.distro.name
-
-        LOG.debug('Installing Java')
-        util.execute_on_instances(
-            instances, util.run_script, INSTALL_JAVA_SCRIPT, 'root', d_name)
-        LOG.debug('Installing Scala')
-        util.execute_on_instances(
-            instances, util.run_script, INSTALL_SCALA_SCRIPT, 'root', d_name)
-        LOG.debug('Installing MySQL client')
-        util.execute_on_instances(
-            instances, util.run_script, INSTALL_MYSQL_CLIENT, 'root', d_name)
+        self._install_security_repos(cluster_context, instances)
+        self._install_java(cluster_context, instances)
+        self._install_scala(cluster_context, instances)
+        self._install_mysql_client(cluster_context, instances)
         LOG.debug('Bare images successfully prepared')
 
-    def _configure_topology(self, context, instances):
-        def write_file(instance, path, data):
-            with instance.remote() as r:
-                r.write_file_to(path, data, run_as_root=True)
+    @el.provision_step(_("Install security repos"))
+    def _install_security_repos(self, cluster_context, instances):
+        LOG.debug("Installing security repos")
+
+        @el.provision_event()
+        def install_security_repos(instance):
+            return util.run_script(instance, ADD_SECURITY_REPO_SCRIPT, "root")
+
+        util.execute_on_instances(instances, install_security_repos)
+
+    @el.provision_step(_("Install MySQL client"))
+    def _install_mysql_client(self, cluster_context, instances):
+        LOG.debug("Installing MySQL client")
+        distro_name = cluster_context.distro.name
+
+        @el.provision_event()
+        def install_mysql_client(instance):
+            return util.run_script(instance, INSTALL_MYSQL_CLIENT,
+                                   "root", distro_name)
+
+        util.execute_on_instances(instances, install_mysql_client)
+
+    @el.provision_step(_("Install Scala"))
+    def _install_scala(self, cluster_context, instances):
+        LOG.debug("Installing Scala")
+        distro_name = cluster_context.distro.name
+
+        @el.provision_event()
+        def install_scala(instance):
+            return util.run_script(instance, INSTALL_SCALA_SCRIPT,
+                                   "root", distro_name)
+
+        util.execute_on_instances(instances, install_scala)
+
+    @el.provision_step(_("Install Java"))
+    def _install_java(self, cluster_context, instances):
+        LOG.debug("Installing Java")
+        distro_name = cluster_context.distro.name
+
+        @el.provision_event()
+        def install_java(instance):
+            return util.run_script(instance, INSTALL_JAVA_SCRIPT,
+                                   "root", distro_name)
+
+        util.execute_on_instances(instances, install_java)
+
+    @el.provision_step(_("Configure cluster topology"))
+    def _configure_topology(self, context, instances):
+        LOG.debug("Configuring cluster topology")
+
+        topology_map = context.topology_map
+        topology_map = ("%s %s" % item for item in six.iteritems(topology_map))
+        topology_map = "\n".join(topology_map) + "\n"
+
+        data_path = "%s/topology.data" % context.mapr_home
+        script = files.get_file_text(_TOPO_SCRIPT)
+        script_path = '%s/topology.sh' % context.mapr_home
+
+        @el.provision_event()
+        def write_topology_data(instance):
+            util.write_file(instance, data_path, topology_map, owner="root")
+            util.write_file(instance, script_path, script,
+                            mode="+x", owner="root")
+
+        util.execute_on_instances(instances, write_topology_data)
 
-        LOG.debug('Configuring cluster topology')
-        is_node_aware = context.is_node_aware
-        if is_node_aware:
-            topo = th.generate_topology_map(context.cluster, is_node_aware)
-            topo = '\n'.join(['%s %s' % i for i in six.iteritems(topo)]) + '\n'
-            data_path = '%s/topology.data' % context.mapr_home
-            script = files.get_file_text(_TOPO_SCRIPT)
-            script_path = '%s/topology.sh' % context.mapr_home
-            util.execute_on_instances(instances, write_file, data_path, topo)
-            util.execute_on_instances(
-                instances, util.write_file, script_path, script, '+x', 'root')
-        else:
-            LOG.debug('Data locality is disabled.')
         LOG.info(_LI('Cluster topology successfully configured'))
 
+    @el.provision_step(_("Write config files to instances"))
     def _write_config_files(self, cluster_context, instances):
         LOG.debug('Writing config files')
 
-        def get_node_groups(instances):
-            return util.unique_list(instances, lambda i: i.node_group)
+        @el.provision_event()
+        def write_config_files(instance, config_files):
+            for path, data in six.iteritems(config_files):
+                util.mkdir(instance, os.path.dirname(path), owner="root")
+                util.write_file(instance, path, data, owner="root")
 
-        for ng in get_node_groups(instances):
-            ng_services = cluster_context.get_cluster_services(ng)
-            ng_user_configs = ng.configuration()
-            ng_default_configs = cluster_context.get_services_configs_dict(
-                ng_services)
-            ng_configs = sahara_configs.merge_configs(
-                ng_default_configs, ng_user_configs)
-            ng_config_files = dict()
-            for service in ng_services:
-                service_conf_files = service.get_config_files(
-                    cluster_context=cluster_context,
-                    configs=ng_configs[service.ui_name],
-                    instance=ng.instances[0]
-                )
-                LOG.debug('Rendering {ui_name} config files'.format(
-                    ui_name=service.ui_name))
-                for conf_file in service_conf_files:
-                    ng_config_files.update({
-                        conf_file.remote_path: conf_file.render()
-                    })
+        node_groups = util.unique_list(instances, lambda i: i.node_group)
+        for node_group in node_groups:
+            config_files = cluster_context.get_config_files(node_group)
+            ng_instances = [i for i in node_group.instances if i in instances]
+            util.execute_on_instances(ng_instances, write_config_files,
+                                      config_files=config_files)
 
-            ng_instances = filter(lambda i: i in instances, ng.instances)
-            self._write_ng_config_files(ng_instances, ng_config_files)
-        LOG.debug('Config files successfully wrote')
-
-    def _write_ng_config_files(self, instances, conf_files):
-        with context.ThreadGroup() as tg:
-            for instance in instances:
-                tg.spawn('write-config-files-%s' % instance.id,
-                         self._write_config_files_instance, instance,
-                         conf_files)
+        LOG.debug("Config files are successfully written")
 
     def _configure_environment(self, cluster_context, instances):
         self.configure_general_environment(cluster_context, instances)
@@ -198,15 +223,6 @@ class BaseConfigurer(ac.AbstractConfigurer):
         mysql.MySQL.start_mysql_server(cluster_context)
         mysql.MySQL.create_databases(cluster_context, instances)
 
-    @staticmethod
-    def _write_config_files_instance(instance, config_files):
-        paths = six.iterkeys(config_files)
-        with instance.remote() as r:
-            for path in paths:
-                r.execute_command('mkdir -p ' + os.path.dirname(path),
-                                  run_as_root=True)
-            r.write_files_to(config_files, run_as_root=True)
-
     def _post_install_services(self, cluster_context, instances):
         LOG.debug('Executing service post install hooks')
         for s in cluster_context.cluster_services:
@@ -266,6 +282,7 @@ class BaseConfigurer(ac.AbstractConfigurer):
         util.execute_on_instances(instances, set_user_password)
         util.execute_on_instances(instances, create_home_mapr)
 
+    @el.provision_step(_("Execute configure.sh"))
     def _configure_sh_cluster(self, cluster_context, instances):
         LOG.debug('Executing configure.sh')
 
@@ -286,6 +303,7 @@ class BaseConfigurer(ac.AbstractConfigurer):
                      instance, script, db_specs)
         LOG.debug('Executing configure.sh successfully completed')
 
+    @el.provision_event(instance_reference=2)
     def _configure_sh_instance(self, context, instance, command, specs):
         if not self.mapr_user_exists(instance):
             command += ' --create-user'
@@ -296,15 +314,17 @@ class BaseConfigurer(ac.AbstractConfigurer):
             r.execute_command('sudo -i ' + command,
                               timeout=_CONFIGURE_SH_TIMEOUT)
 
+    @el.provision_step(_("Configure SSH connection"))
     def _configure_ssh_connection(self, cluster_context, instances):
-        def keep_alive_connection(instance):
+        @el.provision_event()
+        def configure_ssh(instance):
             echo_param = 'echo "KeepAlive yes" >> ~/.ssh/config'
             echo_timeout = 'echo "ServerAliveInterval 60" >> ~/.ssh/config'
             with instance.remote() as r:
                 r.execute_command(echo_param)
                 r.execute_command(echo_timeout)
 
-        util.execute_on_instances(instances, keep_alive_connection)
+        util.execute_on_instances(instances, configure_ssh)
 
     def mapr_user_exists(self, instance):
         with instance.remote() as r:
@@ -320,18 +340,30 @@ class BaseConfigurer(ac.AbstractConfigurer):
             service.post_start(c_context, updated)
         LOG.info(_LI('Post start hooks successfully executed'))
 
-    def _set_cluster_mode(self, cluster_context):
+    @el.provision_step(_("Set cluster mode"))
+    def _set_cluster_mode(self, cluster_context, instances):
         cluster_mode = cluster_context.cluster_mode
         if not cluster_mode:
             return
-        cmd = 'maprcli cluster mapreduce set -mode %s' % cluster_mode
-        util.execute_command(cluster_context.get_instances(), cmd, 'mapr')
+        command = "maprcli cluster mapreduce set -mode %s" % cluster_mode
+
+        @el.provision_event()
+        def set_cluster_mode(instance):
+            return util.execute_command([instance], command, run_as="mapr")
+
+        util.execute_on_instances(instances, set_cluster_mode)
 
+    @el.provision_step(_("Install MapR repositories"))
     def _install_mapr_repo(self, cluster_context, instances):
-        d_name = cluster_context.distro.name
-        util.execute_on_instances(
-            instances, util.run_script, ADD_MAPR_REPO_SCRIPT, 'root', d_name,
-            **cluster_context.mapr_repos)
+        distro_name = cluster_context.distro.name
+
+        @el.provision_event()
+        def install_mapr_repos(instance):
+            return util.run_script(instance, ADD_MAPR_REPO_SCRIPT, "root",
+                                   distro_name, **cluster_context.mapr_repos)
+
+        util.execute_on_instances(instances, install_mapr_repos)
 
     def _update_services(self, c_context, instances):
         for service in c_context.cluster_services:
diff --git a/sahara/plugins/mapr/base/base_cluster_context.py b/sahara/plugins/mapr/base/base_cluster_context.py
index f8493789..9b4a6d54 100644
--- a/sahara/plugins/mapr/base/base_cluster_context.py
+++ b/sahara/plugins/mapr/base/base_cluster_context.py
@@ -29,6 +29,8 @@ import sahara.plugins.mapr.services.yarn.yarn as yarn
 import sahara.plugins.mapr.util.general as g
 import sahara.plugins.mapr.util.service_utils as su
 import sahara.plugins.utils as u
+from sahara.topology import topology_helper as th
+import sahara.utils.configs as sahara_configs
 
 CONF = cfg.CONF
 CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")
@@ -381,3 +383,30 @@ class BaseClusterContext(cc.AbstractClusterContext):
 
     @property
     def centos_ecosystem_repo(self):
         return self._centos_ecosystem_repo
+
+    def get_configuration(self, node_group):
+        services = self.get_cluster_services(node_group)
+        user_configs = node_group.configuration()
+        default_configs = self.get_services_configs_dict(services)
+        return sahara_configs.merge_configs(default_configs, user_configs)
+
+    def get_config_files(self, node_group):
+        services = self.get_cluster_services(node_group)
+        configuration = self.get_configuration(node_group)
+        instance = node_group.instances[0]
+
+        config_files = {}
+        for service in services:
+            service_conf_files = service.get_config_files(
+                cluster_context=self,
+                configs=configuration[service.ui_name],
+                instance=instance,
+            )
+            for conf_file in service_conf_files:
+                config_files[conf_file.remote_path] = conf_file.render()
+
+        return config_files
+
+    @property
+    def topology_map(self):
+        return th.generate_topology_map(self.cluster, self.is_node_aware)
diff --git a/sahara/plugins/mapr/domain/service.py b/sahara/plugins/mapr/domain/service.py
index c644c3bf..2834701b 100644
--- a/sahara/plugins/mapr/domain/service.py
+++ b/sahara/plugins/mapr/domain/service.py
@@ -16,10 +16,10 @@
 from oslo_serialization import jsonutils as json
 import six
 
-from sahara import context
 import sahara.exceptions as e
 from sahara.i18n import _
 import sahara.plugins.exceptions as ex
+from sahara.plugins.mapr.util import event_log as el
 from sahara.plugins.mapr.util import general as g
 from sahara.plugins.mapr.util import service_utils as su
 import sahara.plugins.provisioning as p
@@ -78,13 +78,11 @@ class Service(object):
         return self._validation_rules
 
     def install(self, cluster_context, instances):
-        with context.ThreadGroup() as tg:
-            for instance in instances:
-                tg.spawn('install-packages-%s' % instance.id,
-                         self._install_packages_on_instance, cluster_context,
-                         instance)
+        g.execute_on_instances(instances, self._install_packages_on_instance,
+                               cluster_context)
 
-    def _install_packages_on_instance(self, cluster_context, instance):
+    @el.provision_event(instance_reference=1)
+    def _install_packages_on_instance(self, instance, cluster_context):
         processes = [p for p in self.node_processes if
                      p.ui_name in instance.node_group.node_processes]
         if processes is not None and len(processes) > 0:
diff --git a/sahara/plugins/mapr/util/event_log.py b/sahara/plugins/mapr/util/event_log.py
new file mode 100644
index 00000000..de464f20
--- /dev/null
+++ b/sahara/plugins/mapr/util/event_log.py
@@ -0,0 +1,59 @@
+# Copyright (c) 2015, MapR Technologies
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from sahara.utils import cluster_progress_ops as cpo
+
+
+def provision_step(name, cluster_context_reference=1, instances_reference=2):
+    def wrapper(function):
+        def wrapped(*args, **kwargs):
+            cluster_context = _find_argument(
+                cluster_context_reference, *args, **kwargs)
+            instances = _find_argument(instances_reference, *args, **kwargs)
+
+            cluster_id = cluster_context.cluster.id
+            instance_count = len(instances)
+
+            cpo.add_provisioning_step(cluster_id, name, instance_count)
+
+            return function(*args, **kwargs)
+
+        return wrapped
+
+    return wrapper
+
+
+def provision_event(instance_reference=0):
+    def wrapper(function):
+        def wrapped(*args, **kwargs):
+            instance = _find_argument(instance_reference, *args, **kwargs)
+            try:
+                result = function(*args, **kwargs)
+                cpo.add_successful_event(instance)
+                return result
+            except Exception as exception:
+                cpo.add_fail_event(instance, exception)
+                raise exception
+
+        return wrapped
+
+    return wrapper
+
+
+def _find_argument(reference, *args, **kwargs):
+    if isinstance(reference, int):
+        return args[reference]
+    if isinstance(reference, str):
+        return kwargs[reference]
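
Editor's note (illustration, not part of the patch): the two decorators in
event_log.py are meant to be used as a pair. provision_step registers a
provisioning step sized to len(instances), locating cluster_context and
instances in the decorated callable's arguments (positions 1 and 2 by
default, matching an instance-method signature), and provision_event then
records one successful or failed event against that step per instance.
Below is a minimal sketch of the pattern, modeled on _install_java in this
patch; the class and method names are hypothetical, and it assumes
util.execute_on_instances(instances, fn) invokes fn(instance) once per
instance, as the patched code does:

    from sahara.i18n import _
    from sahara.plugins.mapr.util import event_log as el
    import sahara.plugins.mapr.util.general as util


    class ExampleConfigurer(object):
        # Step: cluster_context is args[1], instances is args[2] (defaults),
        # so the step is created with the correct total event count.
        @el.provision_step(_("Run example command"))
        def _run_example_command(self, cluster_context, instances):
            # Event: the instance is args[0] (default instance_reference=0);
            # each call marks one success or failure within the step.
            @el.provision_event()
            def run_example(instance):
                with instance.remote() as r:
                    r.execute_command("echo example")

            util.execute_on_instances(instances, run_example)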