Merge "Spark on Vanilla Clusters"

commit e12f021aa8
Jenkins 2016-09-07 21:19:09 +00:00, committed by Gerrit Code Review
11 changed files with 324 additions and 5 deletions


@@ -12,6 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from oslo_config import cfg
from oslo_log import log as logging
@@ -52,7 +53,7 @@ PORTS_MAP = {
    "nodemanager": [8042],
    "oozie": [11000],
    "hiveserver": [9999, 10000]
}


def configure_cluster(pctx, cluster):
@@ -65,6 +66,70 @@ def configure_cluster(pctx, cluster):
    instances = utils.get_instances(cluster)
    configure_instances(pctx, instances)
    configure_topology_data(pctx, cluster)
    configure_spark(cluster)


def configure_spark(cluster):
    extra = _extract_spark_configs_to_extra(cluster)
    _push_spark_configs_to_node(cluster, extra)


def _push_spark_configs_to_node(cluster, extra):
    spark_master = vu.get_spark_history_server(cluster)
    if spark_master:
        _push_spark_configs_to_existing_node(spark_master, cluster, extra)
        _push_cleanup_job(spark_master, extra)
        with spark_master.remote() as r:
            r.execute_command('sudo su - -c "mkdir /tmp/spark-events" hadoop')


def _push_spark_configs_to_existing_node(spark_master, cluster, extra):
    sp_home = c_helper.get_spark_home(cluster)
    files = {
        os.path.join(sp_home,
                     'conf/spark-env.sh'): extra['sp_master'],
        os.path.join(
            sp_home,
            'conf/spark-defaults.conf'): extra['sp_defaults']
    }

    with spark_master.remote() as r:
        r.write_files_to(files, run_as_root=True)


def _push_cleanup_job(sp_master, extra):
    with sp_master.remote() as r:
        if extra['job_cleanup']['valid']:
            r.write_file_to('/opt/hadoop/tmp-cleanup.sh',
                            extra['job_cleanup']['script'],
                            run_as_root=True)
            r.execute_command("sudo chmod 755 /opt/hadoop/tmp-cleanup.sh")
            cmd = 'sudo sh -c \'echo "%s" > /etc/cron.d/spark-cleanup\''
            r.execute_command(cmd % extra['job_cleanup']['cron'])
        else:
            r.execute_command("sudo rm -f /opt/hadoop/tmp-cleanup.sh")
            r.execute_command("sudo rm -f /etc/cron.d/spark-cleanup")


def _extract_spark_configs_to_extra(cluster):
    sp_master = utils.get_instance(cluster, "spark history server")

    extra = dict()

    config_master = ''
    if sp_master is not None:
        config_master = c_helper.generate_spark_env_configs(cluster)

    # Any node that might be used to run spark-submit will need
    # these libs for swift integration
    config_defaults = c_helper.generate_spark_executor_classpath(cluster)

    extra['job_cleanup'] = c_helper.generate_job_cleanup_config(cluster)
    extra['sp_master'] = config_master
    extra['sp_defaults'] = config_defaults

    return extra

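
For reference, a minimal sketch of the contract between _extract_spark_configs_to_extra() and the _push_* helpers above; the literal values are illustrative guesses based on the helpers in config_helper, not output captured from a real cluster.

# Illustrative only: the shape of the 'extra' dict consumed by the push helpers.
example_extra = {
    'sp_master': 'HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop\n'
                 'YARN_CONF_DIR=/opt/hadoop/etc/hadoop',       # spark-env.sh body
    'sp_defaults': 'spark.executor.extraClassPath '
                   '/opt/hadoop/share/hadoop/tools/lib/'
                   'hadoop-openstack-2.7.1.jar',               # spark-defaults.conf body
    'job_cleanup': {
        'valid': True,                     # all three cleanup thresholds are > 0
        'cron': '0 * * * * root /opt/hadoop/tmp-cleanup.sh\n',
        'script': '#!/bin/sh\n...',        # rendered tmp-cleanup.sh.template
    },
}
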
def configure_instances(pctx, instances):


@@ -19,6 +19,8 @@ import six
from sahara import exceptions as ex
from sahara.i18n import _
from sahara.plugins import provisioning as p
from sahara.plugins import utils
from sahara.utils import files as f
from sahara.utils import types

CONF = cfg.CONF
@@ -84,6 +86,52 @@ PRIORITY_1_CONFS = [
    'yarn.scheduler.minimum-allocation-vcores'
]

_default_executor_classpath = ":".join(
    ['/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.1.jar'])

SPARK_CONFS = {
    'Spark': {
        "OPTIONS": [
            {
                'name': 'Executor extra classpath',
                'description': 'Value for spark.executor.extraClassPath'
                               ' in spark-defaults.conf'
                               ' (default: %s)' % _default_executor_classpath,
                'default': '%s' % _default_executor_classpath,
                'priority': 2,
            },
            {
                'name': 'Spark home',
                'description': 'The location of the spark installation'
                               ' (default: /opt/spark)',
                'default': '/opt/spark',
                'priority': 2,
            },
            {
                'name': 'Minimum cleanup seconds',
                'description': 'Job data will never be purged before this'
                               ' amount of time elapses (default: 86400 = 1 day)',
                'default': '86400',
                'priority': 2,
            },
            {
                'name': 'Maximum cleanup seconds',
                'description': 'Job data will always be purged after this'
                               ' amount of time elapses (default: 1209600 = 14 days)',
                'default': '1209600',
                'priority': 2,
            },
            {
                'name': 'Minimum cleanup megabytes',
                'description': 'No job data will be purged unless the total'
                               ' job data exceeds this size (default: 4096 = 4GB)',
                'default': '4096',
                'priority': 2,
            },
        ]
    }
}
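
Operators can override any of the options above per cluster. Below is a hedged sketch of doing so through a cluster template's cluster_configs, keyed by applicable_target and option name; the surrounding payload fields are an assumption for illustration, not part of this change.

# Assumed payload shape for illustration: cluster_configs maps the
# applicable_target ("Spark") to {option name: value} pairs.
cluster_template = {
    'name': 'vanilla-2.7.1-spark',
    'plugin_name': 'vanilla',
    'hadoop_version': '2.7.1',
    'cluster_configs': {
        'Spark': {
            'Spark home': '/opt/spark',           # default shown above
            'Minimum cleanup megabytes': '8192',  # only purge above ~8 GB
        }
    },
}
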
# for now we do not have many cluster-wide configs,
# so let's consider all of them as having high priority
PRIORITY_1_CONFS += CLUSTER_WIDE_CONFS
@@ -202,3 +250,58 @@ def is_data_locality_enabled(pctx, cluster):
        return False
    return get_config_value(pctx, ENABLE_DATA_LOCALITY.applicable_target,
                            ENABLE_DATA_LOCALITY.name, cluster)


def _get_spark_opt_default(opt_name):
    for opt in SPARK_CONFS["Spark"]["OPTIONS"]:
        if opt_name == opt["name"]:
            return opt["default"]
    return None


def generate_spark_env_configs(cluster):
    configs = []

    # Point to the Hadoop conf dir so that Spark can read things
    # like the swift configuration without having to copy core-site
    # to /opt/spark/conf.
    HADOOP_CONF_DIR = '/opt/hadoop/etc/hadoop'
    configs.append('HADOOP_CONF_DIR=' + HADOOP_CONF_DIR)

    # The Hadoop and YARN configs live in the same folder.
    configs.append('YARN_CONF_DIR=' + HADOOP_CONF_DIR)

    return '\n'.join(configs)


def generate_spark_executor_classpath(cluster):
    cp = utils.get_config_value_or_default(
        "Spark", "Executor extra classpath", cluster)
    if cp:
        return "spark.executor.extraClassPath " + cp
    return "\n"


def generate_job_cleanup_config(cluster):
    args = {
        'minimum_cleanup_megabytes': utils.get_config_value_or_default(
            "Spark", "Minimum cleanup megabytes", cluster),
        'minimum_cleanup_seconds': utils.get_config_value_or_default(
            "Spark", "Minimum cleanup seconds", cluster),
        'maximum_cleanup_seconds': utils.get_config_value_or_default(
            "Spark", "Maximum cleanup seconds", cluster)
    }
    job_conf = {'valid': (args['maximum_cleanup_seconds'] > 0 and
                          (args['minimum_cleanup_megabytes'] > 0
                           and args['minimum_cleanup_seconds'] > 0))}
    if job_conf['valid']:
        job_conf['cron'] = f.get_file_text(
            'plugins/vanilla/hadoop2/resources/spark-cleanup.cron')
        job_cleanup_script = f.get_file_text(
            'plugins/vanilla/hadoop2/resources/tmp-cleanup.sh.template')
        job_conf['script'] = job_cleanup_script.format(**args)
    return job_conf


def get_spark_home(cluster):
    return utils.get_config_value_or_default("Spark", "Spark home", cluster)
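
The cleanup job is only installed when all three thresholds are positive. A standalone restatement of that rule with integer arguments, for illustration only (the real function reads the values from the cluster configs):

# Sketch of the validity rule used by generate_job_cleanup_config(): setting
# any threshold to 0 disables periodic cleanup entirely.
def _cleanup_enabled(minimum_cleanup_megabytes,
                     minimum_cleanup_seconds,
                     maximum_cleanup_seconds):
    return (maximum_cleanup_seconds > 0 and
            minimum_cleanup_megabytes > 0 and
            minimum_cleanup_seconds > 0)

assert _cleanup_enabled(4096, 86400, 1209600)    # the defaults: enabled
assert not _cleanup_enabled(4096, 86400, 0)      # any zero: cleanup disabled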


@@ -0,0 +1,2 @@
# Cleans up old Spark job directories once per hour.
0 * * * * root /opt/hadoop/tmp-cleanup.sh


@@ -0,0 +1,48 @@
#!/bin/sh

MINIMUM_CLEANUP_MEGABYTES={minimum_cleanup_megabytes}
MINIMUM_CLEANUP_SECONDS={minimum_cleanup_seconds}
MAXIMUM_CLEANUP_SECONDS={maximum_cleanup_seconds}

CURRENT_TIMESTAMP=`date +%s`
POSSIBLE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MINIMUM_CLEANUP_SECONDS))
DEFINITE_CLEANUP_THRESHOLD=$(($CURRENT_TIMESTAMP - $MAXIMUM_CLEANUP_SECONDS))

unset MAY_DELETE
unset WILL_DELETE

if [ ! -d /tmp/spark-edp ]
then
    exit 0
fi

cd /tmp/spark-edp
for JOB in $(find . -maxdepth 1 -mindepth 1 -type d -printf '%f\n')
do
    for EXECUTION in $(find $JOB -maxdepth 1 -mindepth 1 -type d -printf '%f\n')
    do
        TIMESTAMP=`stat $JOB/$EXECUTION --printf '%Y'`
        if [[ $TIMESTAMP -lt $DEFINITE_CLEANUP_THRESHOLD ]]
        then
            WILL_DELETE="$WILL_DELETE $JOB/$EXECUTION"
        else
            if [[ $TIMESTAMP -lt $POSSIBLE_CLEANUP_THRESHOLD ]]
            then
                MAY_DELETE="$MAY_DELETE $JOB/$EXECUTION"
            fi
        fi
    done
done

for EXECUTION in $WILL_DELETE
do
    rm -Rf $EXECUTION
done

for EXECUTION in $(ls $MAY_DELETE -trd)
do
    if [[ `du -s -BM | grep -o '[0-9]\+'` -le $MINIMUM_CLEANUP_MEGABYTES ]]; then
        break
    fi
    rm -Rf $EXECUTION
done
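
A worked example of the threshold arithmetic the script uses, with the default settings shipped in this change (Python used here purely for illustration):

# Mirrors the script's decision for a single /tmp/spark-edp/<job>/<run> directory.
import time

MINIMUM_CLEANUP_SECONDS = 86400      # 1 day  (default)
MAXIMUM_CLEANUP_SECONDS = 1209600    # 14 days (default)

now = int(time.time())
possible_cleanup_threshold = now - MINIMUM_CLEANUP_SECONDS
definite_cleanup_threshold = now - MAXIMUM_CLEANUP_SECONDS


def classify(mtime):
    if mtime < definite_cleanup_threshold:
        return 'WILL_DELETE'   # older than 14 days: always removed
    if mtime < possible_cleanup_threshold:
        return 'MAY_DELETE'    # 1-14 days old: removed only while the total
                               # size of /tmp/spark-edp exceeds the MB limit
    return 'KEEP'              # younger than 1 day: never removed


print(classify(now - 3600))          # KEEP
print(classify(now - 7 * 86400))     # MAY_DELETE
print(classify(now - 30 * 86400))    # WILL_DELETE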


@@ -106,6 +106,16 @@ def start_oozie_process(pctx, instance):
            _start_oozie(r)


@cpo.event_wrapper(
    True, step=pu.start_process_event_message("Spark History Server"))
def start_spark_history_server(master):
    sp_home = c_helper.get_spark_home(master.cluster)
    with context.set_current_instance_id(master.instance_id):
        with master.remote() as r:
            r.execute_command('sudo su - -c "bash %s" hadoop' % os.path.join(
                sp_home, 'sbin/start-history-server.sh'))


def format_namenode(instance):
    instance.remote().execute_command(
        'sudo su - -c "hdfs namenode -format" hadoop')


@@ -71,3 +71,9 @@ def start_hiveserver(pctx, cluster):
    hiveserver = vu.get_hiveserver(cluster)
    if hiveserver:
        run.start_hiveserver_process(pctx, hiveserver)


def start_spark(cluster):
    spark = vu.get_spark_history_server(cluster)
    if spark:
        run.start_spark_history_server(spark)


@@ -65,6 +65,12 @@ def validate_cluster_creating(pctx, cluster):
        raise ex.RequiredServiceMissingException('historyserver',
                                                 required_by='oozie')

    spark_hist_count = _get_inst_count(cluster, 'spark history server')
    if spark_hist_count > 1:
        raise ex.InvalidComponentCountException('spark history server',
                                                _('0 or 1'),
                                                spark_hist_count)

    rep_factor = cu.get_config_value(pctx, 'HDFS', 'dfs.replication', cluster)
    if dn_count < rep_factor:
        raise ex.InvalidComponentCountException(


@@ -37,6 +37,10 @@ def get_oozie(cluster):
    return u.get_instance(cluster, "oozie")


def get_spark_history_server(cluster):
    return u.get_instance(cluster, "spark history server")


def get_hiveserver(cluster):
    return u.get_instance(cluster, "hiveserver")


@@ -15,7 +15,9 @@
from oslo_config import cfg
from oslo_log import log as logging
import six

from sahara.plugins import provisioning as p
from sahara.plugins.vanilla.hadoop2 import config_helper as c_helper
from sahara.utils import xmlutils as x
@@ -69,7 +71,6 @@ ENV_CONFS = {
    }
}

# Initialise plugin Hadoop configurations
PLUGIN_XML_CONFIGS = c_helper.init_xml_configs(XML_CONFS)
PLUGIN_ENV_CONFIGS = c_helper.init_env_configs(ENV_CONFS)
@@ -80,9 +81,24 @@ def _init_all_configs():
    configs.extend(PLUGIN_XML_CONFIGS)
    configs.extend(PLUGIN_ENV_CONFIGS)
    configs.extend(c_helper.PLUGIN_GENERAL_CONFIGS)
    configs.extend(_get_spark_configs())
    return configs


def _get_spark_configs():
    spark_configs = []
    for service, config_items in six.iteritems(c_helper.SPARK_CONFS):
        for item in config_items['OPTIONS']:
            cfg = p.Config(name=item["name"],
                           description=item["description"],
                           default_value=item["default"],
                           applicable_target=service,
                           scope="cluster", is_optional=True,
                           priority=item["priority"])
            spark_configs.append(cfg)
    return spark_configs


PLUGIN_CONFIGS = _init_all_configs()


@@ -12,9 +12,15 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from sahara import exceptions as ex
from sahara.i18n import _
from sahara.plugins import utils as plugin_utils
from sahara.plugins.vanilla import confighints_helper as ch_helper
from sahara.plugins.vanilla.hadoop2 import edp_engine
from sahara.plugins.vanilla import utils as v_utils
from sahara.service.edp.spark import engine as edp_spark_engine
from sahara.utils import edp
@@ -33,3 +39,43 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
            return {'job_config': ch_helper.get_possible_pig_config_from(
                'plugins/vanilla/v2_7_1/resources/mapred-default.xml')}
        return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)


class EdpSparkEngine(edp_spark_engine.SparkJobEngine):

    edp_base_version = "2.7.1"

    def __init__(self, cluster):
        super(EdpSparkEngine, self).__init__(cluster)
        self.master = plugin_utils.get_instance(cluster,
                                                "spark history server")
        self.plugin_params["spark-user"] = "sudo -u hadoop "
        self.plugin_params["spark-submit"] = os.path.join(
            plugin_utils.get_config_value_or_default(
                "Spark", "Spark home", self.cluster),
            "bin/spark-submit")
        self.plugin_params["deploy-mode"] = "cluster"
        self.plugin_params["master"] = "yarn"

        driver_cp = plugin_utils.get_config_value_or_default(
            "Spark", "Executor extra classpath", self.cluster)
        self.plugin_params["driver-class-path"] = driver_cp

    @staticmethod
    def edp_supported(version):
        return version >= EdpSparkEngine.edp_base_version

    @staticmethod
    def job_type_supported(job_type):
        return (job_type in
                edp_spark_engine.SparkJobEngine.get_supported_job_types())

    def validate_job_execution(self, cluster, job, data):
        if (not self.edp_supported(cluster.hadoop_version) or
                not v_utils.get_spark_history_server(cluster)):
            raise ex.InvalidDataException(
                _('Spark {base} or higher required to run {type} jobs').format(
                    base=EdpSparkEngine.edp_base_version, type=job.type))

        super(EdpSparkEngine, self).validate_job_execution(cluster, job, data)
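
With the default Spark options defined in config_helper, the plugin_params assembled in __init__ above resolve roughly as follows; this is an illustrative dict only, since the actual values depend on the cluster's configs:

# Illustrative only: what EdpSparkEngine.plugin_params looks like under defaults.
example_plugin_params = {
    'spark-user': 'sudo -u hadoop ',
    'spark-submit': '/opt/spark/bin/spark-submit',   # "Spark home" + bin/spark-submit
    'deploy-mode': 'cluster',
    'master': 'yarn',
    'driver-class-path':
        '/opt/hadoop/share/hadoop/tools/lib/hadoop-openstack-2.7.1.jar',
}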


@@ -57,7 +57,8 @@ class VersionHandler(avm.AbstractVersionHandler):
            "HDFS": ["namenode", "datanode", "secondarynamenode"],
            "YARN": ["resourcemanager", "nodemanager"],
            "JobFlow": ["oozie"],
            "Hive": ["hiveserver"],
            "Spark": ["spark history server"]
        }

    def validate(self, cluster):
@@ -86,6 +87,7 @@ class VersionHandler(avm.AbstractVersionHandler):
        swift_helper.install_ssl_certs(cluster_utils.get_instances(cluster))

        self._set_cluster_info(cluster)
        s_scripts.start_spark(cluster)

    def decommission_nodes(self, cluster, instances):
        sc.decommission_nodes(self.pctx, cluster, instances)
@@ -103,7 +105,7 @@ class VersionHandler(avm.AbstractVersionHandler):
        rm = vu.get_resourcemanager(cluster)
        hs = vu.get_historyserver(cluster)
        oo = vu.get_oozie(cluster)
        sp = vu.get_spark_history_server(cluster)

        info = {}

        if rm:
@@ -129,16 +131,27 @@ class VersionHandler(avm.AbstractVersionHandler):
                'Web UI': 'http://%s:%s' % (hs.get_ip_or_dns_name(), '19888')
            }

        if sp:
            info['Apache Spark'] = {
                'Spark UI': 'http://%s:%s' % (sp.management_ip, '4040'),
                'Spark History Server UI':
                    'http://%s:%s' % (sp.management_ip, '18080')
            }

        ctx = context.ctx()
        conductor.cluster_update(ctx, cluster, {'info': info})

    def get_edp_engine(self, cluster, job_type):
        if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
            return edp_engine.EdpOozieEngine(cluster)
        if job_type in edp_engine.EdpSparkEngine.get_supported_job_types():
            return edp_engine.EdpSparkEngine(cluster)
        return None

    def get_edp_job_types(self):
        return (edp_engine.EdpOozieEngine.get_supported_job_types() +
                edp_engine.EdpSparkEngine.get_supported_job_types())

    def get_edp_config_hints(self, job_type):
        return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)