diff --git a/MANIFEST.in b/MANIFEST.in
index 9af7f095..7ab75bdb 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -22,6 +22,9 @@ include sahara/plugins/hdp/versions/version_1_3_2/resources/*.sh
include sahara/plugins/hdp/versions/version_2_0_6/resources/*.template
include sahara/plugins/hdp/versions/version_2_0_6/resources/*.json
include sahara/plugins/hdp/versions/version_2_0_6/resources/*.sh
+include sahara/plugins/spark/resources/*.xml
+include sahara/plugins/spark/resources/*.sh
+include sahara/plugins/spark/resources/*.template
include sahara/resources/*.heat
include sahara/service/edp/resources/*.xml
include sahara/swift/resources/*.xml
diff --git a/sahara/plugins/spark/__init__.py b/sahara/plugins/spark/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/sahara/plugins/spark/config_helper.py b/sahara/plugins/spark/config_helper.py
new file mode 100644
index 00000000..7d70b14a
--- /dev/null
+++ b/sahara/plugins/spark/config_helper.py
@@ -0,0 +1,395 @@
+# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo.config import cfg
+
+from sahara import conductor as c
+from sahara.openstack.common import log as logging
+from sahara.plugins.general import utils
+from sahara.plugins import provisioning as p
+from sahara.topology import topology_helper as topology
+from sahara.utils import types
+from sahara.utils import xmlutils as x
+
+
+conductor = c.API
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+CORE_DEFAULT = x.load_hadoop_xml_defaults(
+ 'plugins/spark/resources/core-default.xml')
+
+HDFS_DEFAULT = x.load_hadoop_xml_defaults(
+ 'plugins/spark/resources/hdfs-default.xml')
+
+XML_CONFS = {
+ "HDFS": [CORE_DEFAULT, HDFS_DEFAULT]
+}
+
+SPARK_CONFS = {
+ 'Spark': {
+ "OPTIONS": [
+ {
+ 'name': 'Master port',
+ 'description': 'Start the master on a different port'
+ ' (default: 7077)',
+ 'default': '7077',
+ 'priority': 2,
+ },
+ {
+ 'name': 'Worker port',
+ 'description': 'Start the Spark worker on a specific port'
+ ' (default: random)',
+ 'default': 'random',
+ 'priority': 2,
+ },
+ {
+ 'name': 'Master webui port',
+ 'description': 'Port for the master web UI (default: 8080)',
+ 'default': '8080',
+ 'priority': 1,
+ },
+ {
+ 'name': 'Worker webui port',
+ 'description': 'Port for the worker web UI (default: 8081)',
+ 'default': '8081',
+ 'priority': 1,
+ },
+ {
+ 'name': 'Worker cores',
+ 'description': 'Total number of cores to allow Spark'
+ ' applications to use on the machine'
+ ' (default: all available cores)',
+ 'default': 'all',
+ 'priority': 2,
+ },
+ {
+ 'name': 'Worker memory',
+ 'description': 'Total amount of memory to allow Spark'
+ ' applications to use on the machine, e.g. 1000m,'
+ ' 2g (default: total memory minus 1 GB)',
+ 'default': 'all',
+ 'priority': 1,
+ },
+ {
+ 'name': 'Worker instances',
+ 'description': 'Number of worker instances to run on each'
+ ' machine (default: 1)',
+ 'default': '1',
+ 'priority': 2,
+ }
+ ]
+ }
+}
+
+ENV_CONFS = {
+ "HDFS": {
+ 'Name Node Heap Size': 'HADOOP_NAMENODE_OPTS=\\"-Xmx%sm\\"',
+ 'Data Node Heap Size': 'HADOOP_DATANODE_OPTS=\\"-Xmx%sm\\"'
+ }
+}
+
+ENABLE_DATA_LOCALITY = p.Config('Enable Data Locality', 'general', 'cluster',
+ config_type="bool", priority=1,
+ default_value=True, is_optional=True)
+
+HIDDEN_CONFS = ['fs.defaultFS', 'dfs.namenode.name.dir',
+ 'dfs.datanode.data.dir']
+
+CLUSTER_WIDE_CONFS = ['dfs.block.size', 'dfs.permissions', 'dfs.replication',
+ 'dfs.replication.min', 'dfs.replication.max',
+ 'io.file.buffer.size']
+
+PRIORITY_1_CONFS = ['dfs.datanode.du.reserved',
+ 'dfs.datanode.failed.volumes.tolerated',
+ 'dfs.datanode.max.xcievers', 'dfs.datanode.handler.count',
+ 'dfs.namenode.handler.count']
+
+# For now there are few cluster-wide configs, so consider all of them
+# high priority.
+PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
+
+
+def _initialise_configs():
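+    """Build the list of plugin configs exposed through the REST API.
+
+    Combines typed node-level configs parsed from the bundled Hadoop XML
+    defaults with the heap-size environment configs and the cluster-wide
+    Spark options declared above.
+    """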
+ configs = []
+ for service, config_lists in XML_CONFS.iteritems():
+ for config_list in config_lists:
+ for config in config_list:
+ if config['name'] not in HIDDEN_CONFS:
+ cfg = p.Config(config['name'], service, "node",
+ is_optional=True, config_type="string",
+ default_value=str(config['value']),
+ description=config['description'])
+ if cfg.default_value in ["true", "false"]:
+ cfg.config_type = "bool"
+ cfg.default_value = (cfg.default_value == 'true')
+ elif types.is_int(cfg.default_value):
+ cfg.config_type = "int"
+ cfg.default_value = int(cfg.default_value)
+ if config['name'] in CLUSTER_WIDE_CONFS:
+ cfg.scope = 'cluster'
+ if config['name'] in PRIORITY_1_CONFS:
+ cfg.priority = 1
+ configs.append(cfg)
+
+ for service, config_items in ENV_CONFS.iteritems():
+ for name, param_format_str in config_items.iteritems():
+ configs.append(p.Config(name, service, "node",
+ default_value=1024, priority=1,
+ config_type="int"))
+
+ for service, config_items in SPARK_CONFS.iteritems():
+ for item in config_items['OPTIONS']:
+ cfg = p.Config(name=item["name"],
+ description=item["description"],
+ default_value=item["default"],
+ applicable_target=service,
+ scope="cluster", is_optional=True,
+ priority=item["priority"])
+ configs.append(cfg)
+
+ if CONF.enable_data_locality:
+ configs.append(ENABLE_DATA_LOCALITY)
+
+ return configs
+
+# Initialise plugin Hadoop configurations
+PLUGIN_CONFIGS = _initialise_configs()
+
+
+def get_plugin_configs():
+ return PLUGIN_CONFIGS
+
+
+def get_config_value(service, name, cluster=None):
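+    """Return the value of a named config for the given service.
+
+    A value set on one of the cluster's node groups takes precedence over
+    the plugin default; an unknown parameter raises RuntimeError.
+    """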
+ if cluster:
+ for ng in cluster.node_groups:
+ if (ng.configuration().get(service) and
+ ng.configuration()[service].get(name)):
+ return ng.configuration()[service][name]
+
+ for c in PLUGIN_CONFIGS:
+ if c.applicable_target == service and c.name == name:
+ return c.default_value
+
+    raise RuntimeError("Unable to get parameter '%s' from service %s" %
+                       (name, service))
+
+
+def generate_cfg_from_general(cfg, configs, general_config,
+ rest_excluded=False):
+ if 'general' in configs:
+ for nm in general_config:
+ if nm not in configs['general'] and not rest_excluded:
+ configs['general'][nm] = general_config[nm]['default_value']
+ for name, value in configs['general'].items():
+ if value:
+ cfg = _set_config(cfg, general_config, name)
+ LOG.info("Applying config: %s" % name)
+ else:
+ cfg = _set_config(cfg, general_config)
+ return cfg
+
+
+def _get_hostname(service):
+ return service.hostname() if service else None
+
+
+def generate_xml_configs(configs, storage_path, nn_hostname, hadoop_port):
+ """dfs.name.dir': extract_hadoop_path(storage_path,
+ '/lib/hadoop/hdfs/namenode'),
+ 'dfs.data.dir': extract_hadoop_path(storage_path,
+ '/lib/hadoop/hdfs/datanode'),
+ 'dfs.name.dir': storage_path + 'hdfs/name',
+ 'dfs.data.dir': storage_path + 'hdfs/data',
+
+ 'dfs.hosts': '/etc/hadoop/dn.incl',
+ 'dfs.hosts.exclude': '/etc/hadoop/dn.excl',
+ """
+ if hadoop_port is None:
+ hadoop_port = 8020
+
+ cfg = {
+ 'fs.defaultFS': 'hdfs://%s:%s' % (nn_hostname, str(hadoop_port)),
+ 'dfs.namenode.name.dir': extract_hadoop_path(storage_path,
+ '/dfs/nn'),
+ 'dfs.datanode.data.dir': extract_hadoop_path(storage_path,
+ '/dfs/dn'),
+ 'hadoop.tmp.dir': extract_hadoop_path(storage_path,
+ '/dfs'),
+ }
+
+ # inserting user-defined configs
+ for key, value in extract_hadoop_xml_confs(configs):
+ cfg[key] = value
+
+ # invoking applied configs to appropriate xml files
+ core_all = CORE_DEFAULT
+
+ if CONF.enable_data_locality:
+ cfg.update(topology.TOPOLOGY_CONFIG)
+ # applying vm awareness configs
+ core_all += topology.vm_awareness_core_config()
+
+ xml_configs = {
+ 'core-site': x.create_hadoop_xml(cfg, core_all),
+ 'hdfs-site': x.create_hadoop_xml(cfg, HDFS_DEFAULT)
+ }
+
+ return xml_configs
+
+
+def _get_spark_opt_default(opt_name):
+ for opt in SPARK_CONFS["Spark"]["OPTIONS"]:
+ if opt_name == opt["name"]:
+ return opt["default"]
+ return None
+
+
+def generate_spark_env_configs(cluster):
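+    """Render spark-env.sh: the master address plus every option that was
+    changed from its Spark default.
+    """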
+ configs = []
+
+ # master configuration
+ sp_master = utils.get_instance(cluster, "master")
+ configs.append('SPARK_MASTER_IP=' + sp_master.hostname())
+
+ masterport = get_config_value("Spark", "Master port", cluster)
+ if masterport and masterport != _get_spark_opt_default("Master port"):
+ configs.append('SPARK_MASTER_PORT=' + str(masterport))
+
+ masterwebport = get_config_value("Spark", "Master webui port", cluster)
+ if masterwebport and \
+ masterwebport != _get_spark_opt_default("Master webui port"):
+ configs.append('SPARK_MASTER_WEBUI_PORT=' + str(masterwebport))
+
+ # configuration for workers
+ workercores = get_config_value("Spark", "Worker cores", cluster)
+ if workercores and workercores != _get_spark_opt_default("Worker cores"):
+ configs.append('SPARK_WORKER_CORES=' + str(workercores))
+
+ workermemory = get_config_value("Spark", "Worker memory", cluster)
+ if workermemory and \
+ workermemory != _get_spark_opt_default("Worker memory"):
+ configs.append('SPARK_WORKER_MEMORY=' + str(workermemory))
+
+ workerport = get_config_value("Spark", "Worker port", cluster)
+ if workerport and workerport != _get_spark_opt_default("Worker port"):
+ configs.append('SPARK_WORKER_PORT=' + str(workerport))
+
+ workerwebport = get_config_value("Spark", "Worker webui port", cluster)
+ if workerwebport and \
+ workerwebport != _get_spark_opt_default("Worker webui port"):
+ configs.append('SPARK_WORKER_WEBUI_PORT=' + str(workerwebport))
+
+ workerinstances = get_config_value("Spark", "Worker instances", cluster)
+ if workerinstances and \
+ workerinstances != _get_spark_opt_default("Worker instances"):
+ configs.append('SPARK_WORKER_INSTANCES=' + str(workerinstances))
+ return '\n'.join(configs)
+
+
+# workernames must be a list of worker hostnames
+def generate_spark_slaves_configs(workernames):
+ return '\n'.join(workernames)
+
+
+def extract_hadoop_environment_confs(configs):
+ """Returns list of Hadoop parameters which should be passed via environment
+ """
+ lst = []
+ for service, srv_confs in configs.items():
+ if ENV_CONFS.get(service):
+ for param_name, param_value in srv_confs.items():
+ for cfg_name, cfg_format_str in ENV_CONFS[service].items():
+ if param_name == cfg_name and param_value is not None:
+ lst.append(cfg_format_str % param_value)
+ return lst
+
+
+def extract_hadoop_xml_confs(configs):
+ """Returns list of Hadoop parameters which should be passed into general
+ configs like core-site.xml
+ """
+ lst = []
+ for service, srv_confs in configs.items():
+ if XML_CONFS.get(service):
+ for param_name, param_value in srv_confs.items():
+ for cfg_list in XML_CONFS[service]:
+ names = [cfg['name'] for cfg in cfg_list]
+ if param_name in names and param_value is not None:
+ lst.append((param_name, param_value))
+ return lst
+
+
+def generate_hadoop_setup_script(storage_paths, env_configs):
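+    """Build a shell script that rewrites hadoop-env.sh, points the Hadoop
+    log dirs at the first storage path and fixes storage-path ownership.
+    """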
+ script_lines = ["#!/bin/bash -x"]
+ script_lines.append("echo -n > /tmp/hadoop-env.sh")
+ for line in env_configs:
+ if 'HADOOP' in line:
+ script_lines.append('echo "%s" >> /tmp/hadoop-env.sh' % line)
+ script_lines.append("cat /etc/hadoop/hadoop-env.sh >> /tmp/hadoop-env.sh")
+ script_lines.append("cp /tmp/hadoop-env.sh /etc/hadoop/hadoop-env.sh")
+
+ hadoop_log = storage_paths[0] + "/log/hadoop/\$USER/"
+ script_lines.append('sed -i "s,export HADOOP_LOG_DIR=.*,'
+ 'export HADOOP_LOG_DIR=%s," /etc/hadoop/hadoop-env.sh'
+ % hadoop_log)
+
+ hadoop_log = storage_paths[0] + "/log/hadoop/hdfs"
+ script_lines.append('sed -i "s,export HADOOP_SECURE_DN_LOG_DIR=.*,'
+ 'export HADOOP_SECURE_DN_LOG_DIR=%s," '
+ '/etc/hadoop/hadoop-env.sh' % hadoop_log)
+
+ for path in storage_paths:
+ script_lines.append("chown -R hadoop:hadoop %s" % path)
+ script_lines.append("chmod -R 755 %s" % path)
+ return "\n".join(script_lines)
+
+
+def extract_name_values(configs):
+ return dict((cfg['name'], cfg['value']) for cfg in configs)
+
+
+def extract_hadoop_path(lst, hadoop_dir):
+ if lst:
+ return ",".join([p + hadoop_dir for p in lst])
+
+
+def _set_config(cfg, gen_cfg, name=None):
+ if name in gen_cfg:
+ cfg.update(gen_cfg[name]['conf'])
+ if name is None:
+ for name in gen_cfg:
+ cfg.update(gen_cfg[name]['conf'])
+ return cfg
+
+
+def _is_general_option_enabled(cluster, option):
+ for ng in cluster.node_groups:
+ conf = ng.configuration()
+ if 'general' in conf and option.name in conf['general']:
+ return conf['general'][option.name]
+ return option.default_value
+
+
+def is_data_locality_enabled(cluster):
+ if not CONF.enable_data_locality:
+ return False
+ return _is_general_option_enabled(cluster, ENABLE_DATA_LOCALITY)
+
+
+def get_port_from_config(service, name, cluster=None):
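+    """Return the port component of an address-valued config."""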
+ address = get_config_value(service, name, cluster)
+ return utils.get_port_from_address(address)
diff --git a/sahara/plugins/spark/plugin.py b/sahara/plugins/spark/plugin.py
new file mode 100644
index 00000000..d78ccfa3
--- /dev/null
+++ b/sahara/plugins/spark/plugin.py
@@ -0,0 +1,296 @@
+# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo.config import cfg
+
+from sahara import conductor
+from sahara import context
+from sahara.openstack.common import log as logging
+from sahara.plugins.general import exceptions as ex
+from sahara.plugins.general import utils
+from sahara.plugins import provisioning as p
+from sahara.plugins.spark import config_helper as c_helper
+from sahara.plugins.spark import run_scripts as run
+from sahara.topology import topology_helper as th
+from sahara.utils import files as f
+from sahara.utils import remote
+
+
+conductor = conductor.API
+LOG = logging.getLogger(__name__)
+CONF = cfg.CONF
+
+
+class SparkProvider(p.ProvisioningPluginBase):
+ def __init__(self):
+ self.processes = {
+ "HDFS": ["namenode", "datanode"],
+ "Spark": ["master", "slave"]
+ }
+
+ def get_title(self):
+ return "Apache Spark"
+
+ def get_description(self):
+        return (
+            "This plugin provides the ability to launch Spark on a "
+            "CDH Hadoop cluster without any management consoles.")
+
+ def get_versions(self):
+ return ['0.9.1']
+
+ def get_configs(self, hadoop_version):
+ return c_helper.get_plugin_configs()
+
+ def get_node_processes(self, hadoop_version):
+ return self.processes
+
+ def validate(self, cluster):
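+        # A valid cluster needs exactly one namenode and one Spark master,
+        # plus at least one datanode and one Spark slave.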
+ nn_count = sum([ng.count for ng
+ in utils.get_node_groups(cluster, "namenode")])
+ if nn_count != 1:
+ raise ex.InvalidComponentCountException("namenode", 1, nn_count)
+
+ dn_count = sum([ng.count for ng
+ in utils.get_node_groups(cluster, "datanode")])
+ if dn_count < 1:
+ raise ex.InvalidComponentCountException("datanode", "1 or more",
+                                                    dn_count)
+
+ # validate Spark Master Node and Spark Slaves
+ sm_count = sum([ng.count for ng
+ in utils.get_node_groups(cluster, "master")])
+
+        if sm_count < 1:
+            raise ex.RequiredServiceMissingException("Spark master")
+
+        if sm_count >= 2:
+            raise ex.InvalidComponentCountException("Spark master", "1",
+                                                    sm_count)
+
+ sl_count = sum([ng.count for ng
+ in utils.get_node_groups(cluster, "slave")])
+
+ if sl_count < 1:
+ raise ex.InvalidComponentCountException("Spark slave", "1 or more",
+ sl_count)
+
+ def update_infra(self, cluster):
+ pass
+
+ def configure_cluster(self, cluster):
+ self._setup_instances(cluster)
+
+ def start_cluster(self, cluster):
+ nn_instance = utils.get_instance(cluster, "namenode")
+ sm_instance = utils.get_instance(cluster, "master")
+ dn_instances = utils.get_instances(cluster, "datanode")
+
+ # Start the name node
+ with remote.get_remote(nn_instance) as r:
+ run.format_namenode(r)
+ run.start_processes(r, "namenode")
+
+ # start the data nodes
+ self._start_slave_datanode_processes(dn_instances)
+
+ LOG.info("Hadoop services in cluster %s have been started" %
+ cluster.name)
+
+ with remote.get_remote(nn_instance) as r:
+ r.execute_command("sudo -u hdfs hdfs dfs -mkdir -p /user/$USER/")
+ r.execute_command("sudo -u hdfs hdfs dfs -chown $USER \
+ /user/$USER/")
+
+ # start spark nodes
+ if sm_instance:
+ with remote.get_remote(sm_instance) as r:
+ run.start_spark_master(r)
+ LOG.info("Spark service at '%s' has been started",
+ sm_instance.hostname())
+
+ LOG.info('Cluster %s has been started successfully' % cluster.name)
+ self._set_cluster_info(cluster)
+
+ def _extract_configs_to_extra(self, cluster):
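+        # Render all config files (Hadoop XML, setup script, spark-env.sh
+        # and the slaves list) once per node group; instance-level pushes
+        # reuse these rendered blobs.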
+ nn = utils.get_instance(cluster, "namenode")
+ sp_master = utils.get_instance(cluster, "master")
+ sp_slaves = utils.get_instances(cluster, "slave")
+
+ extra = dict()
+
+ config_master = config_slaves = ''
+ if sp_master is not None:
+ config_master = c_helper.generate_spark_env_configs(cluster)
+
+ if sp_slaves is not None:
+ slavenames = []
+ for slave in sp_slaves:
+ slavenames.append(slave.hostname())
+ config_slaves = c_helper.generate_spark_slaves_configs(slavenames)
+ else:
+ config_slaves = "\n"
+
+ for ng in cluster.node_groups:
+ extra[ng.id] = {
+ 'xml': c_helper.generate_xml_configs(
+ ng.configuration(),
+ ng.storage_paths(),
+ nn.hostname(), None,
+ ),
+ 'setup_script': c_helper.generate_hadoop_setup_script(
+ ng.storage_paths(),
+ c_helper.extract_hadoop_environment_confs(
+ ng.configuration())
+ ),
+ 'sp_master': config_master,
+ 'sp_slaves': config_slaves
+ }
+
+ if c_helper.is_data_locality_enabled(cluster):
+ topology_data = th.generate_topology_map(
+ cluster, CONF.enable_hypervisor_awareness)
+ extra['topology_data'] = "\n".join(
+ [k + " " + v for k, v in topology_data.items()]) + "\n"
+
+ return extra
+
+ def validate_scaling(self, cluster, existing, additional):
+ raise ex.ClusterCannotBeScaled("Scaling Spark clusters has not been"
+                                       " implemented yet")
+
+ def decommission_nodes(self, cluster, instances):
+ pass
+
+ def scale_cluster(self, cluster, instances):
+ pass
+
+ def _start_slave_datanode_processes(self, dn_instances):
+ with context.ThreadGroup() as tg:
+ for i in dn_instances:
+ tg.spawn('spark-start-dn-%s' % i.instance_name,
+ self._start_datanode, i)
+
+ def _start_datanode(self, instance):
+ with instance.remote() as r:
+ run.start_processes(r, "datanode")
+
+ def _setup_instances(self, cluster):
+ extra = self._extract_configs_to_extra(cluster)
+
+ instances = utils.get_instances(cluster)
+ self._push_configs_to_nodes(cluster, extra, instances)
+
+ def _push_configs_to_nodes(self, cluster, extra, new_instances):
+ all_instances = utils.get_instances(cluster)
+ with context.ThreadGroup() as tg:
+ for instance in all_instances:
+ if instance in new_instances:
+ tg.spawn('spark-configure-%s' % instance.instance_name,
+ self._push_configs_to_new_node, cluster,
+ extra, instance)
+
+ def _push_configs_to_new_node(self, cluster, extra, instance):
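+        # Write the rendered Hadoop/Spark configs, the setup script and the
+        # cluster ssh key pair to the node, then run the setup script.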
+ ng_extra = extra[instance.node_group.id]
+
+ files_hadoop = {
+ '/etc/hadoop/conf/core-site.xml': ng_extra['xml']['core-site'],
+ '/etc/hadoop/conf/hdfs-site.xml': ng_extra['xml']['hdfs-site'],
+ }
+
+ files_spark = {
+ '/opt/spark/conf/spark-env.sh': ng_extra['sp_master'],
+ '/opt/spark/conf/slaves': ng_extra['sp_slaves']
+ }
+
+ files_init = {
+ '/tmp/sahara-hadoop-init.sh': ng_extra['setup_script'],
+ 'id_rsa': cluster.management_private_key,
+ 'authorized_keys': cluster.management_public_key
+ }
+
+        # pietro: this is required because the private key is not stored in
+        # .ssh, which breaks the password-less ssh required by the spark
+        # scripts
+ key_cmd = 'sudo cp $HOME/id_rsa $HOME/.ssh/; '\
+ 'sudo chown $USER $HOME/.ssh/id_rsa; '\
+ 'sudo chmod 600 $HOME/.ssh/id_rsa'
+
+        ng = instance.node_group
+        dn_path = c_helper.extract_hadoop_path(ng.storage_paths(),
+                                               '/dfs/dn')
+        nn_path = c_helper.extract_hadoop_path(ng.storage_paths(),
+                                               '/dfs/nn')
+        hdfs_dir_cmd = 'sudo mkdir -p %s %s;'\
+                       'sudo chown -R hdfs:hadoop %s %s;'\
+                       'sudo chmod 755 %s %s;'\
+                       % (nn_path, dn_path,
+                          nn_path, dn_path,
+                          nn_path, dn_path)
+
+ with remote.get_remote(instance) as r:
+ r.execute_command(
+ 'sudo chown -R $USER:$USER /etc/hadoop'
+ )
+ r.execute_command(
+ 'sudo chown -R $USER:$USER /opt/spark'
+ )
+ r.write_files_to(files_hadoop)
+ r.write_files_to(files_spark)
+ r.write_files_to(files_init)
+ r.execute_command(
+ 'sudo chmod 0500 /tmp/sahara-hadoop-init.sh'
+ )
+ r.execute_command(
+ 'sudo /tmp/sahara-hadoop-init.sh '
+ '>> /tmp/sahara-hadoop-init.log 2>&1')
+
+ r.execute_command(hdfs_dir_cmd)
+ r.execute_command(key_cmd)
+
+ if c_helper.is_data_locality_enabled(cluster):
+ r.write_file_to(
+ '/etc/hadoop/topology.sh',
+ f.get_file_text(
+ 'plugins/spark/resources/topology.sh'))
+ r.execute_command(
+ 'sudo chmod +x /etc/hadoop/topology.sh'
+ )
+
+ self._write_topology_data(r, cluster, extra)
+
+ def _write_topology_data(self, r, cluster, extra):
+ if c_helper.is_data_locality_enabled(cluster):
+ topology_data = extra['topology_data']
+ r.write_file_to('/etc/hadoop/topology.data', topology_data)
+
+ def _set_cluster_info(self, cluster):
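+        # Publish the HDFS and Spark web endpoints in the cluster info dict.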
+ nn = utils.get_instance(cluster, "namenode")
+ sp_master = utils.get_instance(cluster, "master")
+ info = {}
+
+ if nn:
+ address = c_helper.get_config_value(
+ 'HDFS', 'dfs.http.address', cluster)
+ port = address[address.rfind(':') + 1:]
+ info['HDFS'] = {
+ 'Web UI': 'http://%s:%s' % (nn.management_ip, port)
+ }
+ info['HDFS']['NameNode'] = 'hdfs://%s:8020' % nn.hostname()
+
+ if sp_master:
+ port = c_helper.get_config_value(
+ 'Spark', 'Master webui port', cluster)
+ if port is not None:
+ info['Spark'] = {
+ 'Web UI': 'http://%s:%s' % (sp_master.management_ip, port)
+ }
+ ctx = context.ctx()
+ conductor.cluster_update(ctx, cluster, {'info': info})
diff --git a/sahara/plugins/spark/resources/README.rst b/sahara/plugins/spark/resources/README.rst
new file mode 100644
index 00000000..741fcf7a
--- /dev/null
+++ b/sahara/plugins/spark/resources/README.rst
@@ -0,0 +1,21 @@
+Apache Spark and HDFS Configurations for Sahara
+===============================================
+
+This directory contains default XML configuration files and Spark scripts:
+
+* core-default.xml,
+* hdfs-default.xml,
+* spark-env.sh.template,
+* topology.sh
+
+These files are used by Sahara's plugin for Apache Spark and Cloudera HDFS.
+XML config files were taken from here:
+ * https://github.com/apache/hadoop-common/blob/release-1.2.1/src/core/core-default.xml
+ * https://github.com/apache/hadoop-common/blob/release-1.2.1/src/hdfs/hdfs-default.xml
+
+Cloudera packages use the same configuration files as standard Apache Hadoop.
+
+XML configs are used to expose default Hadoop configurations to the users
+through Sahara's REST API. This allows users to override config values,
+which are then pushed to the provisioned VMs running Hadoop services as
+part of the appropriate XML config file.
diff --git a/sahara/plugins/spark/resources/core-default.xml b/sahara/plugins/spark/resources/core-default.xml
new file mode 100644
index 00000000..32e20c45
--- /dev/null
+++ b/sahara/plugins/spark/resources/core-default.xml
@@ -0,0 +1,632 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Do not modify this file directly.  Instead, copy entries that you -->
+<!-- wish to modify from this file into core-site.xml and change them -->
+<!-- there.  If core-site.xml does not already exist, create it.      -->
+
+<configuration>
+
+<property>
+  <name>hadoop.tmp.dir</name>
+  <value>/tmp/hadoop-${user.name}</value>
+  <description>A base for other temporary directories.</description>
+</property>
+
+<property>
+  <name>hadoop.native.lib</name>
+  <value>true</value>
+  <description>Should native hadoop libraries, if present, be used.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.http.filter.initializers</name>
+  <value></value>
+  <description>A comma separated list of class names. Each class in the list
+  must extend org.apache.hadoop.http.FilterInitializer. The corresponding
+  Filter will be initialized. Then, the Filter will be applied to all user
+  facing jsp and servlet web pages. The ordering of the list defines the
+  ordering of the filters.</description>
+</property>
+
+<property>
+  <name>hadoop.security.group.mapping</name>
+  <value>org.apache.hadoop.security.ShellBasedUnixGroupsMapping</value>
+  <description>Class for user to group mapping (get groups for a given user)
+  </description>
+</property>
+
+<property>
+  <name>hadoop.security.authorization</name>
+  <value>false</value>
+  <description>Is service-level authorization enabled?</description>
+</property>
+
+<property>
+  <name>hadoop.security.instrumentation.requires.admin</name>
+  <value>false</value>
+  <description>Indicates if administrator ACLs are required to access
+  instrumentation servlets (JMX, METRICS, CONF, STACKS).</description>
+</property>
+
+<property>
+  <name>hadoop.security.authentication</name>
+  <value>simple</value>
+  <description>Possible values are simple (no authentication), and kerberos
+  </description>
+</property>
+
+<property>
+  <name>hadoop.security.token.service.use_ip</name>
+  <value>true</value>
+  <description>Controls whether tokens always use IP addresses. DNS changes
+  will not be detected if this option is enabled. Existing client connections
+  that break will always reconnect to the IP of the original host. New clients
+  will connect to the host's new IP but fail to locate a token. Disabling
+  this option will allow existing and new clients to detect an IP change and
+  continue to locate the new host's token.</description>
+</property>
+
+<property>
+  <name>hadoop.security.use-weak-http-crypto</name>
+  <value>false</value>
+  <description>If enabled, use KSSL to authenticate HTTP connections to the
+  NameNode. Due to a bug in JDK6, using KSSL requires one to configure
+  Kerberos tickets to use encryption types that are known to be
+  cryptographically weak. If disabled, SPNEGO will be used for HTTP
+  authentication, which supports stronger encryption types.</description>
+</property>
+
+<property>
+  <name>hadoop.logfile.size</name>
+  <value>10000000</value>
+  <description>The max size of each log file</description>
+</property>
+
+<property>
+  <name>hadoop.logfile.count</name>
+  <value>10</value>
+  <description>The max number of log files</description>
+</property>
+
+<property>
+  <name>io.file.buffer.size</name>
+  <value>4096</value>
+  <description>The size of buffer for use in sequence files.
+  The size of this buffer should probably be a multiple of hardware
+  page size (4096 on Intel x86), and it determines how much data is
+  buffered during read and write operations.</description>
+</property>
+
+<property>
+  <name>io.bytes.per.checksum</name>
+  <value>512</value>
+  <description>The number of bytes per checksum. Must not be larger than
+  io.file.buffer.size.</description>
+</property>
+
+<property>
+  <name>io.skip.checksum.errors</name>
+  <value>false</value>
+  <description>If true, when a checksum error is encountered while
+  reading a sequence file, entries are skipped, instead of throwing an
+  exception.</description>
+</property>
+
+<property>
+  <name>io.compression.codecs</name>
+  <value>org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec</value>
+  <description>A list of the compression codec classes that can be used
+  for compression/decompression.</description>
+</property>
+
+<property>
+  <name>io.serializations</name>
+  <value>org.apache.hadoop.io.serializer.WritableSerialization</value>
+  <description>A list of serialization classes that can be used for
+  obtaining serializers and deserializers.</description>
+</property>
+
+<property>
+  <name>fs.defaultFS</name>
+  <value>file:///</value>
+  <description>The name of the default file system. A URI whose
+  scheme and authority determine the FileSystem implementation. The
+  uri's scheme determines the config property (fs.SCHEME.impl) naming
+  the FileSystem implementation class. The uri's authority is used to
+  determine the host, port, etc. for a filesystem.</description>
+</property>
+
+<property>
+  <name>fs.trash.interval</name>
+  <value>0</value>
+  <description>Number of minutes between trash checkpoints.
+  If zero, the trash feature is disabled.</description>
+</property>
+
+<property>
+  <name>fs.file.impl</name>
+  <value>org.apache.hadoop.fs.LocalFileSystem</value>
+  <description>The FileSystem for file: uris.</description>
+</property>
+
+<property>
+  <name>fs.hdfs.impl</name>
+  <value>org.apache.hadoop.hdfs.DistributedFileSystem</value>
+  <description>The FileSystem for hdfs: uris.</description>
+</property>
+
+<property>
+  <name>fs.s3.impl</name>
+  <value>org.apache.hadoop.fs.s3.S3FileSystem</value>
+  <description>The FileSystem for s3: uris.</description>
+</property>
+
+<property>
+  <name>fs.s3n.impl</name>
+  <value>org.apache.hadoop.fs.s3native.NativeS3FileSystem</value>
+  <description>The FileSystem for s3n: (Native S3) uris.</description>
+</property>
+
+<property>
+  <name>fs.kfs.impl</name>
+  <value>org.apache.hadoop.fs.kfs.KosmosFileSystem</value>
+  <description>The FileSystem for kfs: uris.</description>
+</property>
+
+<property>
+  <name>fs.hftp.impl</name>
+  <value>org.apache.hadoop.hdfs.HftpFileSystem</value>
+</property>
+
+<property>
+  <name>fs.hsftp.impl</name>
+  <value>org.apache.hadoop.hdfs.HsftpFileSystem</value>
+</property>
+
+<property>
+  <name>fs.webhdfs.impl</name>
+  <value>org.apache.hadoop.hdfs.web.WebHdfsFileSystem</value>
+</property>
+
+<property>
+  <name>fs.ftp.impl</name>
+  <value>org.apache.hadoop.fs.ftp.FTPFileSystem</value>
+  <description>The FileSystem for ftp: uris.</description>
+</property>
+
+<property>
+  <name>fs.ramfs.impl</name>
+  <value>org.apache.hadoop.fs.InMemoryFileSystem</value>
+  <description>The FileSystem for ramfs: uris.</description>
+</property>
+
+<property>
+  <name>fs.har.impl</name>
+  <value>org.apache.hadoop.fs.HarFileSystem</value>
+  <description>The filesystem for Hadoop archives.</description>
+</property>
+
+<property>
+  <name>fs.har.impl.disable.cache</name>
+  <value>true</value>
+  <description>Don't cache 'har' filesystem instances.</description>
+</property>
+
+<property>
+  <name>fs.checkpoint.dir</name>
+  <value>${hadoop.tmp.dir}/dfs/namesecondary</value>
+  <description>Determines where on the local filesystem the DFS secondary
+  name node should store the temporary images to merge.
+  If this is a comma-delimited list of directories then the image is
+  replicated in all of the directories for redundancy.</description>
+</property>
+
+<property>
+  <name>fs.checkpoint.edits.dir</name>
+  <value>${fs.checkpoint.dir}</value>
+  <description>Determines where on the local filesystem the DFS secondary
+  name node should store the temporary edits to merge.
+  If this is a comma-delimited list of directories then the edits are
+  replicated in all of the directories for redundancy.
+  Default value is same as fs.checkpoint.dir</description>
+</property>
+
+<property>
+  <name>fs.checkpoint.period</name>
+  <value>3600</value>
+  <description>The number of seconds between two periodic checkpoints.
+  </description>
+</property>
+
+<property>
+  <name>fs.checkpoint.size</name>
+  <value>67108864</value>
+  <description>The size of the current edit log (in bytes) that triggers
+  a periodic checkpoint even if the fs.checkpoint.period hasn't expired.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3.block.size</name>
+  <value>67108864</value>
+  <description>Block size to use when writing files to S3.</description>
+</property>
+
+<property>
+  <name>fs.s3.buffer.dir</name>
+  <value>${hadoop.tmp.dir}/s3</value>
+  <description>Determines where on the local filesystem the S3 filesystem
+  should store files before sending them to S3
+  (or after retrieving them from S3).</description>
+</property>
+
+<property>
+  <name>fs.s3.maxRetries</name>
+  <value>4</value>
+  <description>The maximum number of retries for reading or writing files to
+  S3, before we signal failure to the application.</description>
+</property>
+
+<property>
+  <name>fs.s3.sleepTimeSeconds</name>
+  <value>10</value>
+  <description>The number of seconds to sleep between each S3 retry.
+  </description>
+</property>
+
+<property>
+  <name>local.cache.size</name>
+  <value>10737418240</value>
+  <description>The limit on the size of cache you want to keep, set by
+  default to 10GB. This will act as a soft limit on the cache directory
+  for out of band data.</description>
+</property>
+
+<property>
+  <name>io.seqfile.compress.blocksize</name>
+  <value>1000000</value>
+  <description>The minimum block size for compression in block compressed
+  SequenceFiles.</description>
+</property>
+
+<property>
+  <name>io.seqfile.lazydecompress</name>
+  <value>true</value>
+  <description>Should values of block-compressed SequenceFiles be
+  decompressed only when necessary.</description>
+</property>
+
+<property>
+  <name>io.seqfile.sorter.recordlimit</name>
+  <value>1000000</value>
+  <description>The limit on number of records to be kept in memory in a
+  spill in SequenceFiles.Sorter</description>
+</property>
+
+<property>
+  <name>io.mapfile.bloom.size</name>
+  <value>1048576</value>
+  <description>The size of BloomFilter-s used in BloomMapFile. Each time
+  this many keys is appended the next BloomFilter will be created (inside
+  a DynamicBloomFilter). Larger values minimize the number of filters,
+  which slightly increases the performance, but may waste too much space
+  if the total number of keys is usually much smaller than this number.
+  </description>
+</property>
+
+<property>
+  <name>io.mapfile.bloom.error.rate</name>
+  <value>0.005</value>
+  <description>The rate of false positives in BloomFilter-s used in
+  BloomMapFile. As this value decreases, the size of BloomFilter-s increases
+  exponentially. This value is the probability of encountering false
+  positives (default is 0.5%).</description>
+</property>
+
+<property>
+  <name>hadoop.util.hash.type</name>
+  <value>murmur</value>
+  <description>The default implementation of Hash. Currently this can take
+  one of the two values: 'murmur' to select MurmurHash and 'jenkins' to
+  select JenkinsHash.</description>
+</property>
+
+<property>
+  <name>ipc.client.idlethreshold</name>
+  <value>4000</value>
+  <description>Defines the threshold number of connections after which
+  connections will be inspected for idleness.</description>
+</property>
+
+<property>
+  <name>ipc.client.kill.max</name>
+  <value>10</value>
+  <description>Defines the maximum number of clients to disconnect in one
+  go.</description>
+</property>
+
+<property>
+  <name>ipc.client.connection.maxidletime</name>
+  <value>10000</value>
+  <description>The maximum time in msec after which a client will bring down
+  the connection to the server.</description>
+</property>
+
+<property>
+  <name>ipc.client.connect.max.retries</name>
+  <value>10</value>
+  <description>Indicates the number of retries a client will make to
+  establish a server connection.</description>
+</property>
+
+<property>
+  <name>ipc.server.listen.queue.size</name>
+  <value>128</value>
+  <description>Indicates the length of the listen queue for servers
+  accepting client connections.</description>
+</property>
+
+<property>
+  <name>ipc.server.tcpnodelay</name>
+  <value>false</value>
+  <description>Turn on/off Nagle's algorithm for the TCP socket connection
+  on the server. Setting to true disables the algorithm and may decrease
+  latency with a cost of more/smaller packets.</description>
+</property>
+
+<property>
+  <name>ipc.client.tcpnodelay</name>
+  <value>false</value>
+  <description>Turn on/off Nagle's algorithm for the TCP socket connection
+  on the client. Setting to true disables the algorithm and may decrease
+  latency with a cost of more/smaller packets.</description>
+</property>
+
+<property>
+  <name>webinterface.private.actions</name>
+  <value>false</value>
+  <description>If set to true, the web interfaces of JT and NN may contain
+  actions, such as kill job, delete file, etc., that should
+  not be exposed to public. Enable this option if the interfaces
+  are only reachable by those who have the right authorization.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.rpc.socket.factory.class.default</name>
+  <value>org.apache.hadoop.net.StandardSocketFactory</value>
+  <description>Default SocketFactory to use. This parameter is expected to
+  be formatted as "package.FactoryClassName".</description>
+</property>
+
+<property>
+  <name>hadoop.rpc.socket.factory.class.ClientProtocol</name>
+  <value></value>
+  <description>SocketFactory to use to connect to a DFS. If null or empty,
+  use hadoop.rpc.socket.class.default. This socket factory is also used by
+  DFSClient to create sockets to DataNodes.</description>
+</property>
+
+<property>
+  <name>hadoop.socks.server</name>
+  <value></value>
+  <description>Address (host:port) of the SOCKS server to be used by the
+  SocksSocketFactory.</description>
+</property>
+
+<property>
+  <name>topology.node.switch.mapping.impl</name>
+  <value>org.apache.hadoop.net.ScriptBasedMapping</value>
+  <description>The default implementation of the DNSToSwitchMapping. It
+  invokes a script specified in topology.script.file.name to resolve
+  node names. If the value for topology.script.file.name is not set, the
+  default value of DEFAULT_RACK is returned for all node names.</description>
+</property>
+
+<property>
+  <name>net.topology.impl</name>
+  <value>org.apache.hadoop.net.NetworkTopology</value>
+  <description>The default implementation of NetworkTopology which is
+  classic three layer one.</description>
+</property>
+
+<property>
+  <name>topology.script.file.name</name>
+  <value></value>
+  <description>The script name that should be invoked to resolve DNS names
+  to NetworkTopology names. Example: the script would take host.foo.bar as
+  an argument, and return /rack1 as the output.</description>
+</property>
+
+<property>
+  <name>topology.script.number.args</name>
+  <value>100</value>
+  <description>The max number of args that the script configured with
+  topology.script.file.name should be run with. Each arg is an
+  IP address.</description>
+</property>
+
+<property>
+  <name>hadoop.security.uid.cache.secs</name>
+  <value>14400</value>
+  <description>NativeIO maintains a cache from UID to UserName. This is
+  the timeout for an entry in that cache.</description>
+</property>
+
+<property>
+  <name>hadoop.http.authentication.type</name>
+  <value>simple</value>
+  <description>Defines authentication used for Oozie HTTP endpoint.
+  Supported values are: simple | kerberos | #AUTHENTICATION_HANDLER_CLASSNAME#
+  </description>
+</property>
+
+<property>
+  <name>hadoop.http.authentication.token.validity</name>
+  <value>36000</value>
+  <description>Indicates how long (in seconds) an authentication token is
+  valid before it has to be renewed.</description>
+</property>
+
+<property>
+  <name>hadoop.http.authentication.signature.secret.file</name>
+  <value>${user.home}/hadoop-http-auth-signature-secret</value>
+  <description>The signature secret for signing the authentication tokens.
+  If not set a random secret is generated at startup time.
+  The same secret should be used for JT/NN/DN/TT configurations.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.http.authentication.cookie.domain</name>
+  <value></value>
+  <description>The domain to use for the HTTP cookie that stores the
+  authentication token. In order for authentication to work correctly
+  across all Hadoop nodes' web-consoles the domain must be correctly set.
+  IMPORTANT: when using IP addresses, browsers ignore cookies with domain
+  settings. For this setting to work properly all nodes in the cluster
+  must be configured to generate URLs with hostname.domain names on it.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.http.authentication.simple.anonymous.allowed</name>
+  <value>true</value>
+  <description>Indicates if anonymous requests are allowed when using
+  'simple' authentication.</description>
+</property>
+
+<property>
+  <name>hadoop.http.authentication.kerberos.principal</name>
+  <value>HTTP/localhost@LOCALHOST</value>
+  <description>Indicates the Kerberos principal to be used for HTTP
+  endpoint. The principal MUST start with 'HTTP/' as per Kerberos HTTP
+  SPNEGO specification.</description>
+</property>
+
+<property>
+  <name>hadoop.http.authentication.kerberos.keytab</name>
+  <value>${user.home}/hadoop.keytab</value>
+  <description>Location of the keytab file with the credentials for the
+  principal. Referring to the same keytab file Oozie uses for its Kerberos
+  credentials for Hadoop.</description>
+</property>
+
+<property>
+  <name>hadoop.relaxed.worker.version.check</name>
+  <value>false</value>
+  <description>By default datanodes refuse to connect to namenodes if their
+  build revision (svn revision) do not match, and tasktrackers refuse to
+  connect to jobtrackers if their build version (version, revision,
+  user, and source checksum) do not match. This option changes the
+  behavior of hadoop workers to only check for a version match (eg
+  "1.0.2") but ignore the other build fields (revision, user, and
+  source checksum).</description>
+</property>
+
+<property>
+  <name>hadoop.skip.worker.version.check</name>
+  <value>false</value>
+  <description>By default datanodes refuse to connect to namenodes if their
+  build revision (svn revision) do not match, and tasktrackers refuse to
+  connect to jobtrackers if their build version (version, revision,
+  user, and source checksum) do not match. This option changes the
+  behavior of hadoop workers to skip doing a version check at all.
+  This option supersedes the 'hadoop.relaxed.worker.version.check'
+  option.</description>
+</property>
+
+<property>
+  <name>hadoop.jetty.logs.serve.aliases</name>
+  <value>true</value>
+  <description>Enable/Disable aliases serving from jetty</description>
+</property>
+
+<property>
+  <name>ipc.client.fallback-to-simple-auth-allowed</name>
+  <value>false</value>
+  <description>When a client is configured to attempt a secure connection,
+  but attempts to connect to an insecure server, that server may instruct
+  the client to switch to SASL SIMPLE (unsecure) authentication. This
+  setting controls whether or not the client will accept this instruction
+  from the server. When false (the default), the client will not allow the
+  fallback to SIMPLE authentication, and will abort the connection.
+  </description>
+</property>
+
+</configuration>
diff --git a/sahara/plugins/spark/resources/hdfs-default.xml b/sahara/plugins/spark/resources/hdfs-default.xml
new file mode 100644
index 00000000..35c888fe
--- /dev/null
+++ b/sahara/plugins/spark/resources/hdfs-default.xml
@@ -0,0 +1,709 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!-- Do not modify this file directly.  Instead, copy entries that you -->
+<!-- wish to modify from this file into hdfs-site.xml and change them -->
+<!-- there.  If hdfs-site.xml does not already exist, create it.      -->
+
+<configuration>
+
+<property>
+  <name>dfs.namenode.logging.level</name>
+  <value>info</value>
+  <description>The logging level for dfs namenode. Other values are "dir"
+  (trace namespace mutations), "block" (trace block under/over replications
+  and block creations/deletions), or "all".</description>
+</property>
+
+<property>
+  <name>dfs.namenode.rpc-address</name>
+  <value></value>
+  <description>RPC address that handles all clients requests. If empty then
+  we'll get the value from fs.default.name.
+  The value of this property will take the form of hdfs://nn-host1:rpc-port.
+  </description>
+</property>
+
+<property>
+  <name>dfs.secondary.http.address</name>
+  <value>0.0.0.0:50090</value>
+  <description>The secondary namenode http server address and port.
+  If the port is 0 then the server will start on a free port.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.address</name>
+  <value>0.0.0.0:50010</value>
+  <description>The datanode server address and port for data transfer.
+  If the port is 0 then the server will start on a free port.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.http.address</name>
+  <value>0.0.0.0:50075</value>
+  <description>The datanode http server address and port.
+  If the port is 0 then the server will start on a free port.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.ipc.address</name>
+  <value>0.0.0.0:50020</value>
+  <description>The datanode ipc server address and port.
+  If the port is 0 then the server will start on a free port.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.handler.count</name>
+  <value>3</value>
+  <description>The number of server threads for the datanode.</description>
+</property>
+
+<property>
+  <name>dfs.http.address</name>
+  <value>0.0.0.0:50070</value>
+  <description>The address and the base port where the dfs namenode web ui
+  will listen on. If the port is 0 then the server will start on a free
+  port.</description>
+</property>
+
+<property>
+  <name>dfs.https.enable</name>
+  <value>false</value>
+  <description>Decide if HTTPS(SSL) is supported on HDFS</description>
+</property>
+
+<property>
+  <name>dfs.https.need.client.auth</name>
+  <value>false</value>
+  <description>Whether SSL client certificate authentication is required
+  </description>
+</property>
+
+<property>
+  <name>dfs.https.server.keystore.resource</name>
+  <value>ssl-server.xml</value>
+  <description>Resource file from which ssl server keystore
+  information will be extracted</description>
+</property>
+
+<property>
+  <name>dfs.https.client.keystore.resource</name>
+  <value>ssl-client.xml</value>
+  <description>Resource file from which ssl client keystore
+  information will be extracted</description>
+</property>
+
+<property>
+  <name>dfs.datanode.https.address</name>
+  <value>0.0.0.0:50475</value>
+</property>
+
+<property>
+  <name>dfs.https.address</name>
+  <value>0.0.0.0:50470</value>
+</property>
+
+<property>
+  <name>dfs.datanode.dns.interface</name>
+  <value>default</value>
+  <description>The name of the Network Interface from which a data node
+  should report its IP address.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.dns.nameserver</name>
+  <value>default</value>
+  <description>The host name or IP address of the name server (DNS)
+  which a DataNode should use to determine the host name used by the
+  NameNode for communication and display purposes.</description>
+</property>
+
+<property>
+  <name>dfs.replication.considerLoad</name>
+  <value>true</value>
+  <description>Decide if chooseTarget considers the target's load or not
+  </description>
+</property>
+
+<property>
+  <name>dfs.default.chunk.view.size</name>
+  <value>32768</value>
+  <description>The number of bytes to view for a file on the browser.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.du.reserved</name>
+  <value>0</value>
+  <description>Reserved space in bytes per volume. Always leave this much
+  space free for non dfs use.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.name.dir</name>
+  <value>${hadoop.tmp.dir}/dfs/name</value>
+  <description>Determines where on the local filesystem the DFS name node
+  should store the name table(fsimage). If this is a comma-delimited list
+  of directories then the name table is replicated in all of the
+  directories, for redundancy.</description>
+</property>
+
+<property>
+  <name>dfs.name.edits.dir</name>
+  <value>${dfs.name.dir}</value>
+  <description>Determines where on the local filesystem the DFS name node
+  should store the transaction (edits) file. If this is a comma-delimited
+  list of directories then the transaction file is replicated in all of
+  the directories, for redundancy. Default value is same as dfs.name.dir
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.edits.toleration.length</name>
+  <value>0</value>
+  <description>The length in bytes that namenode is willing to tolerate
+  when the edit log is corrupted. The edit log toleration feature checks
+  the entire edit log. It computes read length (the length of valid data),
+  corruption length and padding length. In case that corruption length is
+  non-zero, the corruption will be tolerated only if the corruption length
+  is less than or equal to the toleration length.
+
+  For disabling edit log toleration feature, set this property to -1. When
+  the feature is disabled, the end of edit log will not be checked. In
+  this case, namenode will startup normally even if the end of edit log is
+  corrupted.</description>
+</property>
+
+<property>
+  <name>dfs.web.ugi</name>
+  <value>webuser,webgroup</value>
+  <description>The user account used by the web interface.
+  Syntax: USERNAME,GROUP1,GROUP2, ...</description>
+</property>
+
+<property>
+  <name>dfs.permissions</name>
+  <value>true</value>
+  <description>If "true", enable permission checking in HDFS.
+  If "false", permission checking is turned off,
+  but all other behavior is unchanged.
+  Switching from one parameter value to the other does not change the mode,
+  owner or group of files or directories.</description>
+</property>
+
+<property>
+  <name>dfs.permissions.supergroup</name>
+  <value>supergroup</value>
+  <description>The name of the group of super-users.</description>
+</property>
+
+<property>
+  <name>dfs.block.access.token.enable</name>
+  <value>false</value>
+  <description>If "true", access tokens are used as capabilities for
+  accessing datanodes. If "false", no access tokens are checked on
+  accessing datanodes.</description>
+</property>
+
+<property>
+  <name>dfs.block.access.key.update.interval</name>
+  <value>600</value>
+  <description>Interval in minutes at which namenode updates its access
+  keys.</description>
+</property>
+
+<property>
+  <name>dfs.block.access.token.lifetime</name>
+  <value>600</value>
+  <description>The lifetime of access tokens in minutes.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.data.dir</name>
+  <value>${hadoop.tmp.dir}/dfs/data</value>
+  <description>Determines where on the local filesystem a DFS data node
+  should store its blocks. If this is a comma-delimited
+  list of directories, then data will be stored in all named
+  directories, typically on different devices.
+  Directories that do not exist are ignored.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.data.dir.perm</name>
+  <value>755</value>
+  <description>Permissions for the directories on the local filesystem
+  where the DFS data node stores its blocks. The permissions can either be
+  octal or symbolic.</description>
+</property>
+
+<property>
+  <name>dfs.replication</name>
+  <value>3</value>
+  <description>Default block replication.
+  The actual number of replications can be specified when the file is
+  created. The default is used if replication is not specified in create
+  time.</description>
+</property>
+
+<property>
+  <name>dfs.replication.max</name>
+  <value>512</value>
+  <description>Maximal block replication.</description>
+</property>
+
+<property>
+  <name>dfs.replication.min</name>
+  <value>1</value>
+  <description>Minimal block replication.</description>
+</property>
+
+<property>
+  <name>dfs.block.size</name>
+  <value>67108864</value>
+  <description>The default block size for new files.</description>
+</property>
+
+<property>
+  <name>dfs.df.interval</name>
+  <value>60000</value>
+  <description>Disk usage statistics refresh interval in msec.</description>
+</property>
+
+<property>
+  <name>dfs.client.block.write.retries</name>
+  <value>3</value>
+  <description>The number of retries for writing blocks to the data nodes,
+  before we signal failure to the application.</description>
+</property>
+
+<property>
+  <name>dfs.blockreport.intervalMsec</name>
+  <value>3600000</value>
+  <description>Determines block reporting interval in milliseconds.
+  </description>
+</property>
+
+<property>
+  <name>dfs.blockreport.initialDelay</name>
+  <value>0</value>
+  <description>Delay for first block report in seconds.</description>
+</property>
+
+<property>
+  <name>dfs.heartbeat.interval</name>
+  <value>3</value>
+  <description>Determines datanode heartbeat interval in seconds.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.handler.count</name>
+  <value>10</value>
+  <description>The number of server threads for the namenode.</description>
+</property>
+
+<property>
+  <name>dfs.safemode.threshold.pct</name>
+  <value>0.999f</value>
+  <description>Specifies the percentage of blocks that should satisfy
+  the minimal replication requirement defined by dfs.replication.min.
+  Values less than or equal to 0 mean not to wait for any particular
+  percentage of blocks before exiting safemode.
+  Values greater than 1 will make safe mode permanent.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.safemode.min.datanodes</name>
+  <value>0</value>
+  <description>Specifies the number of datanodes that must be considered
+  alive before the name node exits safemode.
+  Values less than or equal to 0 mean not to take the number of live
+  datanodes into account when deciding whether to remain in safe mode
+  during startup.
+  Values greater than the number of datanodes in the cluster
+  will make safe mode permanent.</description>
+</property>
+
+<property>
+  <name>dfs.safemode.extension</name>
+  <value>30000</value>
+  <description>Determines extension of safe mode in milliseconds
+  after the threshold level is reached.</description>
+</property>
+
+<property>
+  <name>dfs.balance.bandwidthPerSec</name>
+  <value>1048576</value>
+  <description>Specifies the maximum amount of bandwidth that each datanode
+  can utilize for the balancing purpose in terms of
+  the number of bytes per second.</description>
+</property>
+
+<property>
+  <name>dfs.hosts</name>
+  <value></value>
+  <description>Names a file that contains a list of hosts that are
+  permitted to connect to the namenode. The full pathname of the file
+  must be specified. If the value is empty, all hosts are
+  permitted.</description>
+</property>
+
+<property>
+  <name>dfs.hosts.exclude</name>
+  <value></value>
+  <description>Names a file that contains a list of hosts that are
+  not permitted to connect to the namenode. The full pathname of the
+  file must be specified. If the value is empty, no hosts are
+  excluded.</description>
+</property>
+
+<property>
+  <name>dfs.max.objects</name>
+  <value>0</value>
+  <description>The maximum number of files, directories and blocks
+  dfs supports. A value of zero indicates no limit to the number
+  of objects that dfs supports.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.decommission.interval</name>
+  <value>30</value>
+  <description>Namenode periodicity in seconds to check if decommission is
+  complete.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.decommission.nodes.per.interval</name>
+  <value>5</value>
+  <description>The number of nodes namenode checks if decommission is
+  complete in each dfs.namenode.decommission.interval.</description>
+</property>
+
+<property>
+  <name>dfs.replication.interval</name>
+  <value>3</value>
+  <description>The periodicity in seconds with which the namenode computes
+  replication work for datanodes.</description>
+</property>
+
+<property>
+  <name>dfs.access.time.precision</name>
+  <value>3600000</value>
+  <description>The access time for HDFS file is precise up to this value.
+  The default value is 1 hour. Setting a value of 0 disables
+  access times for HDFS.</description>
+</property>
+
+<property>
+  <name>dfs.support.append</name>
+  <value></value>
+  <description>This option is no longer supported. HBase no longer requires
+  that this option be enabled as sync is now enabled by default. See
+  HADOOP-8230 for additional information.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.delegation.key.update-interval</name>
+  <value>86400000</value>
+  <description>The update interval for master key for delegation tokens
+  in the namenode in milliseconds.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.delegation.token.max-lifetime</name>
+  <value>604800000</value>
+  <description>The maximum lifetime in milliseconds for which a delegation
+  token is valid.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.delegation.token.renew-interval</name>
+  <value>86400000</value>
+  <description>The renewal interval for delegation token in milliseconds.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.failed.volumes.tolerated</name>
+  <value>0</value>
+  <description>The number of volumes that are allowed to
+  fail before a datanode stops offering service. By default
+  any volume failure will cause a datanode to shutdown.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.max.xcievers</name>
+  <value>4096</value>
+  <description>Specifies the maximum number of threads to use for
+  transferring data in and out of the DN.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.readahead.bytes</name>
+  <value>4193404</value>
+  <description>While reading block files, if the Hadoop native libraries
+  are available, the datanode can use the posix_fadvise system call to
+  explicitly page data into the operating system buffer cache ahead of the
+  current reader's position. This can improve performance especially when
+  disks are highly contended.
+
+  This configuration specifies the number of bytes ahead of the current
+  read position which the datanode will attempt to read ahead. This
+  feature may be disabled by configuring this property to 0.
+
+  If the native libraries are not available, this configuration has no
+  effect.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.drop.cache.behind.reads</name>
+  <value>false</value>
+  <description>In some workloads, the data read from HDFS is known to be
+  significantly large enough that it is unlikely to be useful to cache it
+  in the operating system buffer cache. In this case, the DataNode may be
+  configured to automatically purge all data from the buffer cache
+  after it is delivered to the client. This behavior is automatically
+  disabled for workloads which read only short sections of a block
+  (e.g HBase random-IO workloads).
+
+  This may improve performance for some workloads by freeing buffer
+  cache space usage for more cacheable data.
+
+  If the Hadoop native libraries are not available, this configuration
+  has no effect.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.drop.cache.behind.writes</name>
+  <value>false</value>
+  <description>In some workloads, the data written to HDFS is known to be
+  significantly large enough that it is unlikely to be useful to cache it
+  in the operating system buffer cache. In this case, the DataNode may be
+  configured to automatically purge all data from the buffer cache
+  after it is written to disk.
+
+  This may improve performance for some workloads by freeing buffer
+  cache space usage for more cacheable data.
+
+  If the Hadoop native libraries are not available, this configuration
+  has no effect.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.sync.behind.writes</name>
+  <value>false</value>
+  <description>If this configuration is enabled, the datanode will instruct
+  the operating system to enqueue all written data to the disk immediately
+  after it is written. This differs from the usual OS policy which
+  may wait for up to 30 seconds before triggering writeback.
+
+  This may improve performance for some workloads by smoothing the
+  IO profile for data written to disk.
+
+  If the Hadoop native libraries are not available, this configuration
+  has no effect.</description>
+</property>
+
+<property>
+  <name>dfs.client.use.datanode.hostname</name>
+  <value>false</value>
+  <description>Whether clients should use datanode hostnames when
+  connecting to datanodes.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.use.datanode.hostname</name>
+  <value>false</value>
+  <description>Whether datanodes should use datanode hostnames when
+  connecting to other datanodes for data transfer.</description>
+</property>
+
+<property>
+  <name>dfs.client.local.interfaces</name>
+  <value></value>
+  <description>A comma separated list of network interface names to use
+  for data transfer between the client and datanodes. When creating
+  a connection to read from or write to a datanode, the client
+  chooses one of the specified interfaces at random and binds its
+  socket to the IP of that interface. Individual names may be
+  specified as either an interface name (eg "eth0"), a subinterface
+  name (eg "eth0:0"), or an IP address (which may be specified using
+  CIDR notation to match a range of IPs).</description>
+</property>
+
+<property>
+  <name>dfs.image.transfer.bandwidthPerSec</name>
+  <value>0</value>
+  <description>Specifies the maximum amount of bandwidth that can be
+  utilized for image transfer in terms of the number of bytes per second.
+  A default value of 0 indicates that throttling is disabled.</description>
+</property>
+
+<property>
+  <name>dfs.webhdfs.enabled</name>
+  <value>false</value>
+  <description>Enable WebHDFS (REST API) in Namenodes and Datanodes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.kerberos.internal.spnego.principal</name>
+  <value>${dfs.web.authentication.kerberos.principal}</value>
+</property>
+
+<property>
+  <name>dfs.secondary.namenode.kerberos.internal.spnego.principal</name>
+  <value>${dfs.web.authentication.kerberos.principal}</value>
+</property>
+
+<property>
+  <name>dfs.namenode.invalidate.work.pct.per.iteration</name>
+  <value>0.32f</value>
+  <description>*Note*: Advanced property. Change with caution.
+  This determines the percentage amount of block
+  invalidations (deletes) to do over a single DN heartbeat
+  deletion command. The final deletion count is determined by applying this
+  percentage to the number of live nodes in the system.
+  The resultant number is the number of blocks from the deletion list
+  chosen for proper invalidation over a single heartbeat of a single DN.
+  Value should be a positive, non-zero percentage in float notation (X.Yf),
+  with 1.0f meaning 100%.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.replication.work.multiplier.per.iteration</name>
+  <value>2</value>
+  <description>*Note*: Advanced property. Change with caution.
+  This determines the total amount of block transfers to begin in
+  parallel at a DN, for replication, when such a command list is being
+  sent over a DN heartbeat by the NN. The actual number is obtained by
+  multiplying this multiplier with the total number of live nodes in the
+  cluster. The result number is the number of blocks to begin transfers
+  immediately for, per DN heartbeat. This number can be any positive,
+  non-zero integer.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.avoid.read.stale.datanode</name>
+  <value>false</value>
+  <description>Indicate whether or not to avoid reading from "stale"
+  datanodes whose heartbeat messages have not been received by the namenode
+  for more than a specified time interval. Stale datanodes will be
+  moved to the end of the node list returned for reading. See
+  dfs.namenode.avoid.write.stale.datanode for a similar setting for writes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.avoid.write.stale.datanode</name>
+  <value>false</value>
+  <description>Indicate whether or not to avoid writing to "stale"
+  datanodes whose heartbeat messages have not been received by the namenode
+  for more than a specified time interval. Writes will avoid using
+  stale datanodes unless more than a configured ratio
+  (dfs.namenode.write.stale.datanode.ratio) of datanodes are marked as
+  stale. See dfs.namenode.avoid.read.stale.datanode for a similar setting
+  for reads.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.stale.datanode.interval</name>
+  <value>30000</value>
+  <description>Default time interval for marking a datanode as "stale",
+  i.e., if the namenode has not received heartbeat msg from a datanode for
+  more than this time interval, the datanode will be marked and treated
+  as "stale" by default. The stale interval cannot be too small since
+  otherwise this may cause too frequent change of stale states.
+  We thus set a minimum stale interval value (the default value is 3 times
+  of heartbeat interval) and guarantee that the stale interval cannot be
+  less than the minimum value.</description>
+</property>
+
+<property>
+  <name>dfs.namenode.write.stale.datanode.ratio</name>
+  <value>0.5f</value>
+  <description>When the ratio of number stale datanodes to total datanodes
+  marked is greater than this ratio, stop avoiding writing to stale nodes
+  so as to prevent causing hotspots.</description>
+</property>
+
+<property>
+  <name>dfs.datanode.plugins</name>
+  <value></value>
+  <description>Comma-separated list of datanode plug-ins to be activated.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.plugins</name>
+  <value></value>
+  <description>Comma-separated list of namenode plug-ins to be activated.
+  </description>
+</property>
+
+</configuration>
diff --git a/sahara/plugins/spark/resources/spark-env.sh.template b/sahara/plugins/spark/resources/spark-env.sh.template
new file mode 100644
index 00000000..0a35ee7c
--- /dev/null
+++ b/sahara/plugins/spark/resources/spark-env.sh.template
@@ -0,0 +1,21 @@
+#!/usr/bin/env bash
+
+# This file contains environment variables required to run Spark. Copy it as
+# spark-env.sh and edit that to configure Spark for your site.
+#
+# The following variables can be set in this file:
+# - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
+# - MESOS_NATIVE_LIBRARY, to point to your libmesos.so if you use Mesos
+# - SPARK_JAVA_OPTS, to set node-specific JVM options for Spark. Note that
+# we recommend setting app-wide options in the application's driver program.
+# Examples of node-specific options : -Dspark.local.dir, GC options
+# Examples of app-wide options : -Dspark.serializer
+#
+# If using the standalone deploy mode, you can also set variables for it here:
+# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
+# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports
+# - SPARK_WORKER_CORES, to set the number of cores to use on this machine
+# - SPARK_WORKER_MEMORY, to set how much memory to use (e.g. 1000m, 2g)
+# - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT
+# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
+
diff --git a/sahara/plugins/spark/resources/topology.sh b/sahara/plugins/spark/resources/topology.sh
new file mode 100644
index 00000000..84838752
--- /dev/null
+++ b/sahara/plugins/spark/resources/topology.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
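+# Hadoop rack-awareness script: for each host name or IP passed as an
+# argument, print the rack recorded in /etc/hadoop/topology.data, or
+# /default/rack if the host is not listed.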
+HADOOP_CONF=/etc/hadoop
+
+while [ $# -gt 0 ] ; do
+ nodeArg=$1
+ exec< ${HADOOP_CONF}/topology.data
+ result=""
+ while read line ; do
+ ar=( $line )
+ if [ "${ar[0]}" = "$nodeArg" ] ; then
+ result="${ar[1]}"
+ fi
+ done
+ shift
+ if [ -z "$result" ] ; then
+ echo -n "/default/rack "
+ else
+ echo -n "$result "
+ fi
+done
\ No newline at end of file
diff --git a/sahara/plugins/spark/run_scripts.py b/sahara/plugins/spark/run_scripts.py
new file mode 100644
index 00000000..c62b3a64
--- /dev/null
+++ b/sahara/plugins/spark/run_scripts.py
@@ -0,0 +1,57 @@
+# Copyright (c) 2014 Hoang Do, Phuc Vo, P. Michiardi, D. Venzano
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sahara.openstack.common import log as logging
+
+
+LOG = logging.getLogger(__name__)
+
+
+def start_processes(remote, *processes):
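+    # namenode and datanode are started as system services; any other
+    # process is launched in a detached screen session.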
+ for proc in processes:
+ if proc == "namenode":
+ remote.execute_command("sudo service hadoop-hdfs-namenode start")
+ elif proc == "datanode":
+ remote.execute_command("sudo service hadoop-hdfs-datanode start")
+ else:
+ remote.execute_command("screen -d -m sudo hadoop %s" % proc)
+
+
+def refresh_nodes(remote, service):
+ remote.execute_command("sudo hadoop %s -refreshNodes"
+ % service)
+
+
+def format_namenode(nn_remote):
+ nn_remote.execute_command("sudo -u hdfs hadoop namenode -format")
+
+
+def clean_port_hadoop(nn_remote):
+ nn_remote.execute_command("sudo netstat -tlnp \
+ | awk '/:8020 */ \
+ {split($NF,a,\"/\"); print a[1]}' \
+ | xargs sudo kill -9")
+
+
+def start_spark_master(nn_remote):
+ nn_remote.execute_command("bash /opt/spark/sbin/start-all.sh")
+
+
+def start_spark_slaves(nn_remote):
+ nn_remote.execute_command("bash /opt/spark/sbin/start-slaves.sh")
+
+
+def stop_spark(nn_remote):
+ nn_remote.execute_command("bash /opt/spark/sbin/stop-all.sh")
diff --git a/setup.cfg b/setup.cfg
index 08a4e7a1..72f6543d 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -39,6 +39,7 @@ sahara.cluster.plugins =
vanilla = sahara.plugins.vanilla.plugin:VanillaProvider
hdp = sahara.plugins.hdp.ambariplugin:AmbariPlugin
fake = sahara.plugins.fake.plugin:FakePluginProvider
+ spark = sahara.plugins.spark.plugin:SparkProvider
sahara.infrastructure.engine =
direct = sahara.service.direct_engine:DirectEngine