Fix the job task failure

Change-Id: Ief9eec6833c0379f9c04fdd4c5440d9984247f79
Closes-Bug: #1726594
This commit is contained in:
Lingxian Kong 2017-10-24 09:57:51 +13:00
parent 3c8746f395
commit 7154c9bd06
3 changed files with 16 additions and 14 deletions

View File

@ -66,8 +66,8 @@ def conditional_update(model, values, expected_values, **kwargs):
return IMPL.conditional_update(model, values, expected_values, **kwargs) return IMPL.conditional_update(model, values, expected_values, **kwargs)
def get_function(id): def get_function(id, insecure=False):
return IMPL.get_function(id) return IMPL.get_function(id, insecure=insecure)
def get_functions(limit=None, marker=None, sort_keys=None, def get_functions(limit=None, marker=None, sort_keys=None,

View File

@ -216,8 +216,8 @@ def conditional_update(model, values, expected_values, insecure=False,
@db_base.session_aware() @db_base.session_aware()
def get_function(id, session=None): def get_function(id, insecure=False, session=None):
function = _get_db_object_by_id(models.Function, id) function = _get_db_object_by_id(models.Function, id, insecure=insecure)
if not function: if not function:
raise exc.DBEntityNotFoundError("Function not found [id=%s]" % id) raise exc.DBEntityNotFoundError("Function not found [id=%s]" % id)

View File

@ -68,10 +68,13 @@ def handle_function_service_expiration(ctx, engine_client, orchestrator):
def handle_job(engine_client): def handle_job(engine_client):
"""Execute job task with no db transactions."""
for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)): for job in db_api.get_next_jobs(timeutils.utcnow() + timedelta(seconds=3)):
LOG.debug("Processing job: %s, function: %s", job.id, job.function_id) job_id = job.id
func_id = job.function_id
LOG.debug("Processing job: %s, function: %s", job_id, func_id)
func_db = db_api.get_function(job.function_id) func_db = db_api.get_function(func_id, insecure=True)
trust_id = func_db.trust_id trust_id = func_db.trust_id
try: try:
@ -94,7 +97,7 @@ def handle_job(engine_client):
'count': 0 'count': 0
}, },
{ {
'id': job.id, 'id': job_id,
'status': status.RUNNING 'status': status.RUNNING
}, },
insecure=True, insecure=True,
@ -112,7 +115,7 @@ def handle_job(engine_client):
'count': job.count 'count': job.count
}, },
{ {
'id': job.id, 'id': job_id,
'next_execution_time': job.next_execution_time 'next_execution_time': job.next_execution_time
}, },
insecure=True, insecure=True,
@ -121,24 +124,23 @@ def handle_job(engine_client):
if not modified: if not modified:
LOG.warning( LOG.warning(
'Job %s has been already handled by another periodic ' 'Job %s has been already handled by another periodic '
'task.', job.id 'task.', job_id
) )
continue continue
LOG.debug( LOG.debug(
"Starting to execute function %s by job %s", "Starting to execute function %s by job %s", func_id, job_id
job.function_id, job.id
) )
params = { params = {
'function_id': job.function_id, 'function_id': func_id,
'input': job.function_input, 'input': job.function_input,
'sync': False, 'sync': False,
'description': constants.EXECUTION_BY_JOB % job.id 'description': constants.EXECUTION_BY_JOB % job_id
} }
executions.create_execution(engine_client, params) executions.create_execution(engine_client, params)
except Exception: except Exception:
LOG.exception("Failed to process job %s", job.id) LOG.exception("Failed to process job %s", job_id)
finally: finally:
context.set_ctx(None) context.set_ctx(None)