From 7dc31d33a4484f448ad43de4b3b80ad0e9afeedb Mon Sep 17 00:00:00 2001 From: Telles Nobrega Date: Thu, 27 Nov 2014 16:48:02 -0300 Subject: [PATCH] Storm integration This patch implements the Storm plugin. Implements: bp storm-integration Change-Id: I65e76b7c4d63e2fd0c4a52ec2a198a7510ccbaad --- sahara/plugins/storm/__init__.py | 0 sahara/plugins/storm/config_helper.py | 144 +++++++++++++++ sahara/plugins/storm/plugin.py | 252 ++++++++++++++++++++++++++ sahara/plugins/storm/run_scripts.py | 59 ++++++ 4 files changed, 455 insertions(+) create mode 100644 sahara/plugins/storm/__init__.py create mode 100644 sahara/plugins/storm/config_helper.py create mode 100644 sahara/plugins/storm/plugin.py create mode 100644 sahara/plugins/storm/run_scripts.py diff --git a/sahara/plugins/storm/__init__.py b/sahara/plugins/storm/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sahara/plugins/storm/config_helper.py b/sahara/plugins/storm/config_helper.py new file mode 100644 index 00000000..12099a7c --- /dev/null +++ b/sahara/plugins/storm/config_helper.py @@ -0,0 +1,144 @@ +# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from oslo.config import cfg

from sahara import conductor as c
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.openstack.common import log as logging


conductor = c.API
LOG = logging.getLogger(__name__)
CONF = cfg.CONF


def get_config_value(service, name, cluster=None):
    """Look up parameter ``name`` of ``service`` in the cluster node groups.

    Node groups are scanned in order and the first truthy value found is
    returned.

    :param service: key of the service section in a node group configuration
    :param name: parameter name inside that section
    :param cluster: cluster whose ``node_groups`` are inspected (optional)
    :raises ex.ConfigurationError: if ``cluster`` is falsy or no node group
        carries a truthy value for the parameter
    """
    if cluster:
        for ng in cluster.node_groups:
            # Fetch the configuration once per node group instead of
            # re-evaluating it for every lookup step.
            service_conf = ng.configuration().get(service)
            if service_conf:
                value = service_conf.get(name)
                if value:
                    return value

    raise ex.ConfigurationError(
        _("Unable to get parameter '%(param_name)s' from "
          "service %(service)s") % {'param_name': name, 'service': service})


def get_plugin_configs():
    """Return the plugin configuration set (currently empty)."""
    return {}


def generate_storm_config(master_hostname, zk_hostnames):
    """Build the dictionary that is later rendered into storm.yaml.

    :param master_hostname: hostname of the nimbus node
    :param zk_hostnames: iterable of ZooKeeper hostnames
    :returns: dict of Storm settings; host names are ASCII-encoded with
        non-ASCII characters dropped
    """
    # Named ``storm_conf`` so the module-level ``cfg`` import is not shadowed.
    storm_conf = {
        "nimbus.host": master_hostname.encode('ascii', 'ignore'),
        "worker.childopts": "-Xmx768m -Djava.net.preferIPv4Stack=true",
        "nimbus.childopts": "-Xmx1024m -Djava.net.preferIPv4Stack=true",
        "supervisor.childopts": "-Djava.net.preferIPv4Stack=true",
        "storm.zookeeper.servers": [i.encode('ascii', 'ignore')
                                    for i in zk_hostnames],
        "ui.childopts": "-Xmx768m -Djava.net.preferIPv4Stack=true",
        "storm.local.dir": "/app/storm"
    }

    return storm_conf


def generate_slave_supervisor_conf():
    """Return the supervisord program section for the Storm supervisor."""
    separator = "\n"
    conf = ("[program:storm-supervisor]",
            'command=bash -exec "cd /usr/local/storm && bin/storm supervisor"',
            "user=storm",
            "autostart=true",
            "autorestart=true",
            "startsecs=10",
            "startretries=999",
            "log_stdout=true",
            "log_stderr=true",
            "logfile=/var/log/storm/supervisor.out",
            "logfile_maxbytes=20MB",
            "logfile_backups=10")

    return separator.join(conf)


