Merge "Delete the job from backend if it cannot be consumed"
This commit is contained in:
commit
b46a6c8b6d
@ -209,7 +209,7 @@ class ExecutorConductor(base.Conductor, metaclass=abc.ABCMeta):
|
|||||||
self._log.info("Job completed successfully: %s", job)
|
self._log.info("Job completed successfully: %s", job)
|
||||||
return consume
|
return consume
|
||||||
|
|
||||||
def _try_finish_job(self, job, consume):
|
def _try_finish_job(self, job, consume, trash=False):
|
||||||
try:
|
try:
|
||||||
if consume:
|
if consume:
|
||||||
self._jobboard.consume(job, self._name)
|
self._jobboard.consume(job, self._name)
|
||||||
@ -218,6 +218,13 @@ class ExecutorConductor(base.Conductor, metaclass=abc.ABCMeta):
|
|||||||
'conductor': self,
|
'conductor': self,
|
||||||
'persistence': self._persistence,
|
'persistence': self._persistence,
|
||||||
})
|
})
|
||||||
|
elif trash:
|
||||||
|
self._jobboard.trash(job, self._name)
|
||||||
|
self._notifier.notify("job_trashed", {
|
||||||
|
'job': job,
|
||||||
|
'conductor': self,
|
||||||
|
'persistence': self._persistence,
|
||||||
|
})
|
||||||
else:
|
else:
|
||||||
self._jobboard.abandon(job, self._name)
|
self._jobboard.abandon(job, self._name)
|
||||||
self._notifier.notify("job_abandoned", {
|
self._notifier.notify("job_abandoned", {
|
||||||
@ -235,6 +242,7 @@ class ExecutorConductor(base.Conductor, metaclass=abc.ABCMeta):
|
|||||||
|
|
||||||
def _on_job_done(self, job, fut):
|
def _on_job_done(self, job, fut):
|
||||||
consume = False
|
consume = False
|
||||||
|
trash = False
|
||||||
try:
|
try:
|
||||||
consume = fut.result()
|
consume = fut.result()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
@ -242,8 +250,9 @@ class ExecutorConductor(base.Conductor, metaclass=abc.ABCMeta):
|
|||||||
self._log.warn("Job dispatching interrupted: %s", job)
|
self._log.warn("Job dispatching interrupted: %s", job)
|
||||||
except Exception:
|
except Exception:
|
||||||
self._log.warn("Job dispatching failed: %s", job, exc_info=True)
|
self._log.warn("Job dispatching failed: %s", job, exc_info=True)
|
||||||
|
trash = True
|
||||||
try:
|
try:
|
||||||
self._try_finish_job(job, consume)
|
self._try_finish_job(job, consume, trash)
|
||||||
finally:
|
finally:
|
||||||
self._dispatched.discard(fut)
|
self._dispatched.discard(fut)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user