Have the dispatch_job function return a future

To make it easier to add in a multi-threaded conductor
convert the base dispatch_job function to return a future
object. This future object will contain a single result,
whether the job should be consumed or abandoned. In the
single threaded conductor its dispatch_job function will
return a future, after completing the job (in a multi
threaded conductor it would not return a future after
doing the work).

Change-Id: I077334820d36c64e272e93d158e3a0cd0d66a937
This commit is contained in:
Joshua Harlow
2014-06-30 20:26:26 -07:00
parent 73125ee0fd
commit 296e660cd3
2 changed files with 8 additions and 5 deletions

View File

@@ -108,9 +108,10 @@ class Conductor(object):
"""Dispatches a claimed job for work completion.
Accepts a single (already claimed) job and causes it to be run in
an engine. Returns a boolean that signifies whether the job should
be consumed. The job is consumed upon completion (unless False is
returned which will signify the job should be abandoned instead).
an engine. Returns a future object that represented the work to be
completed sometime in the future. The future should return a single
boolean from its result() method. This boolean determines whether the
job will be consumed (true) or whether it should be abandoned (false).
:param job: A job instance that has already been claimed by the
jobboard.

View File

@@ -21,6 +21,7 @@ from taskflow.conductors import base
from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener
from taskflow.types import timing as tt
from taskflow.utils import async_utils
from taskflow.utils import lock_utils
LOG = logging.getLogger(__name__)
@@ -116,7 +117,7 @@ class SingleThreadedConductor(base.Conductor):
job, exc_info=True)
else:
LOG.info("Job completed successfully: %s", job)
return consume
return async_utils.make_completed_future(consume)
def run(self):
self._dead.clear()
@@ -136,12 +137,13 @@ class SingleThreadedConductor(base.Conductor):
continue
consume = False
try:
consume = self._dispatch_job(job)
f = self._dispatch_job(job)
except Exception:
LOG.warn("Job dispatching failed: %s", job,
exc_info=True)
else:
dispatched += 1
consume = f.result()
try:
if consume:
self._jobboard.consume(job, self._name)