From 591c5eb1ee1bf40f48b0c00b60794a957b0f06c5 Mon Sep 17 00:00:00 2001 From: Daniele Venzano Date: Mon, 26 May 2014 19:02:59 +0200 Subject: [PATCH] Add Spark plugin to Sahara Integrate the Spark plugin in Sahara. It adds the capability of provisioning Spark clusters with (Cloudera) HDFS. The plugin assumes the use of VM images generated with the Savanna diskimage-builder and the '-p spark' option. Implements: blueprint spark-plugin Wiki: https://wiki.openstack.org/wiki/Sahara/SparkPlugin This code is running on our Bigfoot research cluster. Change-Id: Ic105f7de64248bdfb05879ededf35503bc04925b --- MANIFEST.in | 3 + sahara/plugins/spark/__init__.py | 0 sahara/plugins/spark/config_helper.py | 395 ++++++++++ sahara/plugins/spark/plugin.py | 296 ++++++++ sahara/plugins/spark/resources/README.rst | 21 + .../plugins/spark/resources/core-default.xml | 632 ++++++++++++++++ .../plugins/spark/resources/hdfs-default.xml | 709 ++++++++++++++++++ .../spark/resources/spark-env.sh.template | 21 + sahara/plugins/spark/resources/topology.sh | 20 + sahara/plugins/spark/run_scripts.py | 57 ++ setup.cfg | 1 + 11 files changed, 2155 insertions(+) create mode 100644 sahara/plugins/spark/__init__.py create mode 100644 sahara/plugins/spark/config_helper.py create mode 100644 sahara/plugins/spark/plugin.py create mode 100644 sahara/plugins/spark/resources/README.rst create mode 100644 sahara/plugins/spark/resources/core-default.xml create mode 100644 sahara/plugins/spark/resources/hdfs-default.xml create mode 100644 sahara/plugins/spark/resources/spark-env.sh.template create mode 100644 sahara/plugins/spark/resources/topology.sh create mode 100644 sahara/plugins/spark/run_scripts.py diff --git a/MANIFEST.in b/MANIFEST.in index 9af7f095..7ab75bdb 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -22,6 +22,9 @@ include sahara/plugins/hdp/versions/version_1_3_2/resources/*.sh include sahara/plugins/hdp/versions/version_2_0_6/resources/*.template include 
sahara/plugins/hdp/versions/version_2_0_6/resources/*.json include sahara/plugins/hdp/versions/version_2_0_6/resources/*.sh +include sahara/plugins/spark/resources/*.xml +include sahara/plugins/spark/resources/*.sh +include sahara/plugins/spark/resources/*.template include sahara/resources/*.heat include sahara/service/edp/resources/*.xml include sahara/swift/resources/*.xml diff --git a/sahara/plugins/spark/__init__.py b/sahara/plugins/spark/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/sahara/plugins/spark/config_helper.py b/sahara/plugins/spark/config_helper.py new file mode 100644 index 00000000..7d70b14a --- /dev/null +++ b/sahara/plugins/spark/config_helper.py @@ -0,0 +1,395 @@ +# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from oslo.config import cfg + +from sahara import conductor as c +from sahara.openstack.common import log as logging +from sahara.plugins.general import utils +from sahara.plugins import provisioning as p +from sahara.topology import topology_helper as topology +from sahara.utils import types as types +from sahara.utils import xmlutils as x + + +conductor = c.API +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + +CORE_DEFAULT = x.load_hadoop_xml_defaults( + 'plugins/spark/resources/core-default.xml') + +HDFS_DEFAULT = x.load_hadoop_xml_defaults( + 'plugins/spark/resources/hdfs-default.xml') + +XML_CONFS = { + "HDFS": [CORE_DEFAULT, HDFS_DEFAULT] +} + +SPARK_CONFS = { + 'Spark': { + "OPTIONS": [ + { + 'name': 'Master port', + 'description': 'Start the master on a different port' + ' (default: 7077)', + 'default': '7077', + 'priority': 2, + }, + { + 'name': 'Worker port', + 'description': 'Start the Spark worker on a specific port' + ' (default: random)', + 'default': 'random', + 'priority': 2, + }, + { + 'name': 'Master webui port', + 'description': 'Port for the master web UI (default: 8080)', + 'default': '8080', + 'priority': 1, + }, + { + 'name': 'Worker webui port', + 'description': 'Port for the worker web UI (default: 8081)', + 'default': '8081', + 'priority': 1, + }, + { + 'name': 'Worker cores', + 'description': 'Total number of cores to allow Spark' + ' applications to use on the machine' + ' (default: all available cores)', + 'default': 'all', + 'priority': 2, + }, + { + 'name': 'Worker memory', + 'description': 'Total amount of memory to allow Spark' + ' applications to use on the machine, e.g. 
1000m,' + ' 2g (default: total memory minus 1 GB)', + 'default': 'all', + 'priority': 1, + }, + { + 'name': 'Worker instances', + 'description': 'Number of worker instances to run on each' + ' machine (default: 1)', + 'default': '1', + 'priority': 2, + } + ] + } +} + +ENV_CONFS = { + "HDFS": { + 'Name Node Heap Size': 'HADOOP_NAMENODE_OPTS=\\"-Xmx%sm\\"', + 'Data Node Heap Size': 'HADOOP_DATANODE_OPTS=\\"-Xmx%sm\\"' + } +} + +ENABLE_DATA_LOCALITY = p.Config('Enable Data Locality', 'general', 'cluster', + config_type="bool", priority=1, + default_value=True, is_optional=True) + +HIDDEN_CONFS = ['fs.defaultFS', 'dfs.namenode.name.dir', + 'dfs.datanode.data.dir'] + +CLUSTER_WIDE_CONFS = ['dfs.block.size', 'dfs.permissions', 'dfs.replication', + 'dfs.replication.min', 'dfs.replication.max', + 'io.file.buffer.size'] + +PRIORITY_1_CONFS = ['dfs.datanode.du.reserved', + 'dfs.datanode.failed.volumes.tolerated', + 'dfs.datanode.max.xcievers', 'dfs.datanode.handler.count', + 'dfs.namenode.handler.count'] + +# for now we have not so many cluster-wide configs +# lets consider all of them having high priority +PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS + + +def _initialise_configs(): + configs = [] + for service, config_lists in XML_CONFS.iteritems(): + for config_list in config_lists: + for config in config_list: + if config['name'] not in HIDDEN_CONFS: + cfg = p.Config(config['name'], service, "node", + is_optional=True, config_type="string", + default_value=str(config['value']), + description=config['description']) + if cfg.default_value in ["true", "false"]: + cfg.config_type = "bool" + cfg.default_value = (cfg.default_value == 'true') + elif types.is_int(cfg.default_value): + cfg.config_type = "int" + cfg.default_value = int(cfg.default_value) + if config['name'] in CLUSTER_WIDE_CONFS: + cfg.scope = 'cluster' + if config['name'] in PRIORITY_1_CONFS: + cfg.priority = 1 + configs.append(cfg) + + for service, config_items in ENV_CONFS.iteritems(): + for name, param_format_str in 
config_items.iteritems(): + configs.append(p.Config(name, service, "node", + default_value=1024, priority=1, + config_type="int")) + + for service, config_items in SPARK_CONFS.iteritems(): + for item in config_items['OPTIONS']: + cfg = p.Config(name=item["name"], + description=item["description"], + default_value=item["default"], + applicable_target=service, + scope="cluster", is_optional=True, + priority=item["priority"]) + configs.append(cfg) + + if CONF.enable_data_locality: + configs.append(ENABLE_DATA_LOCALITY) + + return configs + +# Initialise plugin Hadoop configurations +PLUGIN_CONFIGS = _initialise_configs() + + +def get_plugin_configs(): + return PLUGIN_CONFIGS + + +def get_config_value(service, name, cluster=None): + if cluster: + for ng in cluster.node_groups: + if (ng.configuration().get(service) and + ng.configuration()[service].get(name)): + return ng.configuration()[service][name] + + for c in PLUGIN_CONFIGS: + if c.applicable_target == service and c.name == name: + return c.default_value + + raise RuntimeError("Unable to get parameter '%s' from service %s", + name, service) + + +def generate_cfg_from_general(cfg, configs, general_config, + rest_excluded=False): + if 'general' in configs: + for nm in general_config: + if nm not in configs['general'] and not rest_excluded: + configs['general'][nm] = general_config[nm]['default_value'] + for name, value in configs['general'].items(): + if value: + cfg = _set_config(cfg, general_config, name) + LOG.info("Applying config: %s" % name) + else: + cfg = _set_config(cfg, general_config) + return cfg + + +def _get_hostname(service): + return service.hostname() if service else None + + +def generate_xml_configs(configs, storage_path, nn_hostname, hadoop_port): + """dfs.name.dir': extract_hadoop_path(storage_path, + '/lib/hadoop/hdfs/namenode'), + 'dfs.data.dir': extract_hadoop_path(storage_path, + '/lib/hadoop/hdfs/datanode'), + 'dfs.name.dir': storage_path + 'hdfs/name', + 'dfs.data.dir': storage_path + 
'hdfs/data', + + 'dfs.hosts': '/etc/hadoop/dn.incl', + 'dfs.hosts.exclude': '/etc/hadoop/dn.excl', + """ + if hadoop_port is None: + hadoop_port = 8020 + + cfg = { + 'fs.defaultFS': 'hdfs://%s:%s' % (nn_hostname, str(hadoop_port)), + 'dfs.namenode.name.dir': extract_hadoop_path(storage_path, + '/dfs/nn'), + 'dfs.datanode.data.dir': extract_hadoop_path(storage_path, + '/dfs/dn'), + 'hadoop.tmp.dir': extract_hadoop_path(storage_path, + '/dfs'), + } + + # inserting user-defined configs + for key, value in extract_hadoop_xml_confs(configs): + cfg[key] = value + + # invoking applied configs to appropriate xml files + core_all = CORE_DEFAULT + + if CONF.enable_data_locality: + cfg.update(topology.TOPOLOGY_CONFIG) + # applying vm awareness configs + core_all += topology.vm_awareness_core_config() + + xml_configs = { + 'core-site': x.create_hadoop_xml(cfg, core_all), + 'hdfs-site': x.create_hadoop_xml(cfg, HDFS_DEFAULT) + } + + return xml_configs + + +def _get_spark_opt_default(opt_name): + for opt in SPARK_CONFS["Spark"]["OPTIONS"]: + if opt_name == opt["name"]: + return opt["default"] + return None + + +def generate_spark_env_configs(cluster): + configs = [] + + # master configuration + sp_master = utils.get_instance(cluster, "master") + configs.append('SPARK_MASTER_IP=' + sp_master.hostname()) + + masterport = get_config_value("Spark", "Master port", cluster) + if masterport and masterport != _get_spark_opt_default("Master port"): + configs.append('SPARK_MASTER_PORT=' + str(masterport)) + + masterwebport = get_config_value("Spark", "Master webui port", cluster) + if masterwebport and \ + masterwebport != _get_spark_opt_default("Master webui port"): + configs.append('SPARK_MASTER_WEBUI_PORT=' + str(masterwebport)) + + # configuration for workers + workercores = get_config_value("Spark", "Worker cores", cluster) + if workercores and workercores != _get_spark_opt_default("Worker cores"): + configs.append('SPARK_WORKER_CORES=' + str(workercores)) + + workermemory = 
get_config_value("Spark", "Worker memory", cluster) + if workermemory and \ + workermemory != _get_spark_opt_default("Worker memory"): + configs.append('SPARK_WORKER_MEMORY=' + str(workermemory)) + + workerport = get_config_value("Spark", "Worker port", cluster) + if workerport and workerport != _get_spark_opt_default("Worker port"): + configs.append('SPARK_WORKER_PORT=' + str(workerport)) + + workerwebport = get_config_value("Spark", "Worker webui port", cluster) + if workerwebport and \ + workerwebport != _get_spark_opt_default("Worker webui port"): + configs.append('SPARK_WORKER_WEBUI_PORT=' + str(workerwebport)) + + workerinstances = get_config_value("Spark", "Worker instances", cluster) + if workerinstances and \ + workerinstances != _get_spark_opt_default("Worker instances"): + configs.append('SPARK_WORKER_INSTANCES=' + str(workerinstances)) + return '\n'.join(configs) + + +# workernames need to be a list of woker names +def generate_spark_slaves_configs(workernames): + return '\n'.join(workernames) + + +def extract_hadoop_environment_confs(configs): + """Returns list of Hadoop parameters which should be passed via environment + """ + lst = [] + for service, srv_confs in configs.items(): + if ENV_CONFS.get(service): + for param_name, param_value in srv_confs.items(): + for cfg_name, cfg_format_str in ENV_CONFS[service].items(): + if param_name == cfg_name and param_value is not None: + lst.append(cfg_format_str % param_value) + return lst + + +def extract_hadoop_xml_confs(configs): + """Returns list of Hadoop parameters which should be passed into general + configs like core-site.xml + """ + lst = [] + for service, srv_confs in configs.items(): + if XML_CONFS.get(service): + for param_name, param_value in srv_confs.items(): + for cfg_list in XML_CONFS[service]: + names = [cfg['name'] for cfg in cfg_list] + if param_name in names and param_value is not None: + lst.append((param_name, param_value)) + return lst + + +def 
generate_hadoop_setup_script(storage_paths, env_configs): + script_lines = ["#!/bin/bash -x"] + script_lines.append("echo -n > /tmp/hadoop-env.sh") + for line in env_configs: + if 'HADOOP' in line: + script_lines.append('echo "%s" >> /tmp/hadoop-env.sh' % line) + script_lines.append("cat /etc/hadoop/hadoop-env.sh >> /tmp/hadoop-env.sh") + script_lines.append("cp /tmp/hadoop-env.sh /etc/hadoop/hadoop-env.sh") + + hadoop_log = storage_paths[0] + "/log/hadoop/\$USER/" + script_lines.append('sed -i "s,export HADOOP_LOG_DIR=.*,' + 'export HADOOP_LOG_DIR=%s," /etc/hadoop/hadoop-env.sh' + % hadoop_log) + + hadoop_log = storage_paths[0] + "/log/hadoop/hdfs" + script_lines.append('sed -i "s,export HADOOP_SECURE_DN_LOG_DIR=.*,' + 'export HADOOP_SECURE_DN_LOG_DIR=%s," ' + '/etc/hadoop/hadoop-env.sh' % hadoop_log) + + for path in storage_paths: + script_lines.append("chown -R hadoop:hadoop %s" % path) + script_lines.append("chmod -R 755 %s" % path) + return "\n".join(script_lines) + + +def extract_name_values(configs): + return dict((cfg['name'], cfg['value']) for cfg in configs) + + +def extract_hadoop_path(lst, hadoop_dir): + if lst: + return ",".join([p + hadoop_dir for p in lst]) + + +def _set_config(cfg, gen_cfg, name=None): + if name in gen_cfg: + cfg.update(gen_cfg[name]['conf']) + if name is None: + for name in gen_cfg: + cfg.update(gen_cfg[name]['conf']) + return cfg + + +def _is_general_option_enabled(cluster, option): + for ng in cluster.node_groups: + conf = ng.configuration() + if 'general' in conf and option.name in conf['general']: + return conf['general'][option.name] + return option.default_value + + +def is_data_locality_enabled(cluster): + if not CONF.enable_data_locality: + return False + return _is_general_option_enabled(cluster, ENABLE_DATA_LOCALITY) + + +def get_port_from_config(service, name, cluster=None): + address = get_config_value(service, name, cluster) + return utils.get_port_from_address(address) diff --git a/sahara/plugins/spark/plugin.py 
b/sahara/plugins/spark/plugin.py new file mode 100644 index 00000000..d78ccfa3 --- /dev/null +++ b/sahara/plugins/spark/plugin.py @@ -0,0 +1,296 @@ +# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg + +from sahara import conductor +from sahara import context +from sahara.openstack.common import log as logging +from sahara.plugins.general import exceptions as ex +from sahara.plugins.general import utils +from sahara.plugins import provisioning as p +from sahara.plugins.spark import config_helper as c_helper +from sahara.plugins.spark import run_scripts as run +from sahara.topology import topology_helper as th +from sahara.utils import files as f +from sahara.utils import remote + + +conductor = conductor.API +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class SparkProvider(p.ProvisioningPluginBase): + def __init__(self): + self.processes = { + "HDFS": ["namenode", "datanode"], + "Spark": ["master", "slave"] + } + + def get_title(self): + return "Apache Spark" + + def get_description(self): + return ( + "This plugin provides an ability to launch Spark on Hadoop " + "CDH cluster without any management consoles.") + + def get_versions(self): + return ['0.9.1'] + + def get_configs(self, hadoop_version): + return c_helper.get_plugin_configs() + + def get_node_processes(self, hadoop_version): + return self.processes + + def validate(self, cluster): + nn_count = 
sum([ng.count for ng + in utils.get_node_groups(cluster, "namenode")]) + if nn_count != 1: + raise ex.InvalidComponentCountException("namenode", 1, nn_count) + + dn_count = sum([ng.count for ng + in utils.get_node_groups(cluster, "datanode")]) + if dn_count < 1: + raise ex.InvalidComponentCountException("datanode", "1 or more", + nn_count) + + # validate Spark Master Node and Spark Slaves + sm_count = sum([ng.count for ng + in utils.get_node_groups(cluster, "master")]) + + if sm_count != 1: + raise ex.RequiredServiceMissingException("Spark master") + + sl_count = sum([ng.count for ng + in utils.get_node_groups(cluster, "slave")]) + + if sl_count < 1: + raise ex.InvalidComponentCountException("Spark slave", "1 or more", + sl_count) + + def update_infra(self, cluster): + pass + + def configure_cluster(self, cluster): + self._setup_instances(cluster) + + def start_cluster(self, cluster): + nn_instance = utils.get_instance(cluster, "namenode") + sm_instance = utils.get_instance(cluster, "master") + dn_instances = utils.get_instances(cluster, "datanode") + + # Start the name node + with remote.get_remote(nn_instance) as r: + run.format_namenode(r) + run.start_processes(r, "namenode") + + # start the data nodes + self._start_slave_datanode_processes(dn_instances) + + LOG.info("Hadoop services in cluster %s have been started" % + cluster.name) + + with remote.get_remote(nn_instance) as r: + r.execute_command("sudo -u hdfs hdfs dfs -mkdir -p /user/$USER/") + r.execute_command("sudo -u hdfs hdfs dfs -chown $USER \ + /user/$USER/") + + # start spark nodes + if sm_instance: + with remote.get_remote(sm_instance) as r: + run.start_spark_master(r) + LOG.info("Spark service at '%s' has been started", + sm_instance.hostname()) + + LOG.info('Cluster %s has been started successfully' % cluster.name) + self._set_cluster_info(cluster) + + def _extract_configs_to_extra(self, cluster): + nn = utils.get_instance(cluster, "namenode") + sp_master = utils.get_instance(cluster, "master") + 
sp_slaves = utils.get_instances(cluster, "slave") + + extra = dict() + + config_master = config_slaves = '' + if sp_master is not None: + config_master = c_helper.generate_spark_env_configs(cluster) + + if sp_slaves is not None: + slavenames = [] + for slave in sp_slaves: + slavenames.append(slave.hostname()) + config_slaves = c_helper.generate_spark_slaves_configs(slavenames) + else: + config_slaves = "\n" + + for ng in cluster.node_groups: + extra[ng.id] = { + 'xml': c_helper.generate_xml_configs( + ng.configuration(), + ng.storage_paths(), + nn.hostname(), None, + ), + 'setup_script': c_helper.generate_hadoop_setup_script( + ng.storage_paths(), + c_helper.extract_hadoop_environment_confs( + ng.configuration()) + ), + 'sp_master': config_master, + 'sp_slaves': config_slaves + } + + if c_helper.is_data_locality_enabled(cluster): + topology_data = th.generate_topology_map( + cluster, CONF.enable_hypervisor_awareness) + extra['topology_data'] = "\n".join( + [k + " " + v for k, v in topology_data.items()]) + "\n" + + return extra + + def validate_scaling(self, cluster, existing, additional): + raise ex.ClusterCannotBeScaled("Scaling Spark clusters has not been" + "implemented yet") + + def decommission_nodes(self, cluster, instances): + pass + + def scale_cluster(self, cluster, instances): + pass + + def _start_slave_datanode_processes(self, dn_instances): + with context.ThreadGroup() as tg: + for i in dn_instances: + tg.spawn('spark-start-dn-%s' % i.instance_name, + self._start_datanode, i) + + def _start_datanode(self, instance): + with instance.remote() as r: + run.start_processes(r, "datanode") + + def _setup_instances(self, cluster): + extra = self._extract_configs_to_extra(cluster) + + instances = utils.get_instances(cluster) + self._push_configs_to_nodes(cluster, extra, instances) + + def _push_configs_to_nodes(self, cluster, extra, new_instances): + all_instances = utils.get_instances(cluster) + with context.ThreadGroup() as tg: + for instance in 
all_instances: + if instance in new_instances: + tg.spawn('spark-configure-%s' % instance.instance_name, + self._push_configs_to_new_node, cluster, + extra, instance) + + def _push_configs_to_new_node(self, cluster, extra, instance): + ng_extra = extra[instance.node_group.id] + + files_hadoop = { + '/etc/hadoop/conf/core-site.xml': ng_extra['xml']['core-site'], + '/etc/hadoop/conf/hdfs-site.xml': ng_extra['xml']['hdfs-site'], + } + + files_spark = { + '/opt/spark/conf/spark-env.sh': ng_extra['sp_master'], + '/opt/spark/conf/slaves': ng_extra['sp_slaves'] + } + + files_init = { + '/tmp/sahara-hadoop-init.sh': ng_extra['setup_script'], + 'id_rsa': cluster.management_private_key, + 'authorized_keys': cluster.management_public_key + } + + # pietro: This is required because the (secret) key is not stored in + # .ssh which hinders password-less ssh required by spark scripts + key_cmd = 'sudo cp $HOME/id_rsa $HOME/.ssh/; '\ + 'sudo chown $USER $HOME/.ssh/id_rsa; '\ + 'sudo chmod 600 $HOME/.ssh/id_rsa' + + for ng in cluster.node_groups: + dn_path = c_helper.extract_hadoop_path(ng.storage_paths(), + '/dfs/dn') + nn_path = c_helper.extract_hadoop_path(ng.storage_paths(), + '/dfs/nn') + hdfs_dir_cmd = 'sudo mkdir -p %s %s;'\ + 'sudo chown -R hdfs:hadoop %s %s;'\ + 'sudo chmod 755 %s %s;'\ + % (nn_path, dn_path, + nn_path, dn_path, + nn_path, dn_path) + + with remote.get_remote(instance) as r: + r.execute_command( + 'sudo chown -R $USER:$USER /etc/hadoop' + ) + r.execute_command( + 'sudo chown -R $USER:$USER /opt/spark' + ) + r.write_files_to(files_hadoop) + r.write_files_to(files_spark) + r.write_files_to(files_init) + r.execute_command( + 'sudo chmod 0500 /tmp/sahara-hadoop-init.sh' + ) + r.execute_command( + 'sudo /tmp/sahara-hadoop-init.sh ' + '>> /tmp/sahara-hadoop-init.log 2>&1') + + r.execute_command(hdfs_dir_cmd) + r.execute_command(key_cmd) + + if c_helper.is_data_locality_enabled(cluster): + r.write_file_to( + '/etc/hadoop/topology.sh', + f.get_file_text( + 
'plugins/spark/resources/topology.sh')) + r.execute_command( + 'sudo chmod +x /etc/hadoop/topology.sh' + ) + + self._write_topology_data(r, cluster, extra) + + def _write_topology_data(self, r, cluster, extra): + if c_helper.is_data_locality_enabled(cluster): + topology_data = extra['topology_data'] + r.write_file_to('/etc/hadoop/topology.data', topology_data) + + def _set_cluster_info(self, cluster): + nn = utils.get_instance(cluster, "namenode") + sp_master = utils.get_instance(cluster, "master") + info = {} + + if nn: + address = c_helper.get_config_value( + 'HDFS', 'dfs.http.address', cluster) + port = address[address.rfind(':') + 1:] + info['HDFS'] = { + 'Web UI': 'http://%s:%s' % (nn.management_ip, port) + } + info['HDFS']['NameNode'] = 'hdfs://%s:8020' % nn.hostname() + + if sp_master: + port = c_helper.get_config_value( + 'Spark', 'Master webui port', cluster) + if port is not None: + info['Spark'] = { + 'Web UI': 'http://%s:%s' % (sp_master.management_ip, port) + } + ctx = context.ctx() + conductor.cluster_update(ctx, cluster, {'info': info}) diff --git a/sahara/plugins/spark/resources/README.rst b/sahara/plugins/spark/resources/README.rst new file mode 100644 index 00000000..741fcf7a --- /dev/null +++ b/sahara/plugins/spark/resources/README.rst @@ -0,0 +1,21 @@ +Apache Spark and HDFS Configurations for Sahara +=============================================== + +This directory contains default XML configuration files and Spark scripts: + +* core-default.xml, +* hdfs-default.xml, +* spark-env.sh.template, +* topology.sh + +These files are used by Sahara's plugin for Apache Spark and Cloudera HDFS. +XML config files were taken from here: + * https://github.com/apache/hadoop-common/blob/release-1.2.1/src/core/core-default.xml + * https://github.com/apache/hadoop-common/blob/release-1.2.1/src/hdfs/hdfs-default.xml + +Cloudera packages use the same configuration files as standard Apache Hadoop. 
+ +XML configs are used to expose default Hadoop configurations to the users through +Sahara's REST API. It allows users to override some config values which will be +pushed to the provisioned VMs running Hadoop services as part of appropriate xml +config. diff --git a/sahara/plugins/spark/resources/core-default.xml b/sahara/plugins/spark/resources/core-default.xml new file mode 100644 index 00000000..32e20c45 --- /dev/null +++ b/sahara/plugins/spark/resources/core-default.xml @@ -0,0 +1,632 @@ + + + + + + + + + + + + + hadoop.tmp.dir + /tmp/hadoop-${user.name} + A base for other temporary directories. + + + + hadoop.native.lib + true + Should native hadoop libraries, if present, be used. + + + + hadoop.http.filter.initializers + + A comma separated list of class names. Each class in the list + must extend org.apache.hadoop.http.FilterInitializer. The corresponding + Filter will be initialized. Then, the Filter will be applied to all user + facing jsp and servlet web pages. The ordering of the list defines the + ordering of the filters. + + + + hadoop.security.group.mapping + org.apache.hadoop.security.ShellBasedUnixGroupsMapping + Class for user to group mapping (get groups for a given user) + + + + + hadoop.security.authorization + false + Is service-level authorization enabled? + + + + hadoop.security.instrumentation.requires.admin + false + + Indicates if administrator ACLs are required to access + instrumentation servlets (JMX, METRICS, CONF, STACKS). + + + + + hadoop.security.authentication + simple + Possible values are simple (no authentication), and kerberos + + + + + hadoop.security.token.service.use_ip + true + Controls whether tokens always use IP addresses. DNS changes + will not be detected if this option is enabled. Existing client connections + that break will always reconnect to the IP of the original host. New clients + will connect to the host's new IP but fail to locate a token. 
Disabling + this option will allow existing and new clients to detect an IP change and + continue to locate the new host's token. + + + + + hadoop.security.use-weak-http-crypto + false + If enabled, use KSSL to authenticate HTTP connections to the + NameNode. Due to a bug in JDK6, using KSSL requires one to configure + Kerberos tickets to use encryption types that are known to be + cryptographically weak. If disabled, SPNEGO will be used for HTTP + authentication, which supports stronger encryption types. + + + + + + + + + hadoop.logfile.size + 10000000 + The max size of each log file + + + + hadoop.logfile.count + 10 + The max number of log files + + + + + io.file.buffer.size + 4096 + The size of buffer for use in sequence files. + The size of this buffer should probably be a multiple of hardware + page size (4096 on Intel x86), and it determines how much data is + buffered during read and write operations. + + + + io.bytes.per.checksum + 512 + The number of bytes per checksum. Must not be larger than + io.file.buffer.size. + + + + io.skip.checksum.errors + false + If true, when a checksum error is encountered while + reading a sequence file, entries are skipped, instead of throwing an + exception. + + + + io.compression.codecs + org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec + A list of the compression codec classes that can be used + for compression/decompression. + + + + io.serializations + org.apache.hadoop.io.serializer.WritableSerialization + A list of serialization classes that can be used for + obtaining serializers and deserializers. + + + + + + fs.defaultFS + file:/// + The name of the default file system. A URI whose + scheme and authority determine the FileSystem implementation. The + uri's scheme determines the config property (fs.SCHEME.impl) naming + the FileSystem implementation class. 
The uri's authority is used to + determine the host, port, etc. for a filesystem. + + + + fs.trash.interval + 0 + Number of minutes between trash checkpoints. + If zero, the trash feature is disabled. + + + + + fs.file.impl + org.apache.hadoop.fs.LocalFileSystem + The FileSystem for file: uris. + + + + fs.hdfs.impl + org.apache.hadoop.hdfs.DistributedFileSystem + The FileSystem for hdfs: uris. + + + + fs.s3.impl + org.apache.hadoop.fs.s3.S3FileSystem + The FileSystem for s3: uris. + + + + fs.s3n.impl + org.apache.hadoop.fs.s3native.NativeS3FileSystem + The FileSystem for s3n: (Native S3) uris. + + + + fs.kfs.impl + org.apache.hadoop.fs.kfs.KosmosFileSystem + The FileSystem for kfs: uris. + + + + fs.hftp.impl + org.apache.hadoop.hdfs.HftpFileSystem + + + + fs.hsftp.impl + org.apache.hadoop.hdfs.HsftpFileSystem + + + + fs.webhdfs.impl + org.apache.hadoop.hdfs.web.WebHdfsFileSystem + + + + fs.ftp.impl + org.apache.hadoop.fs.ftp.FTPFileSystem + The FileSystem for ftp: uris. + + + + fs.ramfs.impl + org.apache.hadoop.fs.InMemoryFileSystem + The FileSystem for ramfs: uris. + + + + fs.har.impl + org.apache.hadoop.fs.HarFileSystem + The filesystem for Hadoop archives. + + + + fs.har.impl.disable.cache + true + Don't cache 'har' filesystem instances. + + + + fs.checkpoint.dir + ${hadoop.tmp.dir}/dfs/namesecondary + Determines where on the local filesystem the DFS secondary + name node should store the temporary images to merge. + If this is a comma-delimited list of directories then the image is + replicated in all of the directories for redundancy. + + + + + fs.checkpoint.edits.dir + ${fs.checkpoint.dir} + Determines where on the local filesystem the DFS secondary + name node should store the temporary edits to merge. + If this is a comma-delimited list of directoires then teh edits is + replicated in all of the directoires for redundancy. 
+ Default value is same as fs.checkpoint.dir + + + + + fs.checkpoint.period + 3600 + The number of seconds between two periodic checkpoints. + + + + + fs.checkpoint.size + 67108864 + The size of the current edit log (in bytes) that triggers + a periodic checkpoint even if the fs.checkpoint.period hasn't expired. + + + + + + + fs.s3.block.size + 67108864 + Block size to use when writing files to S3. + + + + fs.s3.buffer.dir + ${hadoop.tmp.dir}/s3 + Determines where on the local filesystem the S3 filesystem + should store files before sending them to S3 + (or after retrieving them from S3). + + + + + fs.s3.maxRetries + 4 + The maximum number of retries for reading or writing files to S3, + before we signal failure to the application. + + + + + fs.s3.sleepTimeSeconds + 10 + The number of seconds to sleep between each S3 retry. + + + + + + local.cache.size + 10737418240 + The limit on the size of cache you want to keep, set by default + to 10GB. This will act as a soft limit on the cache directory for out of band data. + + + + + io.seqfile.compress.blocksize + 1000000 + The minimum block size for compression in block compressed + SequenceFiles. + + + + + io.seqfile.lazydecompress + true + Should values of block-compressed SequenceFiles be decompressed + only when necessary. + + + + + io.seqfile.sorter.recordlimit + 1000000 + The limit on number of records to be kept in memory in a spill + in SequenceFiles.Sorter + + + + + io.mapfile.bloom.size + 1048576 + The size of BloomFilter-s used in BloomMapFile. Each time this many + keys is appended the next BloomFilter will be created (inside a DynamicBloomFilter). + Larger values minimize the number of filters, which slightly increases the performance, + but may waste too much space if the total number of keys is usually much smaller + than this number. + + + + + io.mapfile.bloom.error.rate + 0.005 + The rate of false positives in BloomFilter-s used in BloomMapFile. 
+ As this value decreases, the size of BloomFilter-s increases exponentially. This + value is the probability of encountering false positives (default is 0.5%). + + + + + hadoop.util.hash.type + murmur + The default implementation of Hash. Currently this can take one of the + two values: 'murmur' to select MurmurHash and 'jenkins' to select JenkinsHash. + + + + + + + + ipc.client.idlethreshold + 4000 + Defines the threshold number of connections after which + connections will be inspected for idleness. + + + + + ipc.client.kill.max + 10 + Defines the maximum number of clients to disconnect in one go. + + + + + ipc.client.connection.maxidletime + 10000 + The maximum time in msec after which a client will bring down the + connection to the server. + + + + + ipc.client.connect.max.retries + 10 + Indicates the number of retries a client will make to establish + a server connection. + + + + + ipc.server.listen.queue.size + 128 + Indicates the length of the listen queue for servers accepting + client connections. + + + + + ipc.server.tcpnodelay + false + Turn on/off Nagle's algorithm for the TCP socket connection on + the server. Setting to true disables the algorithm and may decrease latency + with a cost of more/smaller packets. + + + + + ipc.client.tcpnodelay + false + Turn on/off Nagle's algorithm for the TCP socket connection on + the client. Setting to true disables the algorithm and may decrease latency + with a cost of more/smaller packets. + + + + + + + + webinterface.private.actions + false + If set to true, the web interfaces of JT and NN may contain + actions, such as kill job, delete file, etc., that should + not be exposed to public. Enable this option if the interfaces + are only reachable by those who have the right authorization. + + + + + + + hadoop.rpc.socket.factory.class.default + org.apache.hadoop.net.StandardSocketFactory + Default SocketFactory to use. This parameter is expected to be + formatted as "package.FactoryClassName". 
+ + + + + hadoop.rpc.socket.factory.class.ClientProtocol + + SocketFactory to use to connect to a DFS. If null or empty, use + hadoop.rpc.socket.class.default. This socket factory is also used by + DFSClient to create sockets to DataNodes. + + + + + + + hadoop.socks.server + + Address (host:port) of the SOCKS server to be used by the + SocksSocketFactory. + + + + + + + + topology.node.switch.mapping.impl + org.apache.hadoop.net.ScriptBasedMapping + The default implementation of the DNSToSwitchMapping. It + invokes a script specified in topology.script.file.name to resolve + node names. If the value for topology.script.file.name is not set, the + default value of DEFAULT_RACK is returned for all node names. + + + + + net.topology.impl + org.apache.hadoop.net.NetworkTopology + The default implementation of NetworkTopology which is classic three layer one. + + + + + topology.script.file.name + + The script name that should be invoked to resolve DNS names to + NetworkTopology names. Example: the script would take host.foo.bar as an + argument, and return /rack1 as the output. + + + + + topology.script.number.args + 100 + The max number of args that the script configured with + topology.script.file.name should be run with. Each arg is an + IP address. + + + + + hadoop.security.uid.cache.secs + 14400 + NativeIO maintains a cache from UID to UserName. This is + the timeout for an entry in that cache. + + + + + + hadoop.http.authentication.type + simple + + Defines authentication used for Oozie HTTP endpoint. + Supported values are: simple | kerberos | #AUTHENTICATION_HANDLER_CLASSNAME# + + + + + hadoop.http.authentication.token.validity + 36000 + + Indicates how long (in seconds) an authentication token is valid before it has + to be renewed. + + + + + hadoop.http.authentication.signature.secret.file + ${user.home}/hadoop-http-auth-signature-secret + + The signature secret for signing the authentication tokens. + If not set a random secret is generated at startup time. 
+ The same secret should be used for JT/NN/DN/TT configurations. + + + + + hadoop.http.authentication.cookie.domain + + + The domain to use for the HTTP cookie that stores the authentication token. + In order to authentiation to work correctly across all Hadoop nodes web-consoles + the domain must be correctly set. + IMPORTANT: when using IP addresses, browsers ignore cookies with domain settings. + For this setting to work properly all nodes in the cluster must be configured + to generate URLs with hostname.domain names on it. + + + + + hadoop.http.authentication.simple.anonymous.allowed + true + + Indicates if anonymous requests are allowed when using 'simple' authentication. + + + + + hadoop.http.authentication.kerberos.principal + HTTP/localhost@LOCALHOST + + Indicates the Kerberos principal to be used for HTTP endpoint. + The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification. + + + + + hadoop.http.authentication.kerberos.keytab + ${user.home}/hadoop.keytab + + Location of the keytab file with the credentials for the principal. + Referring to the same keytab file Oozie uses for its Kerberos credentials for Hadoop. + + + + + hadoop.relaxed.worker.version.check + false + + By default datanodes refuse to connect to namenodes if their build + revision (svn revision) do not match, and tasktrackers refuse to + connect to jobtrackers if their build version (version, revision, + user, and source checksum) do not match. This option changes the + behavior of hadoop workers to only check for a version match (eg + "1.0.2") but ignore the other build fields (revision, user, and + source checksum). + + + + + hadoop.skip.worker.version.check + false + + By default datanodes refuse to connect to namenodes if their build + revision (svn revision) do not match, and tasktrackers refuse to + connect to jobtrackers if their build version (version, revision, + user, and source checksum) do not match. 
This option changes the + behavior of hadoop workers to skip doing a version check at all. + This option supersedes the 'hadoop.relaxed.worker.version.check' + option. + + + + + hadoop.jetty.logs.serve.aliases + true + + Enable/Disable aliases serving from jetty + + + + + ipc.client.fallback-to-simple-auth-allowed + false + + When a client is configured to attempt a secure connection, but attempts to + connect to an insecure server, that server may instruct the client to + switch to SASL SIMPLE (unsecure) authentication. This setting controls + whether or not the client will accept this instruction from the server. + When false (the default), the client will not allow the fallback to SIMPLE + authentication, and will abort the connection. + + + + diff --git a/sahara/plugins/spark/resources/hdfs-default.xml b/sahara/plugins/spark/resources/hdfs-default.xml new file mode 100644 index 00000000..35c888fe --- /dev/null +++ b/sahara/plugins/spark/resources/hdfs-default.xml @@ -0,0 +1,709 @@ + + + + + + + + + + + dfs.namenode.logging.level + info + The logging level for dfs namenode. Other values are "dir"(trac +e namespace mutations), "block"(trace block under/over replications and block +creations/deletions), or "all". + + + + dfs.namenode.rpc-address + + + RPC address that handles all clients requests. If empty then we'll get the + value from fs.default.name. + The value of this property will take the form of hdfs://nn-host1:rpc-port. + + + + + dfs.secondary.http.address + 0.0.0.0:50090 + + The secondary namenode http server address and port. + If the port is 0 then the server will start on a free port. + + + + + dfs.datanode.address + 0.0.0.0:50010 + + The datanode server address and port for data transfer. + If the port is 0 then the server will start on a free port. + + + + + dfs.datanode.http.address + 0.0.0.0:50075 + + The datanode http server address and port. + If the port is 0 then the server will start on a free port. 
+ + + + + dfs.datanode.ipc.address + 0.0.0.0:50020 + + The datanode ipc server address and port. + If the port is 0 then the server will start on a free port. + + + + + dfs.datanode.handler.count + 3 + The number of server threads for the datanode. + + + + dfs.http.address + 0.0.0.0:50070 + + The address and the base port where the dfs namenode web ui will listen on. + If the port is 0 then the server will start on a free port. + + + + + dfs.https.enable + false + Decide if HTTPS(SSL) is supported on HDFS + + + + + dfs.https.need.client.auth + false + Whether SSL client certificate authentication is required + + + + + dfs.https.server.keystore.resource + ssl-server.xml + Resource file from which ssl server keystore + information will be extracted + + + + + dfs.https.client.keystore.resource + ssl-client.xml + Resource file from which ssl client keystore + information will be extracted + + + + + dfs.datanode.https.address + 0.0.0.0:50475 + + + + dfs.https.address + 0.0.0.0:50470 + + + + dfs.datanode.dns.interface + default + The name of the Network Interface from which a data node should + report its IP address. + + + + + dfs.datanode.dns.nameserver + default + The host name or IP address of the name server (DNS) + which a DataNode should use to determine the host name used by the + NameNode for communication and display purposes. + + + + + + + dfs.replication.considerLoad + true + Decide if chooseTarget considers the target's load or not + + + + dfs.default.chunk.view.size + 32768 + The number of bytes to view for a file on the browser. + + + + + dfs.datanode.du.reserved + 0 + Reserved space in bytes per volume. Always leave this much space free for non dfs use. + + + + + dfs.namenode.name.dir + ${hadoop.tmp.dir}/dfs/name + Determines where on the local filesystem the DFS name node + should store the name table(fsimage). If this is a comma-delimited list + of directories then the name table is replicated in all of the + directories, for redundancy. 
+ + + + dfs.name.edits.dir + ${dfs.name.dir} + Determines where on the local filesystem the DFS name node + should store the transaction (edits) file. If this is a comma-delimited list + of directories then the transaction file is replicated in all of the + directories, for redundancy. Default value is same as dfs.name.dir + + + + + dfs.namenode.edits.toleration.length + 0 + + The length in bytes that namenode is willing to tolerate when the edit log + is corrupted. The edit log toleration feature checks the entire edit log. + It computes read length (the length of valid data), corruption length and + padding length. In case that corruption length is non-zero, the corruption + will be tolerated only if the corruption length is less than or equal to + the toleration length. + + For disabling edit log toleration feature, set this property to -1. When + the feature is disabled, the end of edit log will not be checked. In this + case, namenode will startup normally even if the end of edit log is + corrupted. + + + + + dfs.web.ugi + webuser,webgroup + The user account used by the web interface. + Syntax: USERNAME,GROUP1,GROUP2, ... + + + + + dfs.permissions + true + + If "true", enable permission checking in HDFS. + If "false", permission checking is turned off, + but all other behavior is unchanged. + Switching from one parameter value to the other does not change the mode, + owner or group of files or directories. + + + + + dfs.permissions.supergroup + supergroup + The name of the group of super-users. + + + + dfs.block.access.token.enable + false + + If "true", access tokens are used as capabilities for accessing datanodes. + If "false", no access tokens are checked on accessing datanodes. + + + + + dfs.block.access.key.update.interval + 600 + + Interval in minutes at which namenode updates its access keys. + + + + + dfs.block.access.token.lifetime + 600 + The lifetime of access tokens in minutes. 
+ + + + + dfs.datanode.data.dir + ${hadoop.tmp.dir}/dfs/data + Determines where on the local filesystem an DFS data node + should store its blocks. If this is a comma-delimited + list of directories, then data will be stored in all named + directories, typically on different devices. + Directories that do not exist are ignored. + + + + + dfs.datanode.data.dir.perm + 755 + Permissions for the directories on on the local filesystem where + the DFS data node store its blocks. The permissions can either be octal or + symbolic. + + + + dfs.replication + 3 + Default block replication. + The actual number of replications can be specified when the file is created. + The default is used if replication is not specified in create time. + + + + + dfs.replication.max + 512 + Maximal block replication. + + + + + dfs.replication.min + 1 + Minimal block replication. + + + + + dfs.block.size + 67108864 + The default block size for new files. + + + + dfs.df.interval + 60000 + Disk usage statistics refresh interval in msec. + + + + dfs.client.block.write.retries + 3 + The number of retries for writing blocks to the data nodes, + before we signal failure to the application. + + + + + dfs.blockreport.intervalMsec + 3600000 + Determines block reporting interval in milliseconds. + + + + dfs.blockreport.initialDelay 0 + Delay for first block report in seconds. + + + + dfs.heartbeat.interval + 3 + Determines datanode heartbeat interval in seconds. + + + + dfs.namenode.handler.count + 10 + The number of server threads for the namenode. + + + + dfs.safemode.threshold.pct + 0.999f + + Specifies the percentage of blocks that should satisfy + the minimal replication requirement defined by dfs.replication.min. + Values less than or equal to 0 mean not to wait for any particular + percentage of blocks before exiting safemode. + Values greater than 1 will make safe mode permanent. 
+ + + + + dfs.namenode.safemode.min.datanodes + 0 + + Specifies the number of datanodes that must be considered alive + before the name node exits safemode. + Values less than or equal to 0 mean not to take the number of live + datanodes into account when deciding whether to remain in safe mode + during startup. + Values greater than the number of datanodes in the cluster + will make safe mode permanent. + + + + + dfs.safemode.extension + 30000 + + Determines extension of safe mode in milliseconds + after the threshold level is reached. + + + + + dfs.balance.bandwidthPerSec + 1048576 + + Specifies the maximum amount of bandwidth that each datanode + can utilize for the balancing purpose in term of + the number of bytes per second. + + + + + dfs.hosts + + Names a file that contains a list of hosts that are + permitted to connect to the namenode. The full pathname of the file + must be specified. If the value is empty, all hosts are + permitted. + + + + dfs.hosts.exclude + + Names a file that contains a list of hosts that are + not permitted to connect to the namenode. The full pathname of the + file must be specified. If the value is empty, no hosts are + excluded. + + + + dfs.max.objects + 0 + The maximum number of files, directories and blocks + dfs supports. A value of zero indicates no limit to the number + of objects that dfs supports. + + + + + dfs.namenode.decommission.interval + 30 + Namenode periodicity in seconds to check if decommission is + complete. + + + + dfs.namenode.decommission.nodes.per.interval + 5 + The number of nodes namenode checks if decommission is complete + in each dfs.namenode.decommission.interval. + + + + dfs.replication.interval + 3 + The periodicity in seconds with which the namenode computes + repliaction work for datanodes. + + + + dfs.access.time.precision + 3600000 + The access time for HDFS file is precise upto this value. + The default value is 1 hour. Setting a value of 0 disables + access times for HDFS. 
+ + + + + dfs.support.append + + This option is no longer supported. HBase no longer requires that + this option be enabled as sync is now enabled by default. See + HADOOP-8230 for additional information. + + + + + dfs.namenode.delegation.key.update-interval + 86400000 + The update interval for master key for delegation tokens + in the namenode in milliseconds. + + + + + dfs.namenode.delegation.token.max-lifetime + 604800000 + The maximum lifetime in milliseconds for which a delegation + token is valid. + + + + + dfs.namenode.delegation.token.renew-interval + 86400000 + The renewal interval for delegation token in milliseconds. + + + + + dfs.datanode.failed.volumes.tolerated + 0 + The number of volumes that are allowed to + fail before a datanode stops offering service. By default + any volume failure will cause a datanode to shutdown. + + + + + dfs.datanode.max.xcievers + 4096 + Specifies the maximum number of threads to use for transferring data + in and out of the DN. + + + + + dfs.datanode.readahead.bytes + 4193404 + + While reading block files, if the Hadoop native libraries are available, + the datanode can use the posix_fadvise system call to explicitly + page data into the operating system buffer cache ahead of the current + reader's position. This can improve performance especially when + disks are highly contended. + + This configuration specifies the number of bytes ahead of the current + read position which the datanode will attempt to read ahead. This + feature may be disabled by configuring this property to 0. + + If the native libraries are not available, this configuration has no + effect. + + + + + dfs.datanode.drop.cache.behind.reads + false + + In some workloads, the data read from HDFS is known to be significantly + large enough that it is unlikely to be useful to cache it in the + operating system buffer cache. 
In this case, the DataNode may be + configured to automatically purge all data from the buffer cache + after it is delivered to the client. This behavior is automatically + disabled for workloads which read only short sections of a block + (e.g HBase random-IO workloads). + + This may improve performance for some workloads by freeing buffer + cache spage usage for more cacheable data. + + If the Hadoop native libraries are not available, this configuration + has no effect. + + + + + dfs.datanode.drop.cache.behind.writes + false + + In some workloads, the data written to HDFS is known to be significantly + large enough that it is unlikely to be useful to cache it in the + operating system buffer cache. In this case, the DataNode may be + configured to automatically purge all data from the buffer cache + after it is written to disk. + + This may improve performance for some workloads by freeing buffer + cache spage usage for more cacheable data. + + If the Hadoop native libraries are not available, this configuration + has no effect. + + + + + dfs.datanode.sync.behind.writes + false + + If this configuration is enabled, the datanode will instruct the + operating system to enqueue all written data to the disk immediately + after it is written. This differs from the usual OS policy which + may wait for up to 30 seconds before triggering writeback. + + This may improve performance for some workloads by smoothing the + IO profile for data written to disk. + + If the Hadoop native libraries are not available, this configuration + has no effect. + + + + + dfs.client.use.datanode.hostname + false + Whether clients should use datanode hostnames when + connecting to datanodes. + + + + + dfs.datanode.use.datanode.hostname + false + Whether datanodes should use datanode hostnames when + connecting to other datanodes for data transfer. 
+ + + + + dfs.client.local.interfaces + + A comma separated list of network interface names to use + for data transfer between the client and datanodes. When creating + a connection to read from or write to a datanode, the client + chooses one of the specified interfaces at random and binds its + socket to the IP of that interface. Individual names may be + specified as either an interface name (eg "eth0"), a subinterface + name (eg "eth0:0"), or an IP address (which may be specified using + CIDR notation to match a range of IPs). + + + + + dfs.image.transfer.bandwidthPerSec + 0 + + Specifies the maximum amount of bandwidth that can be utilized + for image transfer in term of the number of bytes per second. + A default value of 0 indicates that throttling is disabled. + + + + + dfs.webhdfs.enabled + false + + Enable WebHDFS (REST API) in Namenodes and Datanodes. + + + + + dfs.namenode.kerberos.internal.spnego.principal + ${dfs.web.authentication.kerberos.principal} + + + + dfs.secondary.namenode.kerberos.internal.spnego.principal + ${dfs.web.authentication.kerberos.principal} + + + + dfs.namenode.invalidate.work.pct.per.iteration + 0.32f + + *Note*: Advanced property. Change with caution. + This determines the percentage amount of block + invalidations (deletes) to do over a single DN heartbeat + deletion command. The final deletion count is determined by applying this + percentage to the number of live nodes in the system. + The resultant number is the number of blocks from the deletion list + chosen for proper invalidation over a single heartbeat of a single DN. + Value should be a positive, non-zero percentage in float notation (X.Yf), + with 1.0f meaning 100%. + + + + + dfs.namenode.replication.work.multiplier.per.iteration + 2 + + *Note*: Advanced property. Change with caution. + This determines the total amount of block transfers to begin in + parallel at a DN, for replication, when such a command list is being + sent over a DN heartbeat by the NN. 
The actual number is obtained by + multiplying this multiplier with the total number of live nodes in the + cluster. The result number is the number of blocks to begin transfers + immediately for, per DN heartbeat. This number can be any positive, + non-zero integer. + + + + + dfs.namenode.avoid.read.stale.datanode + false + + Indicate whether or not to avoid reading from "stale" datanodes whose + heartbeat messages have not been received by the namenode + for more than a specified time interval. Stale datanodes will be + moved to the end of the node list returned for reading. See + dfs.namenode.avoid.write.stale.datanode for a similar setting for writes. + + + + + dfs.namenode.avoid.write.stale.datanode + false + + Indicate whether or not to avoid writing to "stale" datanodes whose + heartbeat messages have not been received by the namenode + for more than a specified time interval. Writes will avoid using + stale datanodes unless more than a configured ratio + (dfs.namenode.write.stale.datanode.ratio) of datanodes are marked as + stale. See dfs.namenode.avoid.read.stale.datanode for a similar setting + for reads. + + + + + dfs.namenode.stale.datanode.interval + 30000 + + Default time interval for marking a datanode as "stale", i.e., if + the namenode has not received heartbeat msg from a datanode for + more than this time interval, the datanode will be marked and treated + as "stale" by default. The stale interval cannot be too small since + otherwise this may cause too frequent change of stale states. + We thus set a minimum stale interval value (the default value is 3 times + of heartbeat interval) and guarantee that the stale interval cannot be less + than the minimum value. + + + + + dfs.namenode.write.stale.datanode.ratio + 0.5f + + When the ratio of number stale datanodes to total datanodes marked + is greater than this ratio, stop avoiding writing to stale nodes so + as to prevent causing hotspots. 
+ + + + + dfs.datanode.plugins + + Comma-separated list of datanode plug-ins to be activated. + + + + + dfs.namenode.plugins + + Comma-separated list of namenode plug-ins to be activated. + + + + diff --git a/sahara/plugins/spark/resources/spark-env.sh.template b/sahara/plugins/spark/resources/spark-env.sh.template new file mode 100644 index 00000000..0a35ee7c --- /dev/null +++ b/sahara/plugins/spark/resources/spark-env.sh.template @@ -0,0 +1,21 @@ +#!/usr/bin/env bash + +# This file contains environment variables required to run Spark. Copy it as +# spark-env.sh and edit that to configure Spark for your site. +# +# The following variables can be set in this file: +# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node +# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos +# - SPARK_JAVA_OPTS, to set node-specific JVM options for Spark. Note that +# we recommend setting app-wide options in the application's driver program. +# Examples of node-specific options : -Dspark.local.dir, GC options +# Examples of app-wide options : -Dspark.serializer +# +# If using the standalone deploy mode, you can also set variables for it here: +# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname +# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports +# - SPARK_WORKER_CORES, to set the number of cores to use on this machine +# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 
#!/bin/bash
# Hadoop rack-awareness topology script (wired in via the
# topology.script.file.name property). Hadoop invokes it with one or more
# host names / IP addresses as arguments; for each argument the script
# prints the node's rack path followed by a space, falling back to
# "/default/rack " for hosts that have no entry.
#
# The mapping file ${HADOOP_CONF}/topology.data contains one
# "<host-or-ip> <rack-path>" pair per line.
HADOOP_CONF=/etc/hadoop

while [ $# -gt 0 ] ; do
  nodeArg="$1"
  # Re-open the mapping file for every argument so each lookup scans it
  # from the beginning.
  exec< "${HADOOP_CONF}/topology.data"
  result=""
  # -r keeps backslashes in topology.data literal instead of treating
  # them as escapes.
  while read -r line ; do
    ar=( $line )
    # When a host appears more than once, the last matching line wins.
    if [ "${ar[0]}" = "$nodeArg" ] ; then
      result="${ar[1]}"
    fi
  done
  shift
  if [ -z "$result" ] ; then
    echo -n "/default/rack "
  else
    echo -n "$result "
  fi
done
from sahara.openstack.common import log as logging


LOG = logging.getLogger(__name__)

# HDFS daemons that are managed as system services rather than launched
# directly through the ``hadoop`` command.
_HDFS_SERVICE_COMMANDS = {
    "namenode": "sudo service hadoop-hdfs-namenode start",
    "datanode": "sudo service hadoop-hdfs-datanode start",
}


def start_processes(remote, *processes):
    """Start the given Hadoop processes on the node behind *remote*.

    HDFS namenode/datanode are started through their init-scripts; any
    other process name is launched inside a detached screen session via
    the ``hadoop`` command.
    """
    for process_name in processes:
        command = _HDFS_SERVICE_COMMANDS.get(process_name)
        if command is None:
            command = "screen -d -m sudo hadoop %s" % process_name
        remote.execute_command(command)


def refresh_nodes(remote, service):
    """Ask the given HDFS service to re-read its include/exclude files."""
    remote.execute_command("sudo hadoop %s -refreshNodes"
                           % service)


def format_namenode(nn_remote):
    """Format the HDFS filesystem on the namenode host."""
    nn_remote.execute_command("sudo -u hdfs hadoop namenode -format")


def clean_port_hadoop(nn_remote):
    """Kill whatever process is holding the namenode RPC port (8020)."""
    nn_remote.execute_command("sudo netstat -tlnp \
    | awk '/:8020 */ \
    {split($NF,a,\"/\"); print a[1]}' \
    | xargs sudo kill -9")


def start_spark_master(nn_remote):
    """Start the Spark master (and, via start-all, its slaves)."""
    nn_remote.execute_command("bash /opt/spark/sbin/start-all.sh")


def start_spark_slaves(nn_remote):
    """Start the Spark worker daemons listed in the slaves file."""
    nn_remote.execute_command("bash /opt/spark/sbin/start-slaves.sh")


def stop_spark(nn_remote):
    """Stop all Spark daemons on the cluster."""
    nn_remote.execute_command("bash /opt/spark/sbin/stop-all.sh")