Merge "Support overriding of driver classpath in Spark jobs"
commit 1efebf3638
@@ -548,6 +548,11 @@ Spark jobs use some special configuration values:
   accessed. Without this configuration value, swift paths will not be
   accessible to Spark jobs. The default is **False**.
 
+* ``edp.spark.driver.classpath`` (optional) If set to an empty string,
+  sahara will use the default classpath for the cluster during job
+  execution. Otherwise, this value overrides the cluster default for the
+  particular job execution.
+
 The **edp-spark** example bundled with sahara contains a Spark program for
 estimating Pi.
 
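For context, the new value rides along in the job execution's configs dict. A minimal submission sketch follows; the dict shape mirrors the unit test added below, while the client.job_executions.create call and the client, job_id, and cluster_id names are illustrative assumptions about the python-saharaclient API, not part of this change:

    # hypothetical submission; "client", "job_id" and "cluster_id" are
    # placeholders for a python-saharaclient session and existing objects
    job_configs = {
        'configs': {
            'edp.java.main_class': 'org.me.myclass',
            # applies only to this execution; other jobs on the same
            # cluster keep the cluster default classpath
            'edp.spark.driver.classpath': 'my-classpath.jar',
        },
    }
    client.job_executions.create(job_id, cluster_id, configs=job_configs)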
@@ -175,7 +175,17 @@ class SparkJobEngine(base_engine.JobEngine):
 
         return uploaded_paths, builtin_paths
 
-    def _check_driver_class_path(self, param_dict):
+    def _check_driver_class_path(self, job_configs, param_dict):
+        overridden = edp.spark_driver_classpath(
+            job_configs.get('configs', {}))
+        if overridden:
+            param_dict['driver-class-path'] = (
+                " --driver-class-path " + overridden)
+            return
+        if not param_dict.get('wrapper_jar'):
+            # no driver classpath is needed if swift is not a data source
+            param_dict['driver-class-path'] = ""
+            return
         cp = param_dict['driver-class-path'] or ""
         if param_dict['deploy-mode'] == 'client' and not (
                 cp.startswith(":") or cp.endswith(":")):
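The ordering above is: an explicit per-job override always wins; otherwise, with no wrapper jar (i.e. swift is not used as a data source) the classpath is dropped; otherwise the pre-existing path handling continues. A runnable standalone sketch of that precedence (the names are local stand-ins, not the sahara modules themselves):

    # standalone sketch of the override precedence, not the sahara code
    SPARK_DRIVER_CLASSPATH = 'edp.spark.driver.classpath'


    def spark_driver_classpath(configs):
        # mirrors the helper added in the edp utility hunk below
        return configs.get(SPARK_DRIVER_CLASSPATH)


    def check_driver_class_path(job_configs, param_dict):
        overridden = spark_driver_classpath(job_configs.get('configs', {}))
        if overridden:
            # an explicit per-job override always wins
            param_dict['driver-class-path'] = (
                " --driver-class-path " + overridden)
            return
        if not param_dict.get('wrapper_jar'):
            # no swift data source involved, so no classpath is needed
            param_dict['driver-class-path'] = ""
            return
        # otherwise the engine continues with its existing classpath
        # handling (adding the working directory in client mode, etc.)


    params = {'wrapper_jar': None, 'driver-class-path': None}
    check_driver_class_path(
        {'configs': {SPARK_DRIVER_CLASSPATH: 'my.jar'}}, params)
    print(params['driver-class-path'])  # " --driver-class-path my.jar"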
@@ -252,7 +262,7 @@ class SparkJobEngine(base_engine.JobEngine):
         # Handle driver classpath. Because of the way the hadoop
         # configuration is handled in the wrapper class, using
         # wrapper_xml, the working directory must be on the classpath
-        self._check_driver_class_path(mutual_dict)
+        self._check_driver_class_path(updated_job_configs, mutual_dict)
 
         if mutual_dict.get("wrapper_jar"):
             # Substrings which may be empty have spaces
@@ -267,7 +277,7 @@ class SparkJobEngine(base_engine.JobEngine):
                 mutual_dict)
         else:
             cmd = (
-                '%(spark-user)s%(spark-submit)s'
+                '%(spark-user)s%(spark-submit)s%(driver-class-path)s'
                 ' --class %(job_class)s%(addnl_jars)s'
                 ' --master %(master)s'
                 ' --deploy-mode %(deploy-mode)s'
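This is why _check_driver_class_path stores either an empty string or a fragment with a leading space and flag: the %(driver-class-path)s placeholder must collapse cleanly when no override applies. A quick illustration of the expansion, with made-up values standing in for what the engine keeps in mutual_dict:

    # illustrative values; the real ones live in mutual_dict in the engine
    template = ('%(spark-user)s%(spark-submit)s%(driver-class-path)s'
                ' --class %(job_class)s')
    base = {'spark-user': 'sudo -u spark ', 'spark-submit': 'spark-submit',
            'job_class': 'org.me.myclass'}

    print(template % {**base, 'driver-class-path': ''})
    # sudo -u spark spark-submit --class org.me.myclass
    print(template % {**base, 'driver-class-path':
                      ' --driver-class-path my-classpath.jar'})
    # sudo -u spark spark-submit --driver-class-path my-classpath.jar \
    #     --class org.me.myclass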
@@ -668,3 +668,35 @@ class TestSpark(base.SaharaTestCase):
         self._setup_run_job(master_instance, job_configs, files)
 
         configurer.assert_called_with("cluster", data_source)
+
+    @mock.patch('sahara.service.edp.hdfs_helper.configure_cluster_for_hdfs')
+    @mock.patch('sahara.service.edp.job_utils.resolve_data_source_references')
+    def test_overridden_driver_classpath(self, resolver, configurer):
+        job_configs = {
+            'configs': {"edp.java.main_class": "org.me.myclass",
+                        'edp.spark.driver.classpath': "my-classpath.jar"},
+        }
+
+        files = {'jars': ["app.jar"]}
+
+        data_source = mock.Mock()
+        data_source.type = 'hdfs'
+        resolver.return_value = ([data_source], job_configs)
+
+        master_instance = self._make_master_instance()
+        self._setup_run_job(master_instance, job_configs, files)
+
+        # check that the overridden value was applied
+        master_instance.execute_command.assert_called_with(
+            'cd %(workflow_dir)s; '
+            './launch_command %(spark_user)s%(spark_submit)s '
+            '--driver-class-path my-classpath.jar '
+            '--class org.me.myclass '
+            '--master %(master)s '
+            '--deploy-mode %(deploy_mode)s '
+            'app.jar '
+            '> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
+                                            "spark_user": self.spark_user,
+                                            "spark_submit": self.spark_submit,
+                                            "master": self.master,
+                                            "deploy_mode": self.deploy_mode})
@@ -75,7 +75,8 @@ JOB_TYPES_ACCEPTABLE_CONFIGS = {
 }
 
 ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'
+SPARK_DRIVER_CLASSPATH = 'edp.spark.driver.classpath'
 ADAPT_SPARK_FOR_SWIFT = 'edp.spark.adapt_for_swift'
 
 
@@ -122,6 +122,11 @@ def is_adapt_spark_for_swift_enabled(configs):
     return configs.get(ADAPT_SPARK_FOR_SWIFT, False)
 
 
+def spark_driver_classpath(configs):
+    # returns None when the cluster default classpath should be used
+    return configs.get(SPARK_DRIVER_CLASSPATH)
+
+
 def get_builtin_binaries(job, configs):
     if job.type == JOB_TYPE_JAVA:
         if is_adapt_for_oozie_enabled(configs):
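The helper's contract, shown as a small usage sketch (assuming spark_driver_classpath and SPARK_DRIVER_CLASSPATH as defined in the hunk above):

    print(spark_driver_classpath(
        {SPARK_DRIVER_CLASSPATH: 'my-classpath.jar'}))  # my-classpath.jar
    print(spark_driver_classpath({}))   # None -> cluster default is kept
    print(spark_driver_classpath(
        {SPARK_DRIVER_CLASSPATH: ''}))  # '' is falsy, so also the default

Both None and the empty string are falsy, which is what makes the documented "empty string means use the cluster default" behavior fall out of the single `if overridden:` check in the engine.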