Merge "Allow for two ways to find a flow detail in a job for a conductor"
This commit is contained in:
@@ -39,28 +39,53 @@ class Conductor(object):
|
||||
self._persistence = persistence
|
||||
self._lock = threading.RLock()
|
||||
|
||||
def _engine_from_job(self, job):
|
||||
try:
|
||||
def _flow_detail_from_job(self, job):
|
||||
"""Extracts a flow detail from a job (via some manner).
|
||||
|
||||
The current mechanism to accomplish this is the following choices:
|
||||
|
||||
* If the job details provide a 'flow_uuid' key attempt to load this
|
||||
key from the jobs book and use that as the flow_detail to run.
|
||||
* If the job details does not have have a 'flow_uuid' key then attempt
|
||||
to examine the size of the book and if it's only one element in the
|
||||
book (aka one flow_detail) then just use that.
|
||||
* Otherwise if there is no 'flow_uuid' defined or there are > 1
|
||||
flow_details in the book raise an error that corresponds to being
|
||||
unable to locate the correct flow_detail to run.
|
||||
"""
|
||||
book = job.book
|
||||
if book is None:
|
||||
raise excp.NotFound("No book found in job")
|
||||
if job.details and 'flow_uuid' in job.details:
|
||||
flow_uuid = job.details["flow_uuid"]
|
||||
except (KeyError, TypeError):
|
||||
raise excp.NotFound("No flow detail uuid found in job")
|
||||
else:
|
||||
try:
|
||||
flow_detail = job.book.find(flow_uuid)
|
||||
except (TypeError, AttributeError):
|
||||
flow_detail = None
|
||||
flow_detail = book.find(flow_uuid)
|
||||
if flow_detail is None:
|
||||
raise excp.NotFound("No matching flow detail found in"
|
||||
" job for flow detail uuid %s" % flow_uuid)
|
||||
try:
|
||||
store = dict(job.details["store"])
|
||||
except (KeyError, TypeError):
|
||||
store = {}
|
||||
return taskflow.engines.load_from_detail(
|
||||
flow_detail,
|
||||
store=store,
|
||||
engine_conf=dict(self._engine_conf),
|
||||
backend=self._persistence)
|
||||
" jobs book for flow detail"
|
||||
" with uuid %s" % flow_uuid)
|
||||
else:
|
||||
choices = len(book)
|
||||
if choices == 1:
|
||||
flow_detail = list(book)[0]
|
||||
elif choices == 0:
|
||||
raise excp.NotFound("No flow detail(s) found in jobs book")
|
||||
else:
|
||||
raise excp.MultipleChoices("No matching flow detail found (%s"
|
||||
" choices) in jobs book" % choices)
|
||||
return flow_detail
|
||||
|
||||
def _engine_from_job(self, job):
|
||||
"""Extracts an engine from a job (via some manner)."""
|
||||
flow_detail = self._flow_detail_from_job(job)
|
||||
if job.details and 'store' in job.details:
|
||||
store = dict(job.details["store"])
|
||||
else:
|
||||
store = {}
|
||||
engine_conf = dict(self._engine_conf)
|
||||
return taskflow.engines.load_from_detail(flow_detail,
|
||||
store=store,
|
||||
engine_conf=engine_conf,
|
||||
backend=self._persistence)
|
||||
|
||||
@lock_utils.locked
|
||||
def connect(self):
|
||||
|
||||
@@ -127,6 +127,10 @@ class Empty(TaskFlowException):
|
||||
"""Raised when some object is empty when it shouldn't be."""
|
||||
|
||||
|
||||
class MultipleChoices(TaskFlowException):
|
||||
"""Raised when some decision can't be made due to many possible choices."""
|
||||
|
||||
|
||||
# Others.
|
||||
|
||||
class WrappedFailure(Exception):
|
||||
|
||||
Reference in New Issue
Block a user