Support overriding of driver classpath in Spark jobs
It might be useful to have ability to override default value for cluster of driver-class-path option for the particular job execution. The change add new config option: "edp.spark.driver.classpath" that can override driver-class-path for users' needs. Implements blueprint: spark-override-classpath Change-Id: I94055c2ccb70c953620b62ed18c27895c3588327
This commit is contained in:
parent
d7e019b6bc
commit
14cece1ead
@ -544,6 +544,11 @@ Spark jobs use some special configuration values:
|
||||
accessed. Without this configuration value, swift paths will not be
|
||||
accessible to Spark jobs. The default is **False**.
|
||||
|
||||
* ``edp.spark.driver.classpath`` (optional) If set to empty string sahara
|
||||
will use default classpath for the cluster during job execution.
|
||||
Otherwise this will override default value for the cluster for particular
|
||||
job execution.
|
||||
|
||||
The **edp-spark** example bundled with sahara contains a Spark program for
|
||||
estimating Pi.
|
||||
|
||||
|
@ -175,7 +175,17 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
|
||||
return uploaded_paths, builtin_paths
|
||||
|
||||
def _check_driver_class_path(self, param_dict):
|
||||
def _check_driver_class_path(self, job_configs, param_dict):
|
||||
overridden = edp.spark_driver_classpath(
|
||||
job_configs.get('configs', {}))
|
||||
if overridden:
|
||||
param_dict['driver-class-path'] = (
|
||||
" --driver-class-path " + overridden)
|
||||
return
|
||||
if not param_dict.get('wrapper_jar'):
|
||||
# no need in driver classpath if swift as datasource is not used
|
||||
param_dict['driver-class-path'] = ""
|
||||
return
|
||||
cp = param_dict['driver-class-path'] or ""
|
||||
if param_dict['deploy-mode'] == 'client' and not (
|
||||
cp.startswith(":") or cp.endswith(":")):
|
||||
@ -252,7 +262,7 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
# Handle driver classpath. Because of the way the hadoop
|
||||
# configuration is handled in the wrapper class, using
|
||||
# wrapper_xml, the working directory must be on the classpath
|
||||
self._check_driver_class_path(mutual_dict)
|
||||
self._check_driver_class_path(updated_job_configs, mutual_dict)
|
||||
|
||||
if mutual_dict.get("wrapper_jar"):
|
||||
# Substrings which may be empty have spaces
|
||||
@ -267,7 +277,7 @@ class SparkJobEngine(base_engine.JobEngine):
|
||||
mutual_dict)
|
||||
else:
|
||||
cmd = (
|
||||
'%(spark-user)s%(spark-submit)s'
|
||||
'%(spark-user)s%(spark-submit)s%(driver-class-path)s'
|
||||
' --class %(job_class)s%(addnl_jars)s'
|
||||
' --master %(master)s'
|
||||
' --deploy-mode %(deploy-mode)s'
|
||||
|
@ -668,3 +668,35 @@ class TestSpark(base.SaharaTestCase):
|
||||
self._setup_run_job(master_instance, job_configs, files)
|
||||
|
||||
configurer.assert_called_with("cluster", data_source)
|
||||
|
||||
@mock.patch('sahara.service.edp.hdfs_helper.configure_cluster_for_hdfs')
|
||||
@mock.patch('sahara.service.edp.job_utils.resolve_data_source_references')
|
||||
def test_overridden_driver_classpath(self, resolver, configurer):
|
||||
job_configs = {
|
||||
'configs': {"edp.java.main_class": "org.me.myclass",
|
||||
'edp.spark.driver.classpath': "my-classpath.jar"},
|
||||
}
|
||||
|
||||
files = {'jars': ["app.jar"]}
|
||||
|
||||
data_source = mock.Mock()
|
||||
data_source.type = 'hdfs'
|
||||
resolver.return_value = ([data_source], job_configs)
|
||||
|
||||
master_instance = self._make_master_instance()
|
||||
self._setup_run_job(master_instance, job_configs, files)
|
||||
|
||||
# check that overridden value was applied
|
||||
master_instance.execute_command.assert_called_with(
|
||||
'cd %(workflow_dir)s; '
|
||||
'./launch_command %(spark_user)s%(spark_submit)s '
|
||||
'--driver-class-path my-classpath.jar '
|
||||
'--class org.me.myclass '
|
||||
'--master %(master)s '
|
||||
'--deploy-mode %(deploy_mode)s '
|
||||
'app.jar '
|
||||
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
|
||||
"spark_user": self.spark_user,
|
||||
"spark_submit": self.spark_submit,
|
||||
"master": self.master,
|
||||
"deploy_mode": self.deploy_mode})
|
||||
|
@ -75,7 +75,7 @@ JOB_TYPES_ACCEPTABLE_CONFIGS = {
|
||||
}
|
||||
|
||||
ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'
|
||||
|
||||
SPARK_DRIVER_CLASSPATH = 'edp.spark.driver.classpath'
|
||||
ADAPT_SPARK_FOR_SWIFT = 'edp.spark.adapt_for_swift'
|
||||
|
||||
|
||||
@ -122,6 +122,11 @@ def is_adapt_spark_for_swift_enabled(configs):
|
||||
return configs.get(ADAPT_SPARK_FOR_SWIFT, False)
|
||||
|
||||
|
||||
def spark_driver_classpath(configs):
|
||||
# Return None in case when you need to use default value
|
||||
return configs.get(SPARK_DRIVER_CLASSPATH)
|
||||
|
||||
|
||||
def get_builtin_binaries(job, configs):
|
||||
if job.type == JOB_TYPE_JAVA:
|
||||
if is_adapt_for_oozie_enabled(configs):
|
||||
|
Loading…
Reference in New Issue
Block a user