EDP Spark jobs work with Swift

1) Fixed the path to hadoop-swift.jar - in Cloudera
   it is named hadoop-openstack.jar
2) Fixed the options for the launch wrapper in yarn-cluster mode
   (more details at http://spark.apache.org/docs/latest/running-on-yarn.html
    in the 'Important notes' section).
3) Fixed the visibility of Swift credentials in the YARN cluster.
4) Fixed the related unit test, which contained the same error.

Change-Id: I5e8c72f0e362792f06245b3744a32342abc42389
Closes-bug: 1474128
This commit is contained in:
Oleg Borisenko 2015-07-28 14:23:12 +03:00
parent a5921527de
commit 1bc9ec4656
4 changed files with 16 additions and 10 deletions

View File

@ -161,7 +161,7 @@ AWAIT_MANAGER_STARTING_TIMEOUT = p.Config(
_default_executor_classpath = ":".join( _default_executor_classpath = ":".join(
['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar', ['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
'/usr/lib/hadoop/hadoop-swift.jar']) '/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])
EXECUTOR_EXTRA_CLASSPATH = p.Config( EXECUTOR_EXTRA_CLASSPATH = p.Config(
'Executor extra classpath', 'Spark', 'cluster', priority=2, 'Executor extra classpath', 'Spark', 'cluster', priority=2,

View File

@ -183,7 +183,7 @@ AWAIT_MANAGER_STARTING_TIMEOUT = p.Config(
_default_executor_classpath = ":".join( _default_executor_classpath = ":".join(
['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar', ['/usr/lib/hadoop/lib/jackson-core-asl-1.8.8.jar',
'/usr/lib/hadoop/hadoop-swift.jar']) '/usr/lib/hadoop-mapreduce/hadoop-openstack.jar'])
EXECUTOR_EXTRA_CLASSPATH = p.Config( EXECUTOR_EXTRA_CLASSPATH = p.Config(
'Executor extra classpath', 'Spark', 'cluster', priority=2, 'Executor extra classpath', 'Spark', 'cluster', priority=2,

View File

@ -241,8 +241,10 @@ class SparkJobEngine(base_engine.JobEngine):
indep_params["wrapper_args"] = "%s %s" % ( indep_params["wrapper_args"] = "%s %s" % (
wrapper_xml, indep_params["job_class"]) wrapper_xml, indep_params["job_class"])
indep_params["addnl_files"] = wrapper_xml
indep_params["addnl_jars"] = ",".join( indep_params["addnl_jars"] = ",".join(
[indep_params["app_jar"]] + paths + builtin_paths) [indep_params["wrapper_jar"]] + paths + builtin_paths)
else: else:
indep_params["addnl_jars"] = ",".join(paths) indep_params["addnl_jars"] = ",".join(paths)
@ -268,10 +270,11 @@ class SparkJobEngine(base_engine.JobEngine):
# embedded if they are non-empty # embedded if they are non-empty
cmd = ( cmd = (
'%(spark-user)s%(spark-submit)s%(driver-class-path)s' '%(spark-user)s%(spark-submit)s%(driver-class-path)s'
' --files %(addnl_files)s'
' --class %(wrapper_class)s%(addnl_jars)s' ' --class %(wrapper_class)s%(addnl_jars)s'
' --master %(master)s' ' --master %(master)s'
' --deploy-mode %(deploy-mode)s' ' --deploy-mode %(deploy-mode)s'
' %(wrapper_jar)s %(wrapper_args)s%(args)s') % dict( ' %(app_jar)s %(wrapper_args)s%(args)s') % dict(
mutual_dict) mutual_dict)
else: else:
cmd = ( cmd = (

View File

@ -556,11 +556,12 @@ class TestSpark(base.SaharaTestCase):
'cd %(workflow_dir)s; ' 'cd %(workflow_dir)s; '
'./launch_command %(spark_user)s%(spark_submit)s ' './launch_command %(spark_user)s%(spark_submit)s '
'--driver-class-path %(driver_cp)s ' '--driver-class-path %(driver_cp)s '
'--files spark.xml '
'--class org.openstack.sahara.edp.SparkWrapper ' '--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar,jar1.jar,jar2.jar ' '--jars wrapper.jar,jar1.jar,jar2.jar '
'--master %(master)s ' '--master %(master)s '
'--deploy-mode %(deploy_mode)s ' '--deploy-mode %(deploy_mode)s '
'wrapper.jar spark.xml org.me.myclass input_arg output_arg ' 'app.jar spark.xml org.me.myclass input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir, '> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_user": self.spark_user, "spark_user": self.spark_user,
"spark_submit": self.spark_submit, "spark_submit": self.spark_submit,
@ -593,11 +594,12 @@ class TestSpark(base.SaharaTestCase):
'cd %(workflow_dir)s; ' 'cd %(workflow_dir)s; '
'./launch_command %(spark_user)s%(spark_submit)s ' './launch_command %(spark_user)s%(spark_submit)s '
'--driver-class-path %(driver_cp)s ' '--driver-class-path %(driver_cp)s '
'--files spark.xml '
'--class org.openstack.sahara.edp.SparkWrapper ' '--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar ' '--jars wrapper.jar '
'--master %(master)s ' '--master %(master)s '
'--deploy-mode %(deploy_mode)s ' '--deploy-mode %(deploy_mode)s '
'wrapper.jar spark.xml org.me.myclass input_arg output_arg ' 'app.jar spark.xml org.me.myclass input_arg output_arg '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir, '> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_user": self.spark_user, "spark_user": self.spark_user,
"spark_submit": self.spark_submit, "spark_submit": self.spark_submit,
@ -629,11 +631,12 @@ class TestSpark(base.SaharaTestCase):
'cd %(workflow_dir)s; ' 'cd %(workflow_dir)s; '
'./launch_command %(spark_user)s%(spark_submit)s ' './launch_command %(spark_user)s%(spark_submit)s '
'--driver-class-path %(driver_cp)s ' '--driver-class-path %(driver_cp)s '
'--files spark.xml '
'--class org.openstack.sahara.edp.SparkWrapper ' '--class org.openstack.sahara.edp.SparkWrapper '
'--jars app.jar ' '--jars wrapper.jar '
'--master %(master)s ' '--master %(master)s '
'--deploy-mode %(deploy_mode)s ' '--deploy-mode %(deploy_mode)s '
'wrapper.jar spark.xml org.me.myclass ' 'app.jar spark.xml org.me.myclass '
'> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir, '> /dev/null 2>&1 & echo $!' % {"workflow_dir": self.workflow_dir,
"spark_user": self.spark_user, "spark_user": self.spark_user,
"spark_submit": self.spark_submit, "spark_submit": self.spark_submit,