Merge "Have the dispatch_job function return a future"

This commit is contained in:
Jenkins
2014-10-18 20:23:17 +00:00
committed by Gerrit Code Review
2 changed files with 8 additions and 5 deletions

View File

@@ -108,9 +108,10 @@ class Conductor(object):
"""Dispatches a claimed job for work completion. """Dispatches a claimed job for work completion.
Accepts a single (already claimed) job and causes it to be run in Accepts a single (already claimed) job and causes it to be run in
an engine. Returns a boolean that signifies whether the job should an engine. Returns a future object that represented the work to be
be consumed. The job is consumed upon completion (unless False is completed sometime in the future. The future should return a single
returned which will signify the job should be abandoned instead). boolean from its result() method. This boolean determines whether the
job will be consumed (true) or whether it should be abandoned (false).
:param job: A job instance that has already been claimed by the :param job: A job instance that has already been claimed by the
jobboard. jobboard.

View File

@@ -21,6 +21,7 @@ from taskflow.conductors import base
from taskflow import exceptions as excp from taskflow import exceptions as excp
from taskflow.listeners import logging as logging_listener from taskflow.listeners import logging as logging_listener
from taskflow.types import timing as tt from taskflow.types import timing as tt
from taskflow.utils import async_utils
from taskflow.utils import lock_utils from taskflow.utils import lock_utils
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@@ -116,7 +117,7 @@ class SingleThreadedConductor(base.Conductor):
job, exc_info=True) job, exc_info=True)
else: else:
LOG.info("Job completed successfully: %s", job) LOG.info("Job completed successfully: %s", job)
return consume return async_utils.make_completed_future(consume)
def run(self): def run(self):
self._dead.clear() self._dead.clear()
@@ -136,12 +137,13 @@ class SingleThreadedConductor(base.Conductor):
continue continue
consume = False consume = False
try: try:
consume = self._dispatch_job(job) f = self._dispatch_job(job)
except Exception: except Exception:
LOG.warn("Job dispatching failed: %s", job, LOG.warn("Job dispatching failed: %s", job,
exc_info=True) exc_info=True)
else: else:
dispatched += 1 dispatched += 1
consume = f.result()
try: try:
if consume: if consume:
self._jobboard.consume(job, self._name) self._jobboard.consume(job, self._name)