Add event logs for MapR plugin

The MapR plugin now supports event logs. To complete
this task, several additional changes were made:
1. Move the node-awareness check out of the configure_topology method
2. Add an instances parameter to set_cluster_mode
3. Move topology-map generation to the cluster context
4. Move config resolution to the cluster context

Change-Id: Ic00b27f57eacf5664980358b7fd154632ef3f31e
Implements: blueprint mapr-event-log
Artem Osadchyi 2015-08-17 17:17:09 +03:00
parent d479d2ece4
commit 45009845e8
4 changed files with 214 additions and 96 deletions
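
At its core, the commit wraps each cluster-wide operation in a provision_step decorator, which registers a named provisioning step sized to the instance list, and each per-instance operation in a provision_event decorator, which records success or failure against that step. Below is a minimal sketch of the pattern that recurs throughout the configurer diff; the class, step name, and script name here are illustrative only, not part of the commit:

from sahara.i18n import _
from sahara.plugins.mapr.util import event_log as el
import sahara.plugins.mapr.util.general as util


class ExampleConfigurer(object):
    # provision_step's default references (1, 2) match the
    # (self, cluster_context, instances) signature of these methods.
    @el.provision_step(_("Example step"))
    def _example_step(self, cluster_context, instances):
        # provision_event's default reference (0) matches the
        # one-instance callables passed to execute_on_instances.
        @el.provision_event()
        def run_on_instance(instance):
            return util.run_script(instance, "example.sh", "root")

        util.execute_on_instances(instances, run_on_instance)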

sahara/plugins/mapr/base/base_cluster_configurer.py

@@ -21,6 +21,7 @@ import six
from sahara import conductor
from sahara import context
from sahara.i18n import _
from sahara.i18n import _LI
from sahara.i18n import _LW
import sahara.plugins.mapr.abstract.configurer as ac
@@ -30,9 +31,8 @@ import sahara.plugins.mapr.services.mapreduce.mapreduce as mr
from sahara.plugins.mapr.services.maprfs import maprfs
from sahara.plugins.mapr.services.mysql import mysql
import sahara.plugins.mapr.services.yarn.yarn as yarn
from sahara.plugins.mapr.util import event_log as el
import sahara.plugins.mapr.util.general as util
from sahara.topology import topology_helper as th
import sahara.utils.configs as sahara_configs
import sahara.utils.files as files
LOG = logging.getLogger(__name__)
@@ -69,11 +69,12 @@ class BaseConfigurer(ac.AbstractConfigurer):
if not cluster_context.is_prebuilt:
self._prepare_bare_image(cluster_context, instances)
self._install_services(cluster_context, instances)
if cluster_context.is_node_aware:
self._configure_topology(cluster_context, instances)
self._configure_database(cluster_context, instances)
self._configure_services(cluster_context, instances)
self._configure_sh_cluster(cluster_context, instances)
self._set_cluster_mode(cluster_context)
self._set_cluster_mode(cluster_context, instances)
self._write_config_files(cluster_context, instances)
self._configure_environment(cluster_context, instances)
self._update_cluster_info(cluster_context)
@@ -82,6 +83,7 @@ class BaseConfigurer(ac.AbstractConfigurer):
LOG.debug('Configuring existing instances')
instances = instances or cluster_context.get_instances()
existing = cluster_context.existing_instances()
if cluster_context.is_node_aware:
self._configure_topology(cluster_context, existing)
if cluster_context.has_control_nodes(instances):
self._configure_sh_cluster(cluster_context, existing)
@@ -97,8 +99,14 @@ class BaseConfigurer(ac.AbstractConfigurer):
service.configure(cluster_context, instances)
def _install_services(self, cluster_context, instances):
step_name_template = _("Install %s service")
for service in self._service_install_sequence(cluster_context):
service.install(cluster_context, instances)
# Add own provision step for each service
step_name = step_name_template % service.ui_name
decorator = el.provision_step(step_name, 0, 1)
install_service = decorator(service.install)
install_service(cluster_context, instances)
def _service_install_sequence(self, cluster_context):
def key(service):
@@ -110,82 +118,99 @@ class BaseConfigurer(ac.AbstractConfigurer):
def _prepare_bare_image(self, cluster_context, instances):
LOG.debug('Preparing bare image')
if d.UBUNTU == cluster_context.distro:
LOG.debug("Installing security repos")
util.execute_on_instances(
instances, util.run_script, ADD_SECURITY_REPO_SCRIPT, 'root')
d_name = cluster_context.distro.name
LOG.debug('Installing Java')
util.execute_on_instances(
instances, util.run_script, INSTALL_JAVA_SCRIPT, 'root', d_name)
LOG.debug('Installing Scala')
util.execute_on_instances(
instances, util.run_script, INSTALL_SCALA_SCRIPT, 'root', d_name)
LOG.debug('Installing MySQL client')
util.execute_on_instances(
instances, util.run_script, INSTALL_MYSQL_CLIENT, 'root', d_name)
self._install_security_repos(cluster_context, instances)
self._install_java(cluster_context, instances)
self._install_scala(cluster_context, instances)
self._install_mysql_client(cluster_context, instances)
LOG.debug('Bare images successfully prepared')
def _configure_topology(self, context, instances):
def write_file(instance, path, data):
with instance.remote() as r:
r.write_file_to(path, data, run_as_root=True)
@el.provision_step(_("Install security repos"))
def _install_security_repos(self, cluster_context, instances):
LOG.debug("Installing security repos")
LOG.debug('Configuring cluster topology')
is_node_aware = context.is_node_aware
if is_node_aware:
topo = th.generate_topology_map(context.cluster, is_node_aware)
topo = '\n'.join(['%s %s' % i for i in six.iteritems(topo)]) + '\n'
data_path = '%s/topology.data' % context.mapr_home
@el.provision_event()
def install_security_repos(instance):
return util.run_script(instance, ADD_SECURITY_REPO_SCRIPT, "root")
util.execute_on_instances(instances, install_security_repos)
@el.provision_step(_("Install MySQL client"))
def _install_mysql_client(self, cluster_context, instances):
LOG.debug("Installing MySQL client")
distro_name = cluster_context.distro.name
@el.provision_event()
def install_mysql_client(instance):
return util.run_script(instance, INSTALL_MYSQL_CLIENT,
"root", distro_name)
util.execute_on_instances(instances, install_mysql_client)
@el.provision_step(_("Install Scala"))
def _install_scala(self, cluster_context, instances):
LOG.debug("Installing Scala")
distro_name = cluster_context.distro.name
@el.provision_event()
def install_scala(instance):
return util.run_script(instance, INSTALL_SCALA_SCRIPT,
"root", distro_name)
util.execute_on_instances(instances, install_scala)
@el.provision_step(_("Install Java"))
def _install_java(self, cluster_context, instances):
LOG.debug("Installing Java")
distro_name = cluster_context.distro.name
@el.provision_event()
def install_java(instance):
return util.run_script(instance, INSTALL_JAVA_SCRIPT,
"root", distro_name)
util.execute_on_instances(instances, install_java)
@el.provision_step(_("Configure cluster topology"))
def _configure_topology(self, context, instances):
LOG.debug("Configuring cluster topology")
topology_map = context.topology_map
topology_map = ("%s %s" % item for item in six.iteritems(topology_map))
topology_map = "\n".join(topology_map) + "\n"
data_path = "%s/topology.data" % context.mapr_home
script = files.get_file_text(_TOPO_SCRIPT)
script_path = '%s/topology.sh' % context.mapr_home
util.execute_on_instances(instances, write_file, data_path, topo)
util.execute_on_instances(
instances, util.write_file, script_path, script, '+x', 'root')
else:
LOG.debug('Data locality is disabled.')
@el.provision_event()
def write_topology_data(instance):
util.write_file(instance, data_path, topology_map, owner="root")
util.write_file(instance, script_path, script,
mode="+x", owner="root")
util.execute_on_instances(instances, write_topology_data)
LOG.info(_LI('Cluster topology successfully configured'))
@el.provision_step(_("Write config files to instances"))
def _write_config_files(self, cluster_context, instances):
LOG.debug('Writing config files')
def get_node_groups(instances):
return util.unique_list(instances, lambda i: i.node_group)
@el.provision_event()
def write_config_files(instance, config_files):
for path, data in six.iteritems(config_files):
util.mkdir(instance, os.path.dirname(path), owner="root")
util.write_file(instance, path, data, owner="root")
for ng in get_node_groups(instances):
ng_services = cluster_context.get_cluster_services(ng)
ng_user_configs = ng.configuration()
ng_default_configs = cluster_context.get_services_configs_dict(
ng_services)
ng_configs = sahara_configs.merge_configs(
ng_default_configs, ng_user_configs)
ng_config_files = dict()
for service in ng_services:
service_conf_files = service.get_config_files(
cluster_context=cluster_context,
configs=ng_configs[service.ui_name],
instance=ng.instances[0]
)
LOG.debug('Rendering {ui_name} config files'.format(
ui_name=service.ui_name))
for conf_file in service_conf_files:
ng_config_files.update({
conf_file.remote_path: conf_file.render()
})
node_groups = util.unique_list(instances, lambda i: i.node_group)
for node_group in node_groups:
config_files = cluster_context.get_config_files(node_group)
ng_instances = [i for i in node_group.instances if i in instances]
util.execute_on_instances(ng_instances, write_config_files,
config_files=config_files)
ng_instances = filter(lambda i: i in instances, ng.instances)
self._write_ng_config_files(ng_instances, ng_config_files)
LOG.debug('Config files successfully wrote')
def _write_ng_config_files(self, instances, conf_files):
with context.ThreadGroup() as tg:
for instance in instances:
tg.spawn('write-config-files-%s' % instance.id,
self._write_config_files_instance, instance,
conf_files)
LOG.debug("Config files are successfully written")
def _configure_environment(self, cluster_context, instances):
self.configure_general_environment(cluster_context, instances)
@@ -198,15 +223,6 @@ class BaseConfigurer(ac.AbstractConfigurer):
mysql.MySQL.start_mysql_server(cluster_context)
mysql.MySQL.create_databases(cluster_context, instances)
@staticmethod
def _write_config_files_instance(instance, config_files):
paths = six.iterkeys(config_files)
with instance.remote() as r:
for path in paths:
r.execute_command('mkdir -p ' + os.path.dirname(path),
run_as_root=True)
r.write_files_to(config_files, run_as_root=True)
def _post_install_services(self, cluster_context, instances):
LOG.debug('Executing service post install hooks')
for s in cluster_context.cluster_services:
@@ -266,6 +282,7 @@ class BaseConfigurer(ac.AbstractConfigurer):
util.execute_on_instances(instances, set_user_password)
util.execute_on_instances(instances, create_home_mapr)
@el.provision_step(_("Execute configure.sh"))
def _configure_sh_cluster(self, cluster_context, instances):
LOG.debug('Executing configure.sh')
@@ -286,6 +303,7 @@ class BaseConfigurer(ac.AbstractConfigurer):
instance, script, db_specs)
LOG.debug('Executing configure.sh successfully completed')
@el.provision_event(instance_reference=2)
def _configure_sh_instance(self, context, instance, command, specs):
if not self.mapr_user_exists(instance):
command += ' --create-user'
@@ -296,15 +314,17 @@
r.execute_command('sudo -i ' + command,
timeout=_CONFIGURE_SH_TIMEOUT)
@el.provision_step(_("Configure SSH connection"))
def _configure_ssh_connection(self, cluster_context, instances):
def keep_alive_connection(instance):
@el.provision_event()
def configure_ssh(instance):
echo_param = 'echo "KeepAlive yes" >> ~/.ssh/config'
echo_timeout = 'echo "ServerAliveInterval 60" >> ~/.ssh/config'
with instance.remote() as r:
r.execute_command(echo_param)
r.execute_command(echo_timeout)
util.execute_on_instances(instances, keep_alive_connection)
util.execute_on_instances(instances, configure_ssh)
def mapr_user_exists(self, instance):
with instance.remote() as r:
@@ -320,18 +340,30 @@
service.post_start(c_context, updated)
LOG.info(_LI('Post start hooks successfully executed'))
def _set_cluster_mode(self, cluster_context):
@el.provision_step(_("Set cluster mode"))
def _set_cluster_mode(self, cluster_context, instances):
cluster_mode = cluster_context.cluster_mode
if not cluster_mode:
return
cmd = 'maprcli cluster mapreduce set -mode %s' % cluster_mode
util.execute_command(cluster_context.get_instances(), cmd, 'mapr')
command = "maprcli cluster mapreduce set -mode %s" % cluster_mode
@el.provision_event()
def set_cluster_mode(instance):
return util.execute_command([instance], command, run_as="mapr")
util.execute_on_instances(instances, set_cluster_mode)
@el.provision_step(_("Install MapR repositories"))
def _install_mapr_repo(self, cluster_context, instances):
d_name = cluster_context.distro.name
util.execute_on_instances(
instances, util.run_script, ADD_MAPR_REPO_SCRIPT, 'root', d_name,
**cluster_context.mapr_repos)
distro_name = cluster_context.distro.name
@el.provision_event()
def install_mapr_repos(instance):
return util.run_script(instance, ADD_MAPR_REPO_SCRIPT, "root",
distro_name, **cluster_context.mapr_repos)
util.execute_on_instances(instances, install_mapr_repos)
def _update_services(self, c_context, instances):
for service in c_context.cluster_services:

sahara/plugins/mapr/base/base_cluster_context.py

@@ -29,6 +29,8 @@ import sahara.plugins.mapr.services.yarn.yarn as yarn
import sahara.plugins.mapr.util.general as g
import sahara.plugins.mapr.util.service_utils as su
import sahara.plugins.utils as u
from sahara.topology import topology_helper as th
import sahara.utils.configs as sahara_configs
CONF = cfg.CONF
CONF.import_opt("enable_data_locality", "sahara.topology.topology_helper")
@@ -381,3 +383,30 @@ class BaseClusterContext(cc.AbstractClusterContext):
@property
def centos_ecosystem_repo(self):
return self._centos_ecosystem_repo
def get_configuration(self, node_group):
services = self.get_cluster_services(node_group)
user_configs = node_group.configuration()
default_configs = self.get_services_configs_dict(services)
return sahara_configs.merge_configs(default_configs, user_configs)
def get_config_files(self, node_group):
services = self.get_cluster_services(node_group)
configuration = self.get_configuration(node_group)
instance = node_group.instances[0]
config_files = {}
for service in services:
service_conf_files = service.get_config_files(
cluster_context=self,
configs=configuration[service.ui_name],
instance=instance,
)
for conf_file in service_conf_files:
config_files[conf_file.remote_path] = conf_file.render()
return config_files
@property
def topology_map(self):
return th.generate_topology_map(self.cluster, self.is_node_aware)

sahara/plugins/mapr/domain/service.py

@@ -16,10 +16,10 @@
from oslo_serialization import jsonutils as json
import six
from sahara import context
import sahara.exceptions as e
from sahara.i18n import _
import sahara.plugins.exceptions as ex
from sahara.plugins.mapr.util import event_log as el
from sahara.plugins.mapr.util import general as g
from sahara.plugins.mapr.util import service_utils as su
import sahara.plugins.provisioning as p
@@ -78,13 +78,11 @@ class Service(object):
return self._validation_rules
def install(self, cluster_context, instances):
with context.ThreadGroup() as tg:
for instance in instances:
tg.spawn('install-packages-%s' % instance.id,
self._install_packages_on_instance, cluster_context,
instance)
g.execute_on_instances(instances, self._install_packages_on_instance,
cluster_context)
def _install_packages_on_instance(self, cluster_context, instance):
@el.provision_event(instance_reference=1)
def _install_packages_on_instance(self, instance, cluster_context):
processes = [p for p in self.node_processes if
p.ui_name in instance.node_group.node_processes]
if processes is not None and len(processes) > 0:

sahara/plugins/mapr/util/event_log.py

@@ -0,0 +1,59 @@
# Copyright (c) 2015, MapR Technologies
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sahara.utils import cluster_progress_ops as cpo
def provision_step(name, cluster_context_reference=1, instances_reference=2):
def wrapper(function):
def wrapped(*args, **kwargs):
cluster_context = _find_argument(
cluster_context_reference, *args, **kwargs)
instances = _find_argument(instances_reference, *args, **kwargs)
cluster_id = cluster_context.cluster.id
instance_count = len(instances)
cpo.add_provisioning_step(cluster_id, name, instance_count)
return function(*args, **kwargs)
return wrapped
return wrapper
def provision_event(instance_reference=0):
def wrapper(function):
def wrapped(*args, **kwargs):
instance = _find_argument(instance_reference, *args, **kwargs)
try:
result = function(*args, **kwargs)
cpo.add_successful_event(instance)
return result
except Exception as exception:
cpo.add_fail_event(instance, exception)
raise exception
return wrapped
return wrapper
def _find_argument(reference, *args, **kwargs):
if isinstance(reference, int):
return args[reference]
if isinstance(reference, str):
return kwargs[reference]
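
For reference: an int reference selects a positional argument and a string selects a keyword argument, which is how provision_event(instance_reference=2) on _configure_sh_instance(self, context, instance, command, specs) in the diff above picks out the instance. A small illustration with made-up values:

# Illustrative only; mirrors how the decorators above locate arguments.
_find_argument(0, "first", "second")           # -> "first" (positional)
_find_argument(2, "a", "b", "c")               # -> "c"
_find_argument("instance", instance="node-1")  # -> "node-1" (keyword)

Note that the wrappers do not apply functools.wraps, so decorated functions lose their original __name__; that is harmless for how they are used here, since they are only called, never introspected.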