Allow for two ways to find a flow detail in a job for a conductor

Previously we had the code looking at the first logbook entry and
running with that. That doesn't work so well especially since the
logbook may be unordered. So we then switched to require a job
to provide a 'flow_uuid' key to determine which one to run. This
makes sense and avoids the problem of being unable to determine
which one to run but makes it harder to use for those that have just
logbooks with single entries (likely the common case).

So add in a slightly more advanced finding logic that will check for
existence of 'flow_uuid' and if found use it, otherwise if not found
then check if the logbook is only a single item and if so use that
instead (and otherwise abort).

Change-Id: Id1e11e8b4e48af3389e5c4e0818777ff9abf9463
This commit is contained in:
Joshua Harlow
2014-05-30 13:44:44 -07:00
parent 86e651b9ce
commit f4b7dfd25b
2 changed files with 48 additions and 19 deletions

View File

@@ -39,28 +39,53 @@ class Conductor(object):
self._persistence = persistence
self._lock = threading.RLock()
def _engine_from_job(self, job):
try:
def _flow_detail_from_job(self, job):
"""Extracts a flow detail from a job (via some manner).
The current mechanism to accomplish this is the following choices:
* If the job details provide a 'flow_uuid' key attempt to load this
key from the jobs book and use that as the flow_detail to run.
* If the job details does not have have a 'flow_uuid' key then attempt
to examine the size of the book and if it's only one element in the
book (aka one flow_detail) then just use that.
* Otherwise if there is no 'flow_uuid' defined or there are > 1
flow_details in the book raise an error that corresponds to being
unable to locate the correct flow_detail to run.
"""
book = job.book
if book is None:
raise excp.NotFound("No book found in job")
if job.details and 'flow_uuid' in job.details:
flow_uuid = job.details["flow_uuid"]
except (KeyError, TypeError):
raise excp.NotFound("No flow detail uuid found in job")
else:
try:
flow_detail = job.book.find(flow_uuid)
except (TypeError, AttributeError):
flow_detail = None
flow_detail = book.find(flow_uuid)
if flow_detail is None:
raise excp.NotFound("No matching flow detail found in"
" job for flow detail uuid %s" % flow_uuid)
try:
store = dict(job.details["store"])
except (KeyError, TypeError):
store = {}
return taskflow.engines.load_from_detail(
flow_detail,
store=store,
engine_conf=dict(self._engine_conf),
backend=self._persistence)
" jobs book for flow detail"
" with uuid %s" % flow_uuid)
else:
choices = len(book)
if choices == 1:
flow_detail = list(book)[0]
elif choices == 0:
raise excp.NotFound("No flow detail(s) found in jobs book")
else:
raise excp.MultipleChoices("No matching flow detail found (%s"
" choices) in jobs book" % choices)
return flow_detail
def _engine_from_job(self, job):
"""Extracts an engine from a job (via some manner)."""
flow_detail = self._flow_detail_from_job(job)
if job.details and 'store' in job.details:
store = dict(job.details["store"])
else:
store = {}
engine_conf = dict(self._engine_conf)
return taskflow.engines.load_from_detail(flow_detail,
store=store,
engine_conf=engine_conf,
backend=self._persistence)
@lock_utils.locked
def connect(self):

View File

@@ -127,6 +127,10 @@ class Empty(TaskFlowException):
"""Raised when some object is empty when it shouldn't be."""
class MultipleChoices(TaskFlowException):
"""Raised when some decision can't be made due to many possible choices."""
# Others.
class WrappedFailure(Exception):