def generate_master_supervisor_conf():
    """Return the supervisord program sections for nimbus and the Storm UI.

    The two sections are joined with a newline, nimbus first.
    """
    separator = "\n"
    seq_n = ("[program:storm-nimbus]",
             "command=/usr/local/storm/bin/storm nimbus",
             "user=storm",
             "autostart=true",
             "autorestart=true",
             "startsecs=10",
             "startretries=999",
             "log_stdout=true",
             "log_stderr=true",
             # Nimbus gets its own log file: the storm-supervisor section
             # already writes to supervisor.out, and both sections end up in
             # the same supervisord.conf on a node running both processes.
             "logfile=/var/log/storm/nimbus.out",
             "logfile_maxbytes=20MB",
             "logfile_backups=10")

    seq_u = ("[program:storm-ui]",
             "command=/usr/local/storm/bin/storm ui",
             "user=storm",
             "autostart=true",
             "autorestart=true",
             "startsecs=10",
             "startretries=999",
             "log_stdout=true",
             "log_stderr=true",
             "logfile=/var/log/storm/ui.out",
             "logfile_maxbytes=20MB",
             "logfile_backups=10")

    conf_n = separator.join(seq_n)
    conf_u = separator.join(seq_u)

    return separator.join((conf_n, conf_u))


def generate_zookeeper_conf():
    """Return the contents of a minimal ZooKeeper zoo.cfg file."""
    separator = "\n"
    conf = ("tickTime=2000",
            "dataDir=/var/zookeeper",
            "clientPort=2181")

    return separator.join(conf)


def generate_storm_setup_script(env_configs):
    """Return a bash script that rewrites storm.yaml line by line.

    :param env_configs: iterable of lines to write into
        /usr/local/storm/conf/storm.yaml
    :returns: script text; it truncates the file first and then appends each
        line with ``echo``
    """
    separator = "\n"
    script_lines = ["#!/bin/bash -x",
                    "echo -n > /usr/local/storm/conf/storm.yaml"]
    script_lines.extend(
        'echo "%s" >> /usr/local/storm/conf/storm.yaml' % line
        for line in env_configs)

    return separator.join(script_lines)


def extract_name_values(configs):
    """Map each config entry's ``'name'`` to its ``'value'``.

    :param configs: iterable of mappings holding ``'name'`` and ``'value'``
    :returns: dict of name -> value
    """
    return dict((item['name'], item['value']) for item in configs)


def _set_config(cfg, gen_cfg, name=None):
    """Merge generated configuration sections into ``cfg`` and return it.

    :param cfg: dict updated in place
    :param gen_cfg: mapping of section name -> mapping with a ``'conf'`` dict
    :param name: section to merge; when ``None`` every section is merged
    :returns: the updated ``cfg``
    """
    if name in gen_cfg:
        cfg.update(gen_cfg[name]['conf'])
    if name is None:
        # Separate loop variable so the ``name`` argument is not rebound.
        for section in gen_cfg:
            cfg.update(gen_cfg[section]['conf'])
    return cfg


# Patch metadata and licence header of the next file in this change set,
# kept verbatim from the original patch text:
#
# diff --git a/sahara/plugins/storm/plugin.py b/sahara/plugins/storm/plugin.py
# new file mode 100644
# index 00000000..c48be5dc
# --- /dev/null
# +++ b/sahara/plugins/storm/plugin.py
# @@ -0,0 +1,252 @@
#
# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from oslo.config import cfg
import six
import yaml

from sahara import conductor
from sahara import context
from sahara.i18n import _
from sahara.i18n import _LI
from sahara.openstack.common import log as logging
from sahara.plugins import exceptions as ex
from sahara.plugins import provisioning as p
from sahara.plugins.storm import config_helper as c_helper
from sahara.plugins.storm import run_scripts as run
from sahara.plugins import utils
from sahara.utils import remote

conductor = conductor.API
LOG = logging.getLogger(__name__)
CONF = cfg.CONF


