Merge "Ensure working dir is on driver class path for Spark/Swift"
commit 99d7629e9e
@@ -86,8 +86,6 @@ class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
         self.plugin_params["master"] = "yarn-cluster"
         driver_cp = u.get_config_value_or_default(
             "Spark", "Executor extra classpath", self.cluster)
-        if driver_cp:
-            driver_cp = " --driver-class-path " + driver_cp
         self.plugin_params["driver-class-path"] = driver_cp
 
     @staticmethod
@@ -89,8 +89,6 @@ class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
         self.plugin_params["master"] = "yarn-cluster"
         driver_cp = u.get_config_value_or_default(
             "Spark", "Executor extra classpath", self.cluster)
-        if driver_cp:
-            driver_cp = " --driver-class-path " + driver_cp
         self.plugin_params["driver-class-path"] = driver_cp
 
     @staticmethod
@@ -43,8 +43,6 @@ class EdpEngine(edp_engine.SparkJobEngine):
         self.plugin_params["master"] = ('spark://%(host)s:' + port_str)
         driver_cp = plugin_utils.get_config_value_or_default(
             "Spark", "Executor extra classpath", self.cluster)
-        if driver_cp:
-            driver_cp = " --driver-class-path " + driver_cp
         self.plugin_params["driver-class-path"] = driver_cp
 
     @staticmethod
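Taken together, the three plugin hunks above apply the same change in three engines (two yarn-cluster EdpSparkEngine variants and the standalone-Spark EdpEngine): each now stores the raw "Executor extra classpath" config value, possibly empty, and defers flag handling to the shared SparkJobEngine. A minimal sketch of the consequence, using a hypothetical function name to stand in for the removed inline logic:

```python
# Old inline plugin logic (removed above), as a hypothetical function:
def old_plugin_value(driver_cp):
    if driver_cp:
        driver_cp = " --driver-class-path " + driver_cp
    return driver_cp

# With no "Executor extra classpath" configured, the old code emitted
# no flag at all, so the driver never saw the working directory:
assert old_plugin_value("") == ""

# The new code stores the raw value and relies on the engine-side
# _check_driver_class_path (added below) to emit the flag in all cases:
assert old_plugin_value("/usr/lib/hadoop/hadoop-swift.jar") == (
    " --driver-class-path /usr/lib/hadoop/hadoop-swift.jar")
```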
@@ -50,7 +50,8 @@ class SparkJobEngine(base_engine.JobEngine):
         self.plugin_params = {"master": "",
                               "spark-user": "",
                               "deploy-mode": "",
-                              "spark-submit": ""
+                              "spark-submit": "",
+                              "driver-class-path": "",
                               }
 
     def _get_pid_and_inst_id(self, job_id):
@@ -174,6 +175,12 @@ class SparkJobEngine(base_engine.JobEngine):
 
         return uploaded_paths, builtin_paths
 
+    def _check_driver_class_path(self, param_dict):
+        cp = param_dict['driver-class-path'] or ""
+        if not (cp.startswith(":") or cp.endswith(":")):
+            cp += ":"
+        param_dict['driver-class-path'] = " --driver-class-path " + cp
+
     def cancel_job(self, job_execution):
         pid, instance = self._get_instance_if_running(job_execution)
         if instance is not None:
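The new helper leans on standard JVM classpath semantics: an empty element in a ':'-separated classpath, produced by a leading or trailing ':', resolves to the current working directory, which is where the wrapper's generated hadoop configuration (wrapper_xml) lives. A self-contained sketch of the helper's effect on representative inputs:

```python
# Standalone copy of the helper from the hunk above, runnable outside
# the engine class for illustration.
def _check_driver_class_path(param_dict):
    cp = param_dict['driver-class-path'] or ""
    # A leading or trailing ":" leaves an empty classpath element,
    # which the JVM resolves to the current working directory.
    if not (cp.startswith(":") or cp.endswith(":")):
        cp += ":"
    param_dict['driver-class-path'] = " --driver-class-path " + cp

for value in ("/usr/lib/hadoop/hadoop-swift.jar",   # typical config value
              ":/usr/lib/hadoop/hadoop-swift.jar",  # empty entry present
              "",                                   # option not configured
              None):                                # key never populated
    params = {'driver-class-path': value}
    _check_driver_class_path(params)
    print(repr(params['driver-class-path']))
# ' --driver-class-path /usr/lib/hadoop/hadoop-swift.jar:'
# ' --driver-class-path :/usr/lib/hadoop/hadoop-swift.jar'
# ' --driver-class-path :'
# ' --driver-class-path :'
```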
@@ -281,6 +288,12 @@ class SparkJobEngine(base_engine.JobEngine):
 
         mutual_dict = self.plugin_params.copy()
         mutual_dict.update(indep_params)
+
+        # Handle driver classpath. Because of the way the hadoop
+        # configuration is handled in the wrapper class, using
+        # wrapper_xml, the working directory must be on the classpath
+        self._check_driver_class_path(mutual_dict)
+
         if mutual_dict.get("wrapper_jar"):
             # Substrings which may be empty have spaces
             # embedded if they are non-empty
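For context on the call site: the plugin_params values are %-substituted into the spark-submit command line, and optional substrings (like the one _check_driver_class_path builds) carry their own leading space so that empty values collapse cleanly. The template below is illustrative only; the real one in engine.py is not part of this diff:

```python
# Illustrative command assembly; the key names come from plugin_params
# above, but the template string itself is a guess for demonstration.
mutual_dict = {
    "spark-user": "",            # e.g. "sudo -u spark " when set
    "spark-submit": "spark-submit",
    "driver-class-path":
        " --driver-class-path /usr/lib/hadoop/hadoop-swift.jar:",
    "master": "yarn-cluster",
    "deploy-mode": "cluster",
}
cmd = ("%(spark-user)s%(spark-submit)s%(driver-class-path)s"
       " --master %(master)s --deploy-mode %(deploy-mode)s"
       " app.jar" % mutual_dict)
print(cmd)
# Prints (as one line):
# spark-submit --driver-class-path /usr/lib/hadoop/hadoop-swift.jar:
#   --master yarn-cluster --deploy-mode cluster app.jar
```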
@@ -42,7 +42,7 @@ class TestSpark(base.SaharaTestCase):
         self.spark_pid = "12345"
         self.spark_home = "/opt/spark"
         self.workflow_dir = "/wfdir"
-        self.driver_cp = "/usr/lib/hadoop/hadoop-swift.jar"
+        self.driver_cp = "/usr/lib/hadoop/hadoop-swift.jar:"
 
     def test_get_pid_and_inst_id(self):
         '''Test parsing of job ids
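The fixture change follows directly from the new normalization: expected command lines in the tests must now include the working-directory entry. A quick check of the relationship between the two values:

```python
# The old and new fixture values differ only by the trailing ":" that
# _check_driver_class_path appends when no empty entry is present.
old_fixture = "/usr/lib/hadoop/hadoop-swift.jar"
new_fixture = "/usr/lib/hadoop/hadoop-swift.jar:"
assert not (old_fixture.startswith(":") or old_fixture.endswith(":"))
assert new_fixture == old_fixture + ":"
```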