Merge "Job execution cancel timeout"
This commit is contained in:
commit
86caac15e5
@ -127,7 +127,13 @@ def run_job(job_execution_id):
|
|||||||
LOG.warning(
|
LOG.warning(
|
||||||
_LW("Can't run job execution (reason: {reason})").format(
|
_LW("Can't run job execution (reason: {reason})").format(
|
||||||
reason=ex))
|
reason=ex))
|
||||||
|
|
||||||
|
job_execution = conductor.job_execution_get(
|
||||||
|
context.ctx(), job_execution_id)
|
||||||
|
|
||||||
|
if job_execution.engine_job_id is not None:
|
||||||
cancel_job(job_execution_id)
|
cancel_job(job_execution_id)
|
||||||
|
|
||||||
conductor.job_execution_update(
|
conductor.job_execution_update(
|
||||||
context.ctx(), job_execution_id,
|
context.ctx(), job_execution_id,
|
||||||
{'info': {'status': edp.JOB_STATUS_FAILED},
|
{'info': {'status': edp.JOB_STATUS_FAILED},
|
||||||
|
@ -184,6 +184,12 @@ class OozieJobEngine(base_engine.JobEngine):
|
|||||||
job_execution = conductor.job_execution_get(ctx, job_execution.id)
|
job_execution = conductor.job_execution_get(ctx, job_execution.id)
|
||||||
if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
|
if job_execution.info['status'] == edp.JOB_STATUS_TOBEKILLED:
|
||||||
return (None, edp.JOB_STATUS_KILLED, None)
|
return (None, edp.JOB_STATUS_KILLED, None)
|
||||||
|
|
||||||
|
conductor.job_execution_update(
|
||||||
|
context.ctx(), job_execution.id,
|
||||||
|
{'info': {'status': edp.JOB_STATUS_READYTORUN},
|
||||||
|
'engine_job_id': oozie_job_id})
|
||||||
|
|
||||||
client.run_job(job_execution, oozie_job_id)
|
client.run_job(job_execution, oozie_job_id)
|
||||||
try:
|
try:
|
||||||
status = client.get_job_status(job_execution,
|
status = client.get_job_status(job_execution,
|
||||||
|
@ -488,13 +488,40 @@ class TestJobManager(base.SaharaWithDbTestCase):
|
|||||||
self.assertEqual(['test'], su.inject_swift_url_suffix(['test']))
|
self.assertEqual(['test'], su.inject_swift_url_suffix(['test']))
|
||||||
|
|
||||||
@mock.patch('sahara.conductor.API.job_execution_update')
|
@mock.patch('sahara.conductor.API.job_execution_update')
|
||||||
|
@mock.patch('sahara.conductor.API.job_execution_get')
|
||||||
@mock.patch('sahara.service.edp.job_manager._run_job')
|
@mock.patch('sahara.service.edp.job_manager._run_job')
|
||||||
@mock.patch('sahara.service.edp.job_manager.cancel_job')
|
@mock.patch('sahara.service.edp.job_manager.cancel_job')
|
||||||
def test_run_job_handles_exceptions(self, canceljob, runjob, job_ex_upd):
|
def test_run_job_handles_exceptions(self, canceljob, runjob,
|
||||||
|
job_ex_get, job_ex_upd):
|
||||||
runjob.side_effect = ex.SwiftClientException("Unauthorised")
|
runjob.side_effect = ex.SwiftClientException("Unauthorised")
|
||||||
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
|
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
|
||||||
|
|
||||||
|
job_exec.engine_job_id = None
|
||||||
|
job_ex_get.return_value = job_exec
|
||||||
|
|
||||||
job_manager.run_job(job_exec.id)
|
job_manager.run_job(job_exec.id)
|
||||||
|
|
||||||
|
self.assertEqual(1, job_ex_get.call_count)
|
||||||
|
self.assertEqual(1, job_ex_upd.call_count)
|
||||||
|
|
||||||
|
new_status = job_ex_upd.call_args[0][2]["info"]["status"]
|
||||||
|
self.assertEqual(edp.JOB_STATUS_FAILED, new_status)
|
||||||
|
self.assertEqual(0, canceljob.call_count)
|
||||||
|
|
||||||
|
@mock.patch('sahara.conductor.API.job_execution_update')
|
||||||
|
@mock.patch('sahara.conductor.API.job_execution_get')
|
||||||
|
@mock.patch('sahara.service.edp.job_manager._run_job')
|
||||||
|
@mock.patch('sahara.service.edp.job_manager.cancel_job')
|
||||||
|
def test_run_job_handles_exceptions_with_run_job(self, canceljob, runjob,
|
||||||
|
job_ex_get, job_ex_upd):
|
||||||
|
runjob.side_effect = ex.OozieException("run_job failed")
|
||||||
|
job, job_exec = u.create_job_exec(edp.JOB_TYPE_PIG)
|
||||||
|
job_exec.engine_job_id = "fake_oozie_id"
|
||||||
|
job_ex_get.return_value = job_exec
|
||||||
|
|
||||||
|
job_manager.run_job(job_exec.id)
|
||||||
|
|
||||||
|
self.assertEqual(1, job_ex_get.call_count)
|
||||||
self.assertEqual(1, job_ex_upd.call_count)
|
self.assertEqual(1, job_ex_upd.call_count)
|
||||||
|
|
||||||
new_status = job_ex_upd.call_args[0][2]["info"]["status"]
|
new_status = job_ex_upd.call_args[0][2]["info"]["status"]
|
||||||
|
@ -24,6 +24,7 @@ JOB_STATUS_DONEWITHERROR = 'DONEWITHERROR'
|
|||||||
JOB_STATUS_FAILED = 'FAILED'
|
JOB_STATUS_FAILED = 'FAILED'
|
||||||
JOB_STATUS_KILLED = 'KILLED'
|
JOB_STATUS_KILLED = 'KILLED'
|
||||||
JOB_STATUS_PENDING = 'PENDING'
|
JOB_STATUS_PENDING = 'PENDING'
|
||||||
|
JOB_STATUS_READYTORUN = 'READYTORUN'
|
||||||
JOB_STATUS_RUNNING = 'RUNNING'
|
JOB_STATUS_RUNNING = 'RUNNING'
|
||||||
JOB_STATUS_SUCCEEDED = 'SUCCEEDED'
|
JOB_STATUS_SUCCEEDED = 'SUCCEEDED'
|
||||||
JOB_STATUS_TOBEKILLED = 'TOBEKILLED'
|
JOB_STATUS_TOBEKILLED = 'TOBEKILLED'
|
||||||
|
Loading…
Reference in New Issue
Block a user