diff --git a/doc/source/userdoc/edp.rst b/doc/source/userdoc/edp.rst
index 65427f66..b053dbaa 100644
--- a/doc/source/userdoc/edp.rst
+++ b/doc/source/userdoc/edp.rst
@@ -544,6 +544,11 @@ Spark jobs use some special configuration values:
   accessed. Without this configuration value, swift paths will not be
   accessible to Spark jobs. The default is **False**.
 
+* ``edp.spark.driver.classpath`` (optional) If set to an empty string,
+  sahara will use the default classpath for the cluster during job
+  execution. Otherwise, this value overrides the cluster default for
+  that particular job execution.
+
 The **edp-spark** example bundled with sahara contains a Spark program for
 estimating Pi.
 
diff --git a/sahara/service/edp/spark/engine.py b/sahara/service/edp/spark/engine.py
index 7f3a1e92..605f604e 100644
--- a/sahara/service/edp/spark/engine.py
+++ b/sahara/service/edp/spark/engine.py
@@ -175,7 +175,17 @@ class SparkJobEngine(base_engine.JobEngine):
 
         return uploaded_paths, builtin_paths
 
-    def _check_driver_class_path(self, param_dict):
+    def _check_driver_class_path(self, job_configs, param_dict):
+        overridden = edp.spark_driver_classpath(
+            job_configs.get('configs', {}))
+        if overridden:
+            param_dict['driver-class-path'] = (
+                " --driver-class-path " + overridden)
+            return
+        if not param_dict.get('wrapper_jar'):
+            # no driver classpath is needed if swift is not used as a source
+            param_dict['driver-class-path'] = ""
+            return
         cp = param_dict['driver-class-path'] or ""
         if param_dict['deploy-mode'] == 'client' and not (
                 cp.startswith(":") or cp.endswith(":")):
@@ -252,7 +262,7 @@ class SparkJobEngine(base_engine.JobEngine):
             # Handle driver classpath. Because of the way the hadoop
             # configuration is handled in the wrapper class, using
             # wrapper_xml, the working directory must be on the classpath
-            self._check_driver_class_path(mutual_dict)
+            self._check_driver_class_path(updated_job_configs, mutual_dict)
 
             if mutual_dict.get("wrapper_jar"):
                 # Substrings which may be empty have spaces
@@ -267,7 +277,7 @@ class SparkJobEngine(base_engine.JobEngine):
                     mutual_dict)
             else:
                 cmd = (
-                    '%(spark-user)s%(spark-submit)s'
+                    '%(spark-user)s%(spark-submit)s%(driver-class-path)s'
                     ' --class %(job_class)s%(addnl_jars)s'
                     ' --master %(master)s'
                     ' --deploy-mode %(deploy-mode)s'
diff --git a/sahara/tests/unit/service/edp/spark/base.py b/sahara/tests/unit/service/edp/spark/base.py
index d8591039..503bbd93 100644
--- a/sahara/tests/unit/service/edp/spark/base.py
+++ b/sahara/tests/unit/service/edp/spark/base.py
@@ -668,3 +668,35 @@ class TestSpark(base.SaharaTestCase):
         self._setup_run_job(master_instance, job_configs, files)
 
         configurer.assert_called_with("cluster", data_source)
+
+    @mock.patch('sahara.service.edp.hdfs_helper.configure_cluster_for_hdfs')
+    @mock.patch('sahara.service.edp.job_utils.resolve_data_source_references')
+    def test_overridden_driver_classpath(self, resolver, configurer):
+        job_configs = {
+            'configs': {"edp.java.main_class": "org.me.myclass",
+                        'edp.spark.driver.classpath': "my-classpath.jar"},
+        }
+
+        files = {'jars': ["app.jar"]}
+
+        data_source = mock.Mock()
+        data_source.type = 'hdfs'
+        resolver.return_value = ([data_source], job_configs)
+
+        master_instance = self._make_master_instance()
+        self._setup_run_job(master_instance, job_configs, files)
+
+        # check that the overridden value was applied
+        master_instance.execute_command.assert_called_with(
+            'cd %(workflow_dir)s; '
+            './launch_command %(spark_user)s%(spark_submit)s '
+            '--driver-class-path my-classpath.jar '
+            '--class org.me.myclass '
+            '--master %(master)s '
+            '--deploy-mode %(deploy_mode)s '
+            'app.jar '
+            '> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
+                                            "spark_user": self.spark_user,
+                                            "spark_submit": self.spark_submit,
+                                            "master": self.master,
+                                            "deploy_mode": self.deploy_mode})
diff --git a/sahara/utils/edp.py b/sahara/utils/edp.py
index aee8c6f0..14fd7878 100644
--- a/sahara/utils/edp.py
+++ b/sahara/utils/edp.py
@@ -75,7 +75,7 @@ JOB_TYPES_ACCEPTABLE_CONFIGS = {
 }
 
 ADAPT_FOR_OOZIE = 'edp.java.adapt_for_oozie'
-
+SPARK_DRIVER_CLASSPATH = 'edp.spark.driver.classpath'
 ADAPT_SPARK_FOR_SWIFT = 'edp.spark.adapt_for_swift'
 
 
@@ -122,6 +122,11 @@ def is_adapt_spark_for_swift_enabled(configs):
     return configs.get(ADAPT_SPARK_FOR_SWIFT, False)
 
 
+def spark_driver_classpath(configs):
+    # Returning None means the cluster default classpath should be used
+    return configs.get(SPARK_DRIVER_CLASSPATH)
+
+
 def get_builtin_binaries(job, configs):
     if job.type == JOB_TYPE_JAVA:
         if is_adapt_for_oozie_enabled(configs):
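
Note: to make the new precedence concrete, below is a minimal standalone
sketch (not part of the patch) of the two early-return branches added to
_check_driver_class_path. The free function check_driver_class_path is a
hypothetical stand-in for the engine method, and the pre-existing
wrapper-jar classpath handling that normally follows the early returns is
elided to a comment here.

    SPARK_DRIVER_CLASSPATH = 'edp.spark.driver.classpath'

    def spark_driver_classpath(configs):
        # Returning None means the cluster default classpath should be used
        return configs.get(SPARK_DRIVER_CLASSPATH)

    def check_driver_class_path(job_configs, param_dict):
        overridden = spark_driver_classpath(job_configs.get('configs', {}))
        if overridden:
            # an explicit per-job override always wins
            param_dict['driver-class-path'] = (
                " --driver-class-path " + overridden)
            return
        if not param_dict.get('wrapper_jar'):
            # swift is not used as a data source: emit no classpath at all
            param_dict['driver-class-path'] = ""
            return
        # the pre-existing wrapper-jar classpath handling (unchanged by the
        # patch) would run here

    # Case 1: the per-job override is applied verbatim
    params = {'deploy-mode': 'client', 'wrapper_jar': 'wrapper.jar',
              'driver-class-path': None}
    check_driver_class_path(
        {'configs': {SPARK_DRIVER_CLASSPATH: 'my-classpath.jar'}}, params)
    assert params['driver-class-path'] == (
        " --driver-class-path my-classpath.jar")

    # Case 2: no wrapper jar and no override, so the %(driver-class-path)s
    # substitution in the spark-submit command expands to an empty string
    params = {'deploy-mode': 'client', 'driver-class-path': None}
    check_driver_class_path({}, params)
    assert params['driver-class-path'] == ""

As the new unit test demonstrates, a user opts in by placing
edp.spark.driver.classpath in the job execution's configs dict, alongside
keys such as edp.java.main_class.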