diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 634c5dec..0595e72a 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -39,28 +39,53 @@ class Conductor(object): self._persistence = persistence self._lock = threading.RLock() - def _engine_from_job(self, job): - try: + def _flow_detail_from_job(self, job): + """Extracts a flow detail from a job (via some manner). + + The current mechanism to accomplish this is the following choices: + + * If the job details provide a 'flow_uuid' key attempt to load this + key from the jobs book and use that as the flow_detail to run. + * If the job details does not have have a 'flow_uuid' key then attempt + to examine the size of the book and if it's only one element in the + book (aka one flow_detail) then just use that. + * Otherwise if there is no 'flow_uuid' defined or there are > 1 + flow_details in the book raise an error that corresponds to being + unable to locate the correct flow_detail to run. + """ + book = job.book + if book is None: + raise excp.NotFound("No book found in job") + if job.details and 'flow_uuid' in job.details: flow_uuid = job.details["flow_uuid"] - except (KeyError, TypeError): - raise excp.NotFound("No flow detail uuid found in job") - else: - try: - flow_detail = job.book.find(flow_uuid) - except (TypeError, AttributeError): - flow_detail = None + flow_detail = book.find(flow_uuid) if flow_detail is None: raise excp.NotFound("No matching flow detail found in" - " job for flow detail uuid %s" % flow_uuid) - try: - store = dict(job.details["store"]) - except (KeyError, TypeError): - store = {} - return taskflow.engines.load_from_detail( - flow_detail, - store=store, - engine_conf=dict(self._engine_conf), - backend=self._persistence) + " jobs book for flow detail" + " with uuid %s" % flow_uuid) + else: + choices = len(book) + if choices == 1: + flow_detail = list(book)[0] + elif choices == 0: + raise excp.NotFound("No flow detail(s) found in jobs book") + else: + raise excp.MultipleChoices("No matching flow detail found (%s" + " choices) in jobs book" % choices) + return flow_detail + + def _engine_from_job(self, job): + """Extracts an engine from a job (via some manner).""" + flow_detail = self._flow_detail_from_job(job) + if job.details and 'store' in job.details: + store = dict(job.details["store"]) + else: + store = {} + engine_conf = dict(self._engine_conf) + return taskflow.engines.load_from_detail(flow_detail, + store=store, + engine_conf=engine_conf, + backend=self._persistence) @lock_utils.locked def connect(self): diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 983eb55a..78186ef5 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -127,6 +127,10 @@ class Empty(TaskFlowException): """Raised when some object is empty when it shouldn't be.""" +class MultipleChoices(TaskFlowException): + """Raised when some decision can't be made due to many possible choices.""" + + # Others. class WrappedFailure(Exception):