Merge "Follow the argument order specified in spark-submit help"

commit a8f3363777
Author: Jenkins
Committed-by: Gerrit Code Review
Date: 2015-01-27 11:15:53 +00:00

2 changed files with 20 additions and 10 deletions

View File

@@ -159,15 +159,17 @@ class SparkJobEngine(base_engine.JobEngine):
         # TODO(tmckay): we need to clean up wf_dirs on long running clusters
         # TODO(tmckay): probably allow for general options to spark-submit
         args = " ".join(updated_job_configs.get('args', []))
+        if args:
+            args = " " + args

         # The redirects of stdout and stderr will preserve output in the wf_dir
-        cmd = "%s %s --class %s %s --master spark://%s:%s %s" % (
+        cmd = "%s --class %s%s --master spark://%s:%s %s%s" % (
             spark_submit,
-            app_jar,
             job_class,
             additional_jars,
             host,
             port,
+            app_jar,
             args)

         job_execution = conductor.job_execution_get(ctx, job_execution.id)
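
For reference, spark-submit's own help gives the ordering "spark-submit [options] <app jar | python file> [app arguments]"; before this change the application jar was passed ahead of the options. A minimal sketch of the patched assembly, with illustrative stand-in values rather than anything taken from a real deployment:

# Minimal sketch of the patched command assembly (illustrative values only;
# the real engine derives these from the job execution and the cluster).
spark_submit = "/opt/spark/bin/spark-submit"
job_class = "org.me.myclass"
additional_jars = " --jars jar1.jar,jar2.jar"  # "" when there are no support jars
host = "master"
port = 7077
app_jar = "app.jar"

args = " ".join(["input_arg", "output_arg"])
if args:
    # Guard added by this change: no trailing space when there are no args.
    args = " " + args

# Options first, then the application jar, then its arguments.
cmd = "%s --class %s%s --master spark://%s:%s %s%s" % (
    spark_submit, job_class, additional_jars, host, port, app_jar, args)

print(cmd)
# Output (wrapped here for width):
# /opt/spark/bin/spark-submit --class org.me.myclass --jars jar1.jar,jar2.jar
#     --master spark://master:7077 app.jar input_arg output_arg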

View File

@@ -356,9 +356,9 @@ class TestSpark(base.SaharaTestCase):

         # Check the command
         remote_instance.execute_command.assert_called_with(
-            'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit app.jar '
+            'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit '
             '--class org.me.myclass --jars jar1.jar,jar2.jar '
-            '--master spark://master:7077 input_arg output_arg '
+            '--master spark://master:7077 app.jar input_arg output_arg '
             '> /dev/null 2>&1 & echo $!')

         # Check result here
@@ -366,14 +366,22 @@ class TestSpark(base.SaharaTestCase):
             edp.JOB_STATUS_RUNNING,
             {"spark-path": "/wfdir"}))

-        # Run again without support jars. Note the extra space
-        # after 'myclass', this is from a %s with empty string
+        # Run again without arguments
+        job_exec.job_configs['args'] = []
+        status = eng.run_job(job_exec)
+        remote_instance.execute_command.assert_called_with(
+            'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit '
+            '--class org.me.myclass --jars jar1.jar,jar2.jar '
+            '--master spark://master:7077 app.jar '
+            '> /dev/null 2>&1 & echo $!')
+
+        # Run again without support jars.
         upload_job_files.return_value = ["/wfdir/app.jar"]
         status = eng.run_job(job_exec)
         remote_instance.execute_command.assert_called_with(
-            'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit app.jar '
+            'cd /wfdir; ./launch_command /opt/spark/bin/spark-submit '
             '--class org.me.myclass '
-            '--master spark://master:7077 input_arg output_arg '
+            '--master spark://master:7077 app.jar '
             '> /dev/null 2>&1 & echo $!')

         # run again with non-zero result, should raise EDPError
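
A side note on the test pattern above: unittest.mock's assert_called_with only inspects the most recent call, which is why each scenario re-runs the job and then re-asserts the expected command. A standalone sketch of that behavior (the names here are hypothetical, not taken from the Sahara test suite):

from unittest import mock

# A stand-in for the remote instance used in the tests above.
executor = mock.Mock()
executor.execute_command("spark-submit --class A app.jar first_arg")
executor.execute_command("spark-submit --class A app.jar")

# Passes: assert_called_with checks only the latest call, so each new
# job run must be followed by its own assertion.
executor.execute_command.assert_called_with("spark-submit --class A app.jar")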