class StormProvider(p.ProvisioningPluginBase):
    """Provisioning plugin that deploys Storm together with ZooKeeper."""

    def __init__(self):
        # Service name -> node processes offered by this plugin.
        self.processes = {
            "Zookeeper": ["zookeeper"],
            "Storm": ["nimbus", "supervisor"]
        }

    def get_title(self):
        """Return the display title of the plugin."""
        return "Apache Storm"

    def get_description(self):
        """Return the translated plugin description."""
        return (
            _("This plugin provides an ability to launch Storm "
              "cluster without any management consoles."))

    def get_versions(self):
        """Return the list of Storm versions handled by the plugin."""
        return ['0.9.2']

    def get_configs(self, storm_version):
        """Return the plugin configs; ``storm_version`` is not consulted."""
        return c_helper.get_plugin_configs()

    def get_node_processes(self, storm_version):
        """Return the service -> node processes mapping."""
        return self.processes

    def validate(self, cluster):
        """Check that the cluster has one nimbus and at least one supervisor.

        :raises ex.RequiredServiceMissingException: no nimbus is requested
        :raises ex.InvalidComponentCountException: more than one nimbus, or
            no supervisor, is requested
        """
        # validate Storm Master Node and Storm Slaves
        sm_count = sum(ng.count for ng
                       in utils.get_node_groups(cluster, "nimbus"))

        if sm_count < 1:
            raise ex.RequiredServiceMissingException("Storm nimbus")

        # Too many masters is a count problem, not a missing service.
        if sm_count > 1:
            raise ex.InvalidComponentCountException("Storm nimbus",
                                                    "1",
                                                    sm_count)

        sl_count = sum(ng.count for ng
                       in utils.get_node_groups(cluster, "supervisor"))

        if sl_count < 1:
            raise ex.InvalidComponentCountException("Storm supervisor",
                                                    _("1 or more"),
                                                    sl_count)

    def update_infra(self, cluster):
        """No infrastructure update is performed by this plugin."""
        pass

    def configure_cluster(self, cluster):
        """Push the generated configuration files to the cluster instances."""
        self._setup_instances(cluster)

    def start_cluster(self, cluster):
        """Start ZooKeeper, then nimbus/UI, then the supervisors.

        Once everything is started the cluster info is updated with the
        Storm web UI address.
        """
        sm_instance = utils.get_instance(cluster, "nimbus")
        sl_instances = utils.get_instances(cluster, "supervisor")
        zk_instance = utils.get_instances(cluster, "zookeeper")

        if zk_instance:
            self._start_zookeeper_processes(zk_instance)

        # start storm master
        if sm_instance:
            with remote.get_remote(sm_instance) as r:
                run.start_storm_nimbus_and_ui(r)
                LOG.info(_LI("Storm master at '%s' has been started"),
                         sm_instance.hostname())

        # start storm slaves
        self._start_slave_processes(sl_instances)

        LOG.info(_LI('Cluster %s has been started successfully'),
                 cluster.name)
        self._set_cluster_info(cluster)

    def _extract_configs_to_extra(self, cluster):
        """Generate the config file contents for every node group.

        :returns: dict keyed by node group id; each value holds the rendered
            storm.yaml text (``'st_instances'``), the two supervisord
            sections (``'slave_sv_conf'``, ``'master_sv_conf'``) and the
            zoo.cfg text (``'zk_conf'``)
        """
        st_master = utils.get_instance(cluster, "nimbus")
        zk_servers = utils.get_instances(cluster, "zookeeper")

        extra = dict()

        # Without a nimbus instance there is nothing to render; keep the
        # empty text instead of handing a string to the YAML converter,
        # which expects a dict.
        config = ''
        if st_master is not None:
            zknames = [zk.hostname() for zk in zk_servers or []]
            config_instances = c_helper.generate_storm_config(
                st_master.hostname(),
                zknames)
            config = self._convert_dict_to_yaml(config_instances)

        supervisor_conf = c_helper.generate_slave_supervisor_conf()
        nimbus_ui_conf = c_helper.generate_master_supervisor_conf()
        zk_conf = c_helper.generate_zookeeper_conf()

        for ng in cluster.node_groups:
            extra[ng.id] = {
                'st_instances': config,
                'slave_sv_conf': supervisor_conf,
                'master_sv_conf': nimbus_ui_conf,
                'zk_conf': zk_conf
            }

        return extra

    def _start_slave_processes(self, sl_instances):
        """Start the Storm supervisor on each instance, one thread each."""
        with context.ThreadGroup() as tg:
            for i in sl_instances:
                tg.spawn('storm-start-sl-%s' % i.instance_name,
                         self._start_slaves, i)

    def _start_slaves(self, instance):
        """Start the Storm supervisor on a single instance."""
        with instance.remote() as r:
            run.start_storm_supervisor(r)

    def _start_zookeeper_processes(self, zk_instances):
        """Start ZooKeeper on each instance, one thread each."""
        with context.ThreadGroup() as tg:
            for i in zk_instances:
                tg.spawn('storm-start-zk-%s' % i.instance_name,
                         self._start_zookeeper, i)

    def _start_zookeeper(self, instance):
        """Start ZooKeeper on a single instance."""
        with instance.remote() as r:
            run.start_zookeeper(r)

    def _setup_instances(self, cluster, instances=None):
        """Generate the configs and push them to the cluster nodes.

        :param instances: instances treated as new; defaults to every
            instance of the cluster
        """
        extra = self._extract_configs_to_extra(cluster)

        if instances is None:
            instances = utils.get_instances(cluster)

        self._push_configs_to_nodes(cluster, extra, instances)

    def _push_configs_to_nodes(self, cluster, extra, new_instances):
        """Configure new instances and reconfigure the remaining ones."""
        all_instances = utils.get_instances(cluster)
        with context.ThreadGroup() as tg:
            for instance in all_instances:
                if instance in new_instances:
                    tg.spawn('storm-configure-%s' % instance.instance_name,
                             self._push_configs_to_new_node, cluster,
                             extra, instance)
                else:
                    tg.spawn('storm-reconfigure-%s' % instance.instance_name,
                             self._push_configs_to_existing_node, cluster,
                             extra, instance)

    def _convert_dict_to_yaml(self, dict_to_convert):
        """Render a settings dict as block-style YAML text.

        String values are wrapped in double quotes before dumping and every
        single quote is then removed from the dumped text.

        :param dict_to_convert: dict of settings; it is not modified
        :returns: the YAML text
        """
        new_dict = dict_to_convert.copy()
        for key in dict_to_convert:
            if isinstance(dict_to_convert[key], six.string_types):
                new_dict[key] = "\"" + dict_to_convert[key] + "\""

        stream = yaml.dump(new_dict, default_flow_style=False)
        stream = stream.replace("\'", "")

        return stream

    def _push_configs_to_new_node(self, cluster, extra, instance):
        """Write the config files needed by a freshly provisioned instance.

        storm.yaml is always written; zoo.cfg and the supervisord sections
        are written or appended depending on the node processes of the
        instance's node group.
        """
        ng_extra = extra[instance.node_group.id]

        files_supervisor = {
            '/etc/supervisor/supervisord.conf': ng_extra['slave_sv_conf']
        }
        files_storm = {
            '/usr/local/storm/conf/storm.yaml': ng_extra['st_instances']
        }
        files_zk = {
            '/opt/zookeeper/zookeeper-3.4.6/conf/zoo.cfg': ng_extra['zk_conf']
        }
        files_supervisor_master = {
            '/etc/supervisor/supervisord.conf': ng_extra['master_sv_conf']
        }

        with remote.get_remote(instance) as r:
            node_processes = instance.node_group.node_processes
            r.write_files_to(files_storm, run_as_root=True)
            if 'zookeeper' in node_processes:
                self._push_zk_configs(r, files_zk)
            if 'nimbus' in node_processes:
                self._push_supervisor_configs(r, files_supervisor_master)
            if 'supervisor' in node_processes:
                self._push_supervisor_configs(r, files_supervisor)

    def _push_configs_to_existing_node(self, cluster, extra, instance):
        """Refresh storm.yaml and zoo.cfg on an already configured instance.

        The supervisord configuration is left alone here; only the new-node
        path appends to it.
        """
        node_processes = instance.node_group.node_processes
        need_storm_update = ('nimbus' in node_processes or
                             'supervisor' in node_processes)
        need_zookeeper_update = 'zookeeper' in node_processes

        ng_extra = extra[instance.node_group.id]

        # Use the remote as a context manager, as the other methods of this
        # class do, so it is released when the update is finished.
        with remote.get_remote(instance) as r:
            if need_storm_update:
                storm_path = '/usr/local/storm/conf/storm.yaml'
                files_storm = {storm_path: ng_extra['st_instances']}
                # Same privileges as the initial write of this file in
                # _push_configs_to_new_node.
                r.write_files_to(files_storm, run_as_root=True)

            if need_zookeeper_update:
                zk_path = '/opt/zookeeper/zookeeper-3.4.6/conf/zoo.cfg'
                files_zookeeper = {zk_path: ng_extra['zk_conf']}
                self._push_zk_configs(r, files_zookeeper)

    def _set_cluster_info(self, cluster):
        """Store the Storm web UI address in the cluster ``info`` field.

        When the cluster has no nimbus instance an empty info dict is
        stored.
        """
        st_master = utils.get_instance(cluster, "nimbus")
        info = {}

        if st_master:
            # The port is only meaningful (and only defined) when a nimbus
            # instance exists, so the entry is built inside this branch.
            port = "8080"
            info['Storm'] = {
                'Web UI': 'http://%s:%s' % (st_master.management_ip, port)
            }

        ctx = context.ctx()
        conductor.cluster_update(ctx, cluster, {'info': info})

    def _push_zk_configs(self, r, files):
        """Write the ZooKeeper config files through remote ``r`` as root."""
        r.write_files_to(files, run_as_root=True)

    def _push_supervisor_configs(self, r, files):
        """Append the supervisord sections through remote ``r`` as root."""
        r.append_to_files(files, run_as_root=True)


