From a45f8249c966a7241556a465f8c07c7fbb210277 Mon Sep 17 00:00:00 2001 From: Gregory Thiemonge Date: Fri, 5 Nov 2021 09:29:48 +0100 Subject: [PATCH] Delete the job from backend if it cannot be consumed In case of invalid/missing data in the job, the job entry should be purged from the backend to avoid re-scheduling it later. Closes-Bug: #1949950 Change-Id: I09ab84883cd4dfbab18b56c61e585fd8ac1b6acf --- taskflow/conductors/backends/impl_executor.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/taskflow/conductors/backends/impl_executor.py b/taskflow/conductors/backends/impl_executor.py index ab55821b0..bd7be7e79 100644 --- a/taskflow/conductors/backends/impl_executor.py +++ b/taskflow/conductors/backends/impl_executor.py @@ -211,7 +211,7 @@ class ExecutorConductor(base.Conductor): self._log.info("Job completed successfully: %s", job) return consume - def _try_finish_job(self, job, consume): + def _try_finish_job(self, job, consume, trash=False): try: if consume: self._jobboard.consume(job, self._name) @@ -220,6 +220,13 @@ class ExecutorConductor(base.Conductor): 'conductor': self, 'persistence': self._persistence, }) + elif trash: + self._jobboard.trash(job, self._name) + self._notifier.notify("job_trashed", { + 'job': job, + 'conductor': self, + 'persistence': self._persistence, + }) else: self._jobboard.abandon(job, self._name) self._notifier.notify("job_abandoned", { @@ -237,6 +244,7 @@ class ExecutorConductor(base.Conductor): def _on_job_done(self, job, fut): consume = False + trash = False try: consume = fut.result() except KeyboardInterrupt: @@ -244,8 +252,9 @@ class ExecutorConductor(base.Conductor): self._log.warn("Job dispatching interrupted: %s", job) except Exception: self._log.warn("Job dispatching failed: %s", job, exc_info=True) + trash = True try: - self._try_finish_job(job, consume) + self._try_finish_job(job, consume, trash) finally: self._dispatched.discard(fut)