From efce9c4df62a1d2a757ea63f92097d8c2e23f46d Mon Sep 17 00:00:00 2001
From: Michael Lelyakin
Date: Wed, 27 Jul 2016 14:23:28 +0000
Subject: [PATCH] Spark on Vanilla Clusters

This patch adds the ability to run Spark jobs on Vanilla clusters.

bp spark-jobs-for-vanilla-hadoop

Change-Id: I8a47bab44391207865d2534afd452e17a55697eb
---
 sahara/plugins/vanilla/hadoop2/config.py           |  67 +++++++++++-
 .../plugins/vanilla/hadoop2/config_helper.py       | 103 ++++++++++++++++++
 .../hadoop2/resources/spark-cleanup.cron           |   2 +
 .../hadoop2/resources/tmp-cleanup.sh.template      |  48 ++++++++
 sahara/plugins/vanilla/hadoop2/run_scripts.py      |  10 ++
 .../vanilla/hadoop2/starting_scripts.py            |   6 +
 sahara/plugins/vanilla/hadoop2/validation.py       |   6 +
 sahara/plugins/vanilla/utils.py                    |   4 +
 .../plugins/vanilla/v2_7_1/config_helper.py        |  18 ++-
 sahara/plugins/vanilla/v2_7_1/edp_engine.py        |  46 ++++++++
 .../plugins/vanilla/v2_7_1/versionhandler.py       |  19 +++-
 11 files changed, 324 insertions(+), 5 deletions(-)
 create mode 100644 sahara/plugins/vanilla/hadoop2/resources/spark-cleanup.cron
 create mode 100644 sahara/plugins/vanilla/hadoop2/resources/tmp-cleanup.sh.template

diff --git a/sahara/plugins/vanilla/hadoop2/config.py b/sahara/plugins/vanilla/hadoop2/config.py
index 64e7f52..4302f3d 100644
--- a/sahara/plugins/vanilla/hadoop2/config.py
+++ b/sahara/plugins/vanilla/hadoop2/config.py
@@ -12,6 +12,7 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
 
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -51,7 +52,7 @@
     "nodemanager": [8042],
     "oozie": [11000],
     "hiveserver": [9999, 10000]
-    }
+}
 
 
 def configure_cluster(pctx, cluster):
@@ -64,6 +65,70 @@ def configure_cluster(pctx, cluster):
     instances = utils.get_instances(cluster)
     configure_instances(pctx, instances)
     configure_topology_data(pctx, cluster)
+    configure_spark(cluster)
+
+
+def configure_spark(cluster):
+    extra = _extract_spark_configs_to_extra(cluster)
+    _push_spark_configs_to_node(cluster, extra)
+
+
+def _push_spark_configs_to_node(cluster, extra):
+    spark_master = vu.get_spark_history_server(cluster)
+    if spark_master:
+        _push_spark_configs_to_existing_node(spark_master, cluster, extra)
+        _push_cleanup_job(spark_master, extra)
+        with spark_master.remote() as r:
+            r.execute_command('sudo su - -c "mkdir /tmp/spark-events" hadoop')
+
+
+def _push_spark_configs_to_existing_node(spark_master, cluster, extra):
+
+    sp_home = c_helper.get_spark_home(cluster)
+    files = {
+        os.path.join(sp_home,
+                     'conf/spark-env.sh'): extra['sp_master'],
+        os.path.join(
+            sp_home,
+            'conf/spark-defaults.conf'): extra['sp_defaults']
+    }
+
+    with spark_master.remote() as r:
+        r.write_files_to(files, run_as_root=True)
+
+
+def _push_cleanup_job(sp_master, extra):
+    with sp_master.remote() as r:
+        if extra['job_cleanup']['valid']:
+            r.write_file_to('/opt/hadoop/tmp-cleanup.sh',
+                            extra['job_cleanup']['script'],
+                            run_as_root=True)
+            r.execute_command("sudo chmod 755 /opt/hadoop/tmp-cleanup.sh")
+            cmd = 'sudo sh -c \'echo "%s" > /etc/cron.d/spark-cleanup\''
+            r.execute_command(cmd % extra['job_cleanup']['cron'])
+        else:
+            r.execute_command("sudo rm -f /opt/hadoop/tmp-cleanup.sh")
+            r.execute_command("sudo rm -f /etc/cron.d/spark-cleanup")
+
+
+def _extract_spark_configs_to_extra(cluster):
+    sp_master = utils.get_instance(cluster, "spark history server")
+
+    extra = dict()
+
+    config_master = ''
+    if sp_master is not None:
+        config_master = c_helper.generate_spark_env_configs(cluster)
+
+    # Any node that might be used to run spark-submit will need
+    # these libs for swift integration
+    config_defaults = c_helper.generate_spark_executor_classpath(cluster)
+
+    extra['job_cleanup'] = c_helper.generate_job_cleanup_config(cluster)
+    extra['sp_master'] = config_master
+    extra['sp_defaults'] = config_defaults
+
+    return extra
 
 
 def configure_instances(pctx, instances):
