Spark on Vanilla Clusters
This patch adds ability to run Spark jobs on Vanilla Clusters. bp spark-jobs-for-vanilla-hadoop Change-Id: I8a47bab44391207865d2534afd452e17a55697eb
This commit is contained in:
parent
08ab12af6d
commit
efce9c4df6
|
@ -12,6 +12,7 @@
|
|||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
@ -51,7 +52,7 @@ PORTS_MAP = {
|
|||
"nodemanager": [8042],
|
||||
"oozie": [11000],
|
||||
"hiveserver": [9999, 10000]
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def configure_cluster(pctx, cluster):
|
||||
|
@ -64,6 +65,70 @@ def configure_cluster(pctx, cluster):
|
|||
instances = utils.get_instances(cluster)
|
||||
configure_instances(pctx, instances)
|
||||
configure_topology_data(pctx, cluster)
|
||||
configure_spark(cluster)
|
||||
|
||||
|
||||
def configure_spark(cluster):
|
||||
extra = _extract_spark_configs_to_extra(cluster)
|
||||
_push_spark_configs_to_node(cluster, extra)
|
||||
|
||||
|
||||
def _push_spark_configs_to_node(cluster, extra):
|
||||
spark_master = vu.get_spark_history_server(cluster)
|
||||
if spark_master:
|
||||
_push_spark_configs_to_existing_node(spark_master, cluster, extra)
|
||||
_push_cleanup_job(spark_master, extra)
|
||||
with spark_master.remote() as r:
|
||||
r.execute_command('sudo su - -c "mkdir /tmp/spark-events" hadoop')
|
||||
|
||||
|
||||
def _push_spark_configs_to_existing_node(spark_master, cluster, extra):
|
||||
|
||||
sp_home = c_helper.get_spark_home(cluster)
|
||||
files = {
|
||||
os.path.join(sp_home,
|
||||
'conf/spark-env.sh'): extra['sp_master'],
|
||||
os.path.join(
|
||||
sp_home,
|
||||
'conf/spark-defaults.conf'): extra['sp_defaults']
|
||||
}
|
||||
|
||||
with spark_master.remote() as r:
|
||||
r.write_files_to(files, run_as_root=True)
|
||||
|
||||
|
||||
def _push_cleanup_job(sp_master, extra):
|
||||
with sp_master.remote() as r:
|
||||
if extra['job_cleanup']['valid']:
|
||||
r.write_file_to('/opt/hadoop/tmp-cleanup.sh',
|
||||
extra['job_cleanup']['script'],
|
||||
run_as_root=True)
|
||||
r.execute_command("sudo chmod 755 /opt/hadoop/tmp-cleanup.sh")
|
||||
cmd = 'sudo sh -c \'echo "%s" > /etc/cron.d/spark-cleanup\''
|
||||
r.execute_command(cmd % extra['job_cleanup']['cron'])
|
||||
else:
|
||||
r.execute_command("sudo rm -f /opt/hadoop/tmp-cleanup.sh")
|
||||
r.execute_command("sudo rm -f /etc/cron.d/spark-cleanup")
|
||||
|
||||
|
||||
def _extract_spark_configs_to_extra(cluster):
|
||||
sp_master = utils.get_instance(cluster, "spark history server")
|
||||
|
||||
extra = dict()
|
||||
|
||||
config_master = ''
|
||||
if sp_master is not None:
|
||||
config_master = c_helper.generate_spark_env_configs(cluster)
|
||||
|
||||
# Any node that might be used to run spark-submit will need
|
||||
# these libs for swift integration
|
||||
config_defaults = c_helper.generate_spark_executor_classpath(cluster)
|
||||
|
||||
extra['job_cleanup'] = c_helper.generate_job_cleanup_config(cluster)
|
||||
extra['sp_master'] = config_master
|
||||
extra['sp_defaults'] = config_defaults
|
||||
|
||||
return extra
|
||||
|
||||
|
||||
def configure_instances(pctx, instances):
|
||||
|
|
|
@ -19,6 +19,8 @@ import six
|
|||
from sahara import exceptions as ex
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins import provisioning as p
|
||||
from sahara.plugins import utils
|
||||
from sahara.utils import files as f
|
||||
from sahara.utils import types
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -84,6 +86,52 @@ PRIORITY_1_CONFS = [
|
|||
'yarn.scheduler.minimum-allocation-vcores'
|
||||
]
|
||||
|
||||
_default_executor_classpath = ":".join(
|
||||
['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.1.jar'])
|
||||
|
||||
SPARK_CONFS = {
|
||||
'Spark': {
|
||||
"OPTIONS": [
|
||||
{
|
||||
'name': 'Executor extra classpath',
|
||||
'description': 'Value for spark.executor.extraClassPath'
|
||||
' in spark-defaults.conf'
|
||||
' (default: %s)' % _default_executor_classpath,
|
||||
'default': '%s' % _default_executor_classpath,
|
||||
'priority': 2,
|
||||
},
|
||||
{
|
||||
'name': 'Spark home',
|
||||
'description': 'The location of the spark installation'
|
||||
' (default: /opt/spark)',
|
||||
'default': '/opt/spark',
|
||||
'priority': 2,
|
||||
},
|
||||
{
|
||||
'name': 'Minimum cleanup seconds',
|
||||
'description': 'Job data will never be purged before this'
|
||||
' amount of time elapses (default: 86400 = 1 day)',
|
||||
'default': '86400',
|
||||
'priority': 2,
|
||||
},
|
||||
{
|
||||
'name': 'Maximum cleanup seconds',
|
||||
'description': 'Job data will always be purged after this'
|
||||
' amount of time elapses (default: 1209600 = 14 days)',
|
||||
'default': '1209600',
|
||||
'priority': 2,
|
||||
},
|
||||
{
|
||||
'name': 'Minimum cleanup megabytes',
|
||||
'description': 'No job data will be purged unless the total'
|
||||
' job data exceeds this size (default: 4096 = 4GB)',
|
||||
'default': '4096',
|
||||
'priority': 2,
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
|
||||
# for now we have not so many cluster-wide configs
|
||||
# lets consider all of them having high priority
|
||||
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
|
||||
|
@ -202,3 +250,58 @@ def is_data_locality_enabled(pctx, cluster):
|
|||
return False
|
||||
return get_config_value(pctx, ENABLE_DATA_LOCALITY.applicable_target,
|
||||
ENABLE_DATA_LOCALITY.name, cluster)
|
||||
|
||||
|
||||
def _get_spark_opt_default(opt_name):
|
||||
for opt in SPARK_CONFS["Spark"]["OPTIONS"]:
|
||||
if opt_name == opt["name"]:
|
||||
return opt["default"]
|
||||
return None
|
||||
|
||||
|
||||
def generate_spark_env_configs(cluster):
|
||||
configs = []
|
||||
|
||||
# point to the hadoop conf dir so that Spark can read things
|
||||
# like the swift configuration without having to copy core-site
|
||||
# to /opt/spark/conf
|
||||
HADOOP_CONF_DIR = '/opt/hadoop/etc/hadoop'
|
||||
configs.append('HADOOP_CONF_DIR=' + HADOOP_CONF_DIR)
|
||||
|
||||
# Hadoop and YARN configs there are in one folder
|
||||
configs.append('YARN_CONF_DIR=' + HADOOP_CONF_DIR)
|
||||
|
||||
return '\n'.join(configs)
|
||||
|
||||
|
||||
def generate_spark_executor_classpath(cluster):
|
||||
cp = utils.get_config_value_or_default(
|
||||
"Spark", "Executor extra classpath", cluster)
|
||||
if cp:
|
||||
return "spark.executor.extraClassPath " + cp
|
||||
return "\n"
|
||||
|
||||
|
||||
def generate_job_cleanup_config(cluster):
|
||||
args = {
|
||||
'minimum_cleanup_megabytes': utils.get_config_value_or_default(
|
||||
"Spark", "Minimum cleanup megabytes", cluster),
|
||||
'minimum_cleanup_seconds': utils.get_config_value_or_default(
|
||||
"Spark", "Minimum cleanup seconds", cluster),
|
||||
'maximum_cleanup_seconds': utils.get_config_value_or_default(
|
||||
"Spark", "Maximum cleanup seconds", cluster)
|
||||
}
|
||||
job_conf = {'valid': (args['maximum_cleanup_seconds'] > 0 and
|
||||
(args['minimum_cleanup_megabytes'] > 0
|
||||
and args['minimum_cleanup_seconds'] > 0))}
|
||||
if job_conf['valid']:
|
||||
job_conf['cron'] = f.get_file_text(
|
||||
'plugins/vanilla/hadoop2/resources/spark-cleanup.cron'),
|
||||
job_cleanup_script = f.get_file_text(
|
||||
'plugins/vanilla/hadoop2/resources/tmp-cleanup.sh.template')
|
||||
job_conf['script'] = job_cleanup_script.format(**args)
|
||||
return job_conf
|
||||
|
||||
|
||||
def get_spark_home(cluster):
|
||||
return utils.get_config_value_or_default("Spark", "Spark home", cluster)
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
# Cleans up old Spark job directories once per hour.
|
||||
0 * * * * root /etc/hadoop/tmp-cleanup.sh
|
|
@ -0,0 +1,48 @@
|
|||
#!/bin/sh
|
||||
|
||||
MINIMUM_CLEANUP_MEGABYTES={minimum_cleanup_megabytes}
|
||||
MINIMUM_CLEANUP_SECONDS={minimum_cleanup_seconds}
|
||||
MAXIMUM_CLEANUP_SECONDS={maximum_cleanup_seconds}
|
||||
|
||||
CURRENT_TIMESTAMP=`date +%s`
|
||||
POSSIBLE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MINIMUM_CLEANUP_SECONDS))
|
||||
DEFINITE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MAXIMUM_CLEANUP_SECONDS))
|
||||
|
||||
unset MAY_DELETE
|
||||
unset WILL_DELETE
|
||||
|
||||
if [ ! -d /tmp/spark-edp ]
|
||||
then
|
||||
exit 0
|
||||
fi
|
||||
|
||||
cd /tmp/spark-edp
|
||||
for JOB in $(find . -maxdepth 1 -mindepth 1 -type d -printf '%f\n')
|
||||
do
|
||||
for EXECUTION in $(find $JOB -maxdepth 1 -mindepth 1 -type d -printf '%f\n')
|
||||
do
|
||||
TIMESTAMP=`stat $JOB/$EXECUTION --printf '%Y'`
|
||||
if [[ $TIMESTAMP -lt $DEFINITE_CLEANUP_THRESHOLD ]]
|
||||
then
|
||||
WILL_DELETE="$WILL_DELETE $JOB/$EXECUTION"
|
||||
else
|
||||
if [[ $TIMESTAMP -lt $POSSIBLE_CLEANUP_THRESHOLD ]]
|
||||
then
|
||||
MAY_DELETE="$MAY_DELETE $JOB/$EXECUTION"
|
||||
fi
|
||||
fi
|
||||
done
|
||||
done
|
||||
|
||||
for EXECUTION in $WILL_DELETE
|
||||
do
|
||||
rm -Rf $EXECUTION
|
||||
done
|
||||
|
||||
for EXECUTION in $(ls $MAY_DELETE -trd)
|
||||
do
|
||||
if [[ `du -s -BM | grep -o '[0-9]\+'` -le $MINIMUM_CLEANUP_MEGABYTES ]]; then
|
||||
break
|
||||
fi
|
||||
rm -Rf $EXECUTION
|
||||
done
|
|
@ -105,6 +105,16 @@ def start_oozie_process(pctx, instance):
|
|||
_start_oozie(r)
|
||||
|
||||
|
||||
@cpo.event_wrapper(
|
||||
True, step=pu.start_process_event_message("Spark History Server"))
|
||||
def start_spark_history_server(master):
|
||||
sp_home = c_helper.get_spark_home(master.cluster)
|
||||
with context.set_current_instance_id(master.instance_id):
|
||||
with master.remote() as r:
|
||||
r.execute_command('sudo su - -c "bash %s" hadoop' % os.path.join(
|
||||
sp_home, 'sbin/start-history-server.sh'))
|
||||
|
||||
|
||||
def format_namenode(instance):
|
||||
instance.remote().execute_command(
|
||||
'sudo su - -c "hdfs namenode -format" hadoop')
|
||||
|
|
|
@ -71,3 +71,9 @@ def start_hiveserver(pctx, cluster):
|
|||
hiveserver = vu.get_hiveserver(cluster)
|
||||
if hiveserver:
|
||||
run.start_hiveserver_process(pctx, hiveserver)
|
||||
|
||||
|
||||
def start_spark(cluster):
|
||||
spark = vu.get_spark_history_server(cluster)
|
||||
if spark:
|
||||
run.start_spark_history_server(spark)
|
||||
|
|
|
@ -65,6 +65,12 @@ def validate_cluster_creating(pctx, cluster):
|
|||
raise ex.RequiredServiceMissingException('historyserver',
|
||||
required_by='oozie')
|
||||
|
||||
spark_hist_count = _get_inst_count(cluster, 'spark history server')
|
||||
if spark_hist_count > 1:
|
||||
raise ex.InvalidComponentCountException('spark history server',
|
||||
_('0 or 1'),
|
||||
spark_hist_count)
|
||||
|
||||
rep_factor = cu.get_config_value(pctx, 'HDFS', 'dfs.replication', cluster)
|
||||
if dn_count < rep_factor:
|
||||
raise ex.InvalidComponentCountException(
|
||||
|
|
|
@ -37,6 +37,10 @@ def get_oozie(cluster):
|
|||
return u.get_instance(cluster, "oozie")
|
||||
|
||||
|
||||
def get_spark_history_server(cluster):
|
||||
return u.get_instance(cluster, "spark history server")
|
||||
|
||||
|
||||
def get_hiveserver(cluster):
|
||||
return u.get_instance(cluster, "hiveserver")
|
||||
|
||||
|
|
|
@ -15,7 +15,9 @@
|
|||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
|
||||
from sahara.plugins import provisioning as p
|
||||
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
|
||||
from sahara.utils import xmlutils as x
|
||||
|
||||
|
@ -69,7 +71,6 @@ ENV_CONFS = {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
# Initialise plugin Hadoop configurations
|
||||
PLUGIN_XML_CONFIGS = c_helper.init_xml_configs(XML_CONFS)
|
||||
PLUGIN_ENV_CONFIGS = c_helper.init_env_configs(ENV_CONFS)
|
||||
|
@ -80,9 +81,24 @@ def _init_all_configs():
|
|||
configs.extend(PLUGIN_XML_CONFIGS)
|
||||
configs.extend(PLUGIN_ENV_CONFIGS)
|
||||
configs.extend(c_helper.PLUGIN_GENERAL_CONFIGS)
|
||||
configs.extend(_get_spark_configs())
|
||||
return configs
|
||||
|
||||
|
||||
def _get_spark_configs():
|
||||
spark_configs = []
|
||||
for service, config_items in six.iteritems(c_helper.SPARK_CONFS):
|
||||
for item in config_items['OPTIONS']:
|
||||
cfg = p.Config(name=item["name"],
|
||||
description=item["description"],
|
||||
default_value=item["default"],
|
||||
applicable_target=service,
|
||||
scope="cluster", is_optional=True,
|
||||
priority=item["priority"])
|
||||
spark_configs.append(cfg)
|
||||
return spark_configs
|
||||
|
||||
|
||||
PLUGIN_CONFIGS = _init_all_configs()
|
||||
|
||||
|
||||
|
|
|
@ -12,9 +12,15 @@
|
|||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import os
|
||||
|
||||
from sahara import exceptions as ex
|
||||
from sahara.i18n import _
|
||||
from sahara.plugins import utils as plugin_utils
|
||||
from sahara.plugins.vanilla import confighints_helper as ch_helper
|
||||
from sahara.plugins.vanilla.hadoop2 import edp_engine
|
||||
from sahara.plugins.vanilla import utils as v_utils
|
||||
from sahara.service.edp.spark import engine as edp_spark_engine
|
||||
from sahara.utils import edp
|
||||
|
||||
|
||||
|
@ -33,3 +39,43 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
|
|||
return {'job_config': ch_helper.get_possible_pig_config_from(
|
||||
'plugins/vanilla/v2_7_1/resources/mapred-default.xml')}
|
||||
return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)
|
||||
|
||||
|
||||
class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
|
||||
|
||||
edp_base_version = "2.7.1"
|
||||
|
||||
def __init__(self, cluster):
|
||||
super(EdpSparkEngine, self).__init__(cluster)
|
||||
self.master = plugin_utils.get_instance(cluster,
|
||||
"spark history server")
|
||||
self.plugin_params["spark-user"] = "sudo -u hadoop "
|
||||
self.plugin_params["spark-submit"] = os.path.join(
|
||||
plugin_utils.get_config_value_or_default(
|
||||
"Spark", "Spark home", self.cluster),
|
||||
"bin/spark-submit")
|
||||
self.plugin_params["deploy-mode"] = "cluster"
|
||||
self.plugin_params["master"] = "yarn"
|
||||
|
||||
driver_cp = plugin_utils.get_config_value_or_default(
|
||||
"Spark", "Executor extra classpath", self.cluster)
|
||||
self.plugin_params["driver-class-path"] = driver_cp
|
||||
|
||||
@staticmethod
|
||||
def edp_supported(version):
|
||||
return version >= EdpSparkEngine.edp_base_version
|
||||
|
||||
@staticmethod
|
||||
def job_type_supported(job_type):
|
||||
return (job_type in
|
||||
edp_spark_engine.SparkJobEngine.get_supported_job_types())
|
||||
|
||||
def validate_job_execution(self, cluster, job, data):
|
||||
if (not self.edp_supported(cluster.hadoop_version) or
|
||||
not v_utils.get_spark_history_server(cluster)):
|
||||
|
||||
raise ex.InvalidDataException(
|
||||
_('Spark {base} or higher required to run {type} jobs').format(
|
||||
base=EdpSparkEngine.edp_base_version, type=job.type))
|
||||
|
||||
super(EdpSparkEngine, self).validate_job_execution(cluster, job, data)
|
||||
|
|
|
@ -57,7 +57,8 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
"HDFS": ["namenode", "datanode", "secondarynamenode"],
|
||||
"YARN": ["resourcemanager", "nodemanager"],
|
||||
"JobFlow": ["oozie"],
|
||||
"Hive": ["hiveserver"]
|
||||
"Hive": ["hiveserver"],
|
||||
"Spark": ["spark history server"]
|
||||
}
|
||||
|
||||
def validate(self, cluster):
|
||||
|
@ -86,6 +87,7 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
swift_helper.install_ssl_certs(cluster_utils.get_instances(cluster))
|
||||
|
||||
self._set_cluster_info(cluster)
|
||||
s_scripts.start_spark(cluster)
|
||||
|
||||
def decommission_nodes(self, cluster, instances):
|
||||
sc.decommission_nodes(self.pctx, cluster, instances)
|
||||
|
@ -103,7 +105,7 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
rm = vu.get_resourcemanager(cluster)
|
||||
hs = vu.get_historyserver(cluster)
|
||||
oo = vu.get_oozie(cluster)
|
||||
|
||||
sp = vu.get_spark_history_server(cluster)
|
||||
info = {}
|
||||
|
||||
if rm:
|
||||
|
@ -129,16 +131,27 @@ class VersionHandler(avm.AbstractVersionHandler):
|
|||
'Web UI': 'http://%s:%s' % (hs.get_ip_or_dns_name(), '19888')
|
||||
}
|
||||
|
||||
if sp:
|
||||
info['Apache Spark'] = {
|
||||
'Spark UI': 'http://%s:%s' % (sp.management_ip, '4040'),
|
||||
'Spark History Server UI':
|
||||
'http://%s:%s' % (sp.management_ip, '18080')
|
||||
}
|
||||
|
||||
ctx = context.ctx()
|
||||
conductor.cluster_update(ctx, cluster, {'info': info})
|
||||
|
||||
def get_edp_engine(self, cluster, job_type):
|
||||
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
|
||||
return edp_engine.EdpOozieEngine(cluster)
|
||||
if job_type in edp_engine.EdpSparkEngine.get_supported_job_types():
|
||||
return edp_engine.EdpSparkEngine(cluster)
|
||||
|
||||
return None
|
||||
|
||||
def get_edp_job_types(self):
|
||||
return edp_engine.EdpOozieEngine.get_supported_job_types()
|
||||
return (edp_engine.EdpOozieEngine.get_supported_job_types() +
|
||||
edp_engine.EdpSparkEngine.get_supported_job_types())
|
||||
|
||||
def get_edp_config_hints(self, job_type):
|
||||
return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)
|
||||
|
|
Loading…
Reference in New Issue