# Patch metadata and licence header of the next file in this change set,
# kept verbatim from the original patch text:
#
# diff --git a/sahara/plugins/storm/run_scripts.py b/sahara/plugins/storm/run_scripts.py
# new file mode 100644
# index 00000000..436fa948
# --- /dev/null
# +++ b/sahara/plugins/storm/run_scripts.py
# @@ -0,0 +1,59 @@
#
# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.openstack.common import log as logging


LOG = logging.getLogger(__name__)


def start_zookeeper(remote):
    """Run the ZooKeeper server script with ``start`` through ``remote``."""
    zk_server_script = "/opt/zookeeper/zookeeper-3.4.6/bin/zkServer.sh"
    command = "sudo %s %s" % (zk_server_script, "start")
    remote.execute_command(command)


def start_storm_supervisor(node):
    """Prepare the log directory and restart the supervisor service."""
    _relaunch_supervisor_deamon(node)


def start_storm_nimbus_and_ui(node):
    """Prepare the log directory and restart the supervisor service."""
    _relaunch_supervisor_deamon(node)


def stop_storm_nimbus_and_ui(node):
    """Stop the supervisor service on ``node``."""
    _stop_supervisor_deamon(node)


def stop_storm_supervisor(node):
    """Stop the supervisor service on ``node``."""
    _stop_supervisor_deamon(node)


def _relaunch_supervisor_deamon(node):
    """Create the log directory, then stop and start the service."""
    for step in (_create_supervisor_log_file,
                 _stop_supervisor_deamon,
                 _start_supervisor_deamon):
        step(node)


def _start_supervisor_deamon(node):
    """Issue ``service supervisor start`` on ``node``."""
    node.execute_command("sudo service supervisor start")


def _stop_supervisor_deamon(node):
    """Issue ``service supervisor stop`` on ``node``."""
    node.execute_command("sudo service supervisor stop")


def _create_supervisor_log_file(node):
    """Create /var/log/storm and set its mode and ownership."""
    log_dir_commands = (
        "sudo mkdir -p /var/log/storm",
        "sudo chmod -R 777 /var/log/storm",
        "sudo chown -R storm:storm /var/log/storm",
    )
    for command in log_dir_commands:
        node.execute_command(command)