diff --git a/sahara/plugins/vanilla/hadoop2/config_helper.py b/sahara/plugins/vanilla/hadoop2/config_helper.py
index 67b183b..d51b15d 100644
--- a/sahara/plugins/vanilla/hadoop2/config_helper.py
+++ b/sahara/plugins/vanilla/hadoop2/config_helper.py
@@ -19,6 +19,8 @@ import six
 from sahara import exceptions as ex
 from sahara.i18n import _
 from sahara.plugins import provisioning as p
+from sahara.plugins import utils
+from sahara.utils import files as f
 from sahara.utils import types
 
 CONF = cfg.CONF
@@ -84,6 +86,52 @@ PRIORITY_1_CONFS = [
     'yarn.scheduler.minimum-allocation-vcores'
 ]
 
+_default_executor_classpath = ":".join(
+    ['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.1.jar'])
+
+SPARK_CONFS = {
+    'Spark': {
+        "OPTIONS": [
+            {
+                'name': 'Executor extra classpath',
+                'description': 'Value for spark.executor.extraClassPath'
+                ' in spark-defaults.conf'
+                ' (default: %s)' % _default_executor_classpath,
+                'default': '%s' % _default_executor_classpath,
+                'priority': 2,
+            },
+            {
+                'name': 'Spark home',
+                'description': 'The location of the spark installation'
+                ' (default: /opt/spark)',
+                'default': '/opt/spark',
+                'priority': 2,
+            },
+            {
+                'name': 'Minimum cleanup seconds',
+                'description': 'Job data will never be purged before this'
+                ' amount of time elapses (default: 86400 = 1 day)',
+                'default': '86400',
+                'priority': 2,
+            },
+            {
+                'name': 'Maximum cleanup seconds',
+                'description': 'Job data will always be purged after this'
+                ' amount of time elapses (default: 1209600 = 14 days)',
+                'default': '1209600',
+                'priority': 2,
+            },
+            {
+                'name': 'Minimum cleanup megabytes',
+                'description': 'No job data will be purged unless the total'
+                ' job data exceeds this size (default: 4096 = 4GB)',
+                'default': '4096',
+                'priority': 2,
+            },
+        ]
+    }
+}
+
 # for now we have not so many cluster-wide configs
 # lets consider all of them having high priority
 PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
@@ -202,3 +250,58 @@ def is_data_locality_enabled(pctx, cluster):
         return False
     return get_config_value(pctx, ENABLE_DATA_LOCALITY.applicable_target,
                             ENABLE_DATA_LOCALITY.name, cluster)
+
+
+def _get_spark_opt_default(opt_name):
+    for opt in SPARK_CONFS["Spark"]["OPTIONS"]:
+        if opt_name == opt["name"]:
+            return opt["default"]
+    return None
+
+
+def generate_spark_env_configs(cluster):
+    configs = []
+
+    # point to the hadoop conf dir so that Spark can read things
+    # like the swift configuration without having to copy core-site
+    # to /opt/spark/conf
+    HADOOP_CONF_DIR = '/opt/hadoop/etc/hadoop'
+    configs.append('HADOOP_CONF_DIR=' + HADOOP_CONF_DIR)
+
+    # Hadoop and YARN configs are in the same folder
+    configs.append('YARN_CONF_DIR=' + HADOOP_CONF_DIR)
+
+    return '\n'.join(configs)
+
+
+def generate_spark_executor_classpath(cluster):
+    cp = utils.get_config_value_or_default(
+        "Spark", "Executor extra classpath", cluster)
+    if cp:
+        return "spark.executor.extraClassPath " + cp
+    return "\n"
+
+
+def generate_job_cleanup_config(cluster):
+    args = {
+        'minimum_cleanup_megabytes': utils.get_config_value_or_default(
+            "Spark", "Minimum cleanup megabytes", cluster),
+        'minimum_cleanup_seconds': utils.get_config_value_or_default(
+            "Spark", "Minimum cleanup seconds", cluster),
+        'maximum_cleanup_seconds': utils.get_config_value_or_default(
+            "Spark", "Maximum cleanup seconds", cluster)
+    }
+    job_conf = {'valid': (args['maximum_cleanup_seconds'] > 0 and
+                          (args['minimum_cleanup_megabytes'] > 0
+                          and args['minimum_cleanup_seconds'] > 0))}
+    if job_conf['valid']:
+        job_conf['cron'] = f.get_file_text(
+            'plugins/vanilla/hadoop2/resources/spark-cleanup.cron')
+        job_cleanup_script = f.get_file_text(
+            'plugins/vanilla/hadoop2/resources/tmp-cleanup.sh.template')
+        job_conf['script'] = job_cleanup_script.format(**args)
+    return job_conf
+
+
+def get_spark_home(cluster):
+    return utils.get_config_value_or_default("Spark", "Spark home", cluster)
diff --git a/sahara/plugins/vanilla/hadoop2/resources/spark-cleanup.cron b/sahara/plugins/vanilla/hadoop2/resources/spark-cleanup.cron
new file mode 100644
index 0000000..e182a0e
--- /dev/null
+++ b/sahara/plugins/vanilla/hadoop2/resources/spark-cleanup.cron
@@ -0,0 +1,2 @@
+# Cleans up old Spark job directories once per hour.
+0 * * * * root /opt/hadoop/tmp-cleanup.sh
\ No newline at end of file
diff --git a/sahara/plugins/vanilla/hadoop2/resources/tmp-cleanup.sh.template b/sahara/plugins/vanilla/hadoop2/resources/tmp-cleanup.sh.template
new file mode 100644
index 0000000..e715719
--- /dev/null
+++ b/sahara/plugins/vanilla/hadoop2/resources/tmp-cleanup.sh.template
@@ -0,0 +1,48 @@
+#!/bin/sh
+
+MINIMUM_CLEANUP_MEGABYTES={minimum_cleanup_megabytes}
+MINIMUM_CLEANUP_SECONDS={minimum_cleanup_seconds}
+MAXIMUM_CLEANUP_SECONDS={maximum_cleanup_seconds}
+
+CURRENT_TIMESTAMP=`date +%s`
+POSSIBLE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MINIMUM_CLEANUP_SECONDS))
+DEFINITE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MAXIMUM_CLEANUP_SECONDS))
+
+unset MAY_DELETE
+unset WILL_DELETE
+
+if [ ! -d /tmp/spark-edp ]
+then
+    exit 0
+fi
+
+cd /tmp/spark-edp
+for JOB in $(find . -maxdepth 1 -mindepth 1 -type d -printf '%f\n')
+do
+    for EXECUTION in $(find $JOB -maxdepth 1 -mindepth 1 -type d -printf '%f\n')
+    do
+        TIMESTAMP=`stat $JOB/$EXECUTION --printf '%Y'`
+        if [[ $TIMESTAMP -lt $DEFINITE_CLEANUP_THRESHOLD ]]
+        then
+            WILL_DELETE="$WILL_DELETE $JOB/$EXECUTION"
+        else
+            if [[ $TIMESTAMP -lt $POSSIBLE_CLEANUP_THRESHOLD ]]
+            then
+                MAY_DELETE="$MAY_DELETE $JOB/$EXECUTION"
+            fi
+        fi
+    done
+done
+
+for EXECUTION in $WILL_DELETE
+do
+    rm -Rf $EXECUTION
+done
+
+for EXECUTION in $(ls $MAY_DELETE -trd)
+do
+    if [[ `du -s -BM | grep -o '[0-9]\+'` -le $MINIMUM_CLEANUP_MEGABYTES ]]; then
+        break
+    fi
+    rm -Rf $EXECUTION
+done
diff --git a/sahara/plugins/vanilla/hadoop2/run_scripts.py b/sahara/plugins/vanilla/hadoop2/run_scripts.py
index f41e3c6..fdc48c8 100644
--- a/sahara/plugins/vanilla/hadoop2/run_scripts.py
+++ b/sahara/plugins/vanilla/hadoop2/run_scripts.py
@@ -105,6 +105,16 @@ def start_oozie_process(pctx, instance):
         _start_oozie(r)
 
 
+@cpo.event_wrapper(
+    True, step=pu.start_process_event_message("Spark History Server"))
+def start_spark_history_server(master):
+    sp_home = c_helper.get_spark_home(master.cluster)
+    with context.set_current_instance_id(master.instance_id):
+        with master.remote() as r:
+            r.execute_command('sudo su - -c "bash %s" hadoop' % os.path.join(
+                sp_home, 'sbin/start-history-server.sh'))
+
+
 def format_namenode(instance):
     instance.remote().execute_command(
         'sudo su - -c "hdfs namenode -format" hadoop')
