diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index e7c9887a..c881346e 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -108,9 +108,10 @@ class Conductor(object): """Dispatches a claimed job for work completion. Accepts a single (already claimed) job and causes it to be run in - an engine. Returns a boolean that signifies whether the job should - be consumed. The job is consumed upon completion (unless False is - returned which will signify the job should be abandoned instead). + an engine. Returns a future object that represented the work to be + completed sometime in the future. The future should return a single + boolean from its result() method. This boolean determines whether the + job will be consumed (true) or whether it should be abandoned (false). :param job: A job instance that has already been claimed by the jobboard. diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index 5e78e348..23994e79 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -21,6 +21,7 @@ from taskflow.conductors import base from taskflow import exceptions as excp from taskflow.listeners import logging as logging_listener from taskflow.types import timing as tt +from taskflow.utils import async_utils from taskflow.utils import lock_utils LOG = logging.getLogger(__name__) @@ -116,7 +117,7 @@ class SingleThreadedConductor(base.Conductor): job, exc_info=True) else: LOG.info("Job completed successfully: %s", job) - return consume + return async_utils.make_completed_future(consume) def run(self): self._dead.clear() @@ -136,12 +137,13 @@ class SingleThreadedConductor(base.Conductor): continue consume = False try: - consume = self._dispatch_job(job) + f = self._dispatch_job(job) except Exception: LOG.warn("Job dispatching failed: %s", job, exc_info=True) else: dispatched += 1 + consume = f.result() try: if consume: self._jobboard.consume(job, self._name)