Ensure working dir is on driver class path for Spark/Swift
For Spark/Swift integration, we use a wrapper class to set up the Hadoop
environment. For this to succeed, the current working directory must be on
the classpath. Newer versions of Spark have changed how the default
classpath is generated, so Sahara must explicitly ensure that the working
dir will be included.

Change-Id: I6680bf8736cada93e87821ef37de3c3b4202ead4
Closes-Bug: #1486544
committed by Vitaly Gridnev
parent 1ad842f79d
commit 1018a540a5
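A note on the mechanism (illustration only, not part of the commit): the change relies on the usual Unix/Java classpath convention that an empty path element is treated as the current working directory. Guaranteeing that the --driver-class-path value carries a leading or trailing ":" therefore makes the working dir, where the wrapper's hadoop configuration is placed, visible to the driver. A minimal, hypothetical Python sketch of that convention:

# Illustration only: an empty classpath element behaves like "."
classpath = "/usr/lib/hadoop/hadoop-swift.jar:"  # trailing ":" leaves an empty element
elements = classpath.split(":")                  # ['/usr/lib/hadoop/hadoop-swift.jar', '']
resolved = [e if e else "." for e in elements]   # empty element -> current working dir
print(resolved)                                  # ['/usr/lib/hadoop/hadoop-swift.jar', '.']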
@@ -86,8 +86,6 @@ class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
         self.plugin_params["master"] = "yarn-cluster"
         driver_cp = u.get_config_value_or_default(
             "Spark", "Executor extra classpath", self.cluster)
-        if driver_cp:
-            driver_cp = " --driver-class-path " + driver_cp
         self.plugin_params["driver-class-path"] = driver_cp
 
     @staticmethod
@@ -89,8 +89,6 @@ class EdpSparkEngine(edp_spark_engine.SparkJobEngine):
         self.plugin_params["master"] = "yarn-cluster"
         driver_cp = u.get_config_value_or_default(
             "Spark", "Executor extra classpath", self.cluster)
-        if driver_cp:
-            driver_cp = " --driver-class-path " + driver_cp
         self.plugin_params["driver-class-path"] = driver_cp
 
     @staticmethod
@@ -43,8 +43,6 @@ class EdpEngine(edp_engine.SparkJobEngine):
         self.plugin_params["master"] = ('spark://%(host)s:' + port_str)
         driver_cp = plugin_utils.get_config_value_or_default(
             "Spark", "Executor extra classpath", self.cluster)
-        if driver_cp:
-            driver_cp = " --driver-class-path " + driver_cp
         self.plugin_params["driver-class-path"] = driver_cp
 
     @staticmethod
@@ -50,7 +50,8 @@ class SparkJobEngine(base_engine.JobEngine):
         self.plugin_params = {"master": "",
                               "spark-user": "",
                               "deploy-mode": "",
-                              "spark-submit": ""
+                              "spark-submit": "",
+                              "driver-class-path": "",
                               }
 
     def _get_pid_and_inst_id(self, job_id):
@@ -174,6 +175,12 @@ class SparkJobEngine(base_engine.JobEngine):
 
         return uploaded_paths, builtin_paths
 
+    def _check_driver_class_path(self, param_dict):
+        cp = param_dict['driver-class-path'] or ""
+        if not (cp.startswith(":") or cp.endswith(":")):
+            cp += ":"
+        param_dict['driver-class-path'] = " --driver-class-path " + cp
+
     def cancel_job(self, job_execution):
         pid, instance = self._get_instance_if_running(job_execution)
         if instance is not None:
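For illustration, a hypothetical standalone copy of the helper added above, showing what it produces for the plugin parameters (the dict key mirrors the one used in SparkJobEngine):

# Standalone sketch of _check_driver_class_path, for illustration only.
def _check_driver_class_path(param_dict):
    cp = param_dict['driver-class-path'] or ""
    if not (cp.startswith(":") or cp.endswith(":")):
        cp += ":"
    param_dict['driver-class-path'] = " --driver-class-path " + cp

params = {'driver-class-path': "/usr/lib/hadoop/hadoop-swift.jar"}
_check_driver_class_path(params)
print(params['driver-class-path'])
# " --driver-class-path /usr/lib/hadoop/hadoop-swift.jar:"

params = {'driver-class-path': ""}
_check_driver_class_path(params)
print(params['driver-class-path'])
# " --driver-class-path :"  (working dir still included via the empty element)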
@@ -279,6 +286,12 @@ class SparkJobEngine(base_engine.JobEngine):
         mutual_dict = self.plugin_params.copy()
         mutual_dict.update(indep_params)
 
+        # Handle driver classpath. Because of the way the hadoop
+        # configuration is handled in the wrapper class, using
+        # wrapper_xml, the working directory must be on the classpath
+        self._check_driver_class_path(mutual_dict)
+
         if mutual_dict.get("wrapper_jar"):
             # Substrings which may be empty have spaces
             # embedded if they are non-empty
@@ -42,7 +42,7 @@ class TestSpark(base.SaharaTestCase):
         self.spark_pid = "12345"
         self.spark_home = "/opt/spark"
         self.workflow_dir = "/wfdir"
-        self.driver_cp = "/usr/lib/hadoop/hadoop-swift.jar"
+        self.driver_cp = "/usr/lib/hadoop/hadoop-swift.jar:"
 
     def test_get_pid_and_inst_id(self):
         '''Test parsing of job ids