diff --git a/sahara/plugins/vanilla/hadoop2/starting_scripts.py b/sahara/plugins/vanilla/hadoop2/starting_scripts.py
index c292238..409c56e 100644
--- a/sahara/plugins/vanilla/hadoop2/starting_scripts.py
+++ b/sahara/plugins/vanilla/hadoop2/starting_scripts.py
@@ -71,3 +71,9 @@ def start_hiveserver(pctx, cluster):
     hiveserver = vu.get_hiveserver(cluster)
     if hiveserver:
         run.start_hiveserver_process(pctx, hiveserver)
+
+
+def start_spark(cluster):
+    spark = vu.get_spark_history_server(cluster)
+    if spark:
+        run.start_spark_history_server(spark)
diff --git a/sahara/plugins/vanilla/hadoop2/validation.py b/sahara/plugins/vanilla/hadoop2/validation.py
index 2df19b2..37e950b 100644
--- a/sahara/plugins/vanilla/hadoop2/validation.py
+++ b/sahara/plugins/vanilla/hadoop2/validation.py
@@ -65,6 +65,12 @@ def validate_cluster_creating(pctx, cluster):
             raise ex.RequiredServiceMissingException('historyserver',
                                                      required_by='oozie')
 
+    spark_hist_count = _get_inst_count(cluster, 'spark history server')
+    if spark_hist_count > 1:
+        raise ex.InvalidComponentCountException('spark history server',
+                                                _('0 or 1'),
+                                                spark_hist_count)
+
     rep_factor = cu.get_config_value(pctx, 'HDFS', 'dfs.replication', cluster)
     if dn_count < rep_factor:
         raise ex.InvalidComponentCountException(
diff --git a/sahara/plugins/vanilla/utils.py b/sahara/plugins/vanilla/utils.py
index e933871..ed4a580 100644
--- a/sahara/plugins/vanilla/utils.py
+++ b/sahara/plugins/vanilla/utils.py
@@ -37,6 +37,10 @@ def get_oozie(cluster):
     return u.get_instance(cluster, "oozie")
 
 
+def get_spark_history_server(cluster):
+    return u.get_instance(cluster, "spark history server")
+
+
 def get_hiveserver(cluster):
     return u.get_instance(cluster, "hiveserver")
 
diff --git a/sahara/plugins/vanilla/v2_7_1/config_helper.py b/sahara/plugins/vanilla/v2_7_1/config_helper.py
index 9b21324..e5021c5 100644
--- a/sahara/plugins/vanilla/v2_7_1/config_helper.py
+++ b/sahara/plugins/vanilla/v2_7_1/config_helper.py
@@ -15,7 +15,9 @@
 
 from oslo_config import cfg
 from oslo_log import log as logging
+import six
 
+from sahara.plugins import provisioning as p
 from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
 from sahara.utils import xmlutils as x
 
@@ -69,7 +71,6 @@
     }
 }
 
-
 # Initialise plugin Hadoop configurations
 PLUGIN_XML_CONFIGS = c_helper.init_xml_configs(XML_CONFS)
 PLUGIN_ENV_CONFIGS = c_helper.init_env_configs(ENV_CONFS)
@@ -80,9 +81,24 @@ def _init_all_configs():
     configs.extend(PLUGIN_XML_CONFIGS)
     configs.extend(PLUGIN_ENV_CONFIGS)
     configs.extend(c_helper.PLUGIN_GENERAL_CONFIGS)
+    configs.extend(_get_spark_configs())
     return configs
 
 
+def _get_spark_configs():
+    spark_configs = []
+    for service, config_items in six.iteritems(c_helper.SPARK_CONFS):
+        for item in config_items['OPTIONS']:
+            cfg = p.Config(name=item["name"],
+                           description=item["description"],
+                           default_value=item["default"],
+                           applicable_target=service,
+                           scope="cluster", is_optional=True,
+                           priority=item["priority"])
+            spark_configs.append(cfg)
+    return spark_configs
+
+
 PLUGIN_CONFIGS = _init_all_configs()
 
 
diff --git a/sahara/plugins/vanilla/v2_7_1/edp_engine.py b/sahara/plugins/vanilla/v2_7_1/edp_engine.py
index 6d2f656..5ca1dd2 100644
--- a/sahara/plugins/vanilla/v2_7_1/edp_engine.py
+++ b/sahara/plugins/vanilla/v2_7_1/edp_engine.py
@@ -12,9 +12,15 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
 
+from sahara import exceptions as ex
+from sahara.i18n import _
+from sahara.plugins import utils as plugin_utils
 from sahara.plugins.vanilla import confighints_helper as ch_helper
 from sahara.plugins.vanilla.hadoop2 import edp_engine
+from sahara.plugins.vanilla import utils as v_utils
+from sahara.service.edp.spark import engine as edp_spark_engine
 from sahara.utils import edp
 
 
@@ -33,3 +39,43 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
             return {'job_config': ch_helper.get_possible_pig_config_from(
                 'plugins/vanilla/v2_7_1/resources/mapred-default.xml')}
         return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)
+
+
+class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
+
+    edp_base_version = "2.7.1"
+
+    def __init__(self, cluster):
+        super(EdpSparkEngine, self).__init__(cluster)
+        self.master = plugin_utils.get_instance(cluster,
+                                                "spark history server")
+        self.plugin_params["spark-user"] = "sudo -u hadoop "
+        self.plugin_params["spark-submit"] = os.path.join(
+            plugin_utils.get_config_value_or_default(
+                "Spark", "Spark home", self.cluster),
+            "bin/spark-submit")
+        self.plugin_params["deploy-mode"] = "cluster"
+        self.plugin_params["master"] = "yarn"
+
+        driver_cp = plugin_utils.get_config_value_or_default(
+            "Spark", "Executor extra classpath", self.cluster)
+        self.plugin_params["driver-class-path"] = driver_cp
+
+    @staticmethod
+    def edp_supported(version):
+        return version >= EdpSparkEngine.edp_base_version
+
+    @staticmethod
+    def job_type_supported(job_type):
+        return (job_type in
+                edp_spark_engine.SparkJobEngine.get_supported_job_types())
+
+    def validate_job_execution(self, cluster, job, data):
+        if (not self.edp_supported(cluster.hadoop_version) or
+                not v_utils.get_spark_history_server(cluster)):
+
+            raise ex.InvalidDataException(
+                _('Spark {base} or higher required to run {type} jobs').format(
+                    base=EdpSparkEngine.edp_base_version, type=job.type))
+
+        super(EdpSparkEngine, self).validate_job_execution(cluster, job, data)
diff --git a/sahara/plugins/vanilla/v2_7_1/versionhandler.py b/sahara/plugins/vanilla/v2_7_1/versionhandler.py
index 62576b6..63749dd 100644
--- a/sahara/plugins/vanilla/v2_7_1/versionhandler.py
+++ b/sahara/plugins/vanilla/v2_7_1/versionhandler.py
@@ -57,7 +57,8 @@
             "HDFS": ["namenode", "datanode", "secondarynamenode"],
             "YARN": ["resourcemanager", "nodemanager"],
             "JobFlow": ["oozie"],
-            "Hive": ["hiveserver"]
+            "Hive": ["hiveserver"],
+            "Spark": ["spark history server"]
         }
 
     def validate(self, cluster):
@@ -86,6 +87,7 @@
         swift_helper.install_ssl_certs(cluster_utils.get_instances(cluster))
 
         self._set_cluster_info(cluster)
+        s_scripts.start_spark(cluster)
 
     def decommission_nodes(self, cluster, instances):
         sc.decommission_nodes(self.pctx, cluster, instances)
@@ -103,7 +105,7 @@
         rm = vu.get_resourcemanager(cluster)
         hs = vu.get_historyserver(cluster)
         oo = vu.get_oozie(cluster)
-
+        sp = vu.get_spark_history_server(cluster)
         info = {}
 
         if rm:
@@ -129,16 +131,27 @@
                 'Web UI': 'http://%s:%s' % (hs.get_ip_or_dns_name(), '19888')
             }
 
+        if sp:
+            info['Apache Spark'] = {
+                'Spark UI': 'http://%s:%s' % (sp.management_ip, '4040'),
+                'Spark History Server UI':
+                    'http://%s:%s' % (sp.management_ip, '18080')
+            }
+
         ctx = context.ctx()
         conductor.cluster_update(ctx, cluster, {'info': info})
 
     def get_edp_engine(self, cluster, job_type):
         if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
             return edp_engine.EdpOozieEngine(cluster)
+        if job_type in edp_engine.EdpSparkEngine.get_supported_job_types():
+            return edp_engine.EdpSparkEngine(cluster)
+
         return None
 
     def get_edp_job_types(self):
-        return edp_engine.EdpOozieEngine.get_supported_job_types()
+        return (edp_engine.EdpOozieEngine.get_supported_job_types() +
+                edp_engine.EdpSparkEngine.get_supported_job_types())
 
     def get_edp_config_hints(self, job_type):
         return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)