From afea4f0732e68f5cbb38f5a8ac194698aec8e520 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 22 May 2013 14:51:03 -0700 Subject: [PATCH] Allow any of the previous tasks to satisfy requirements. Instead of just checking the previous task, allow any of the previous tasks to provide the requirement for a new task. --- taskflow/patterns/linear_flow.py | 46 +++++++++++++++++--------------- 1 file changed, 25 insertions(+), 21 deletions(-) diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 643ccced..89cb9301 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -33,7 +33,7 @@ def _convert_to_set(items): class Flow(ordered_flow.Flow): """A linear chain of tasks that can be applied as one unit or rolled back as one unit. Each task in the chain may have requirements - which are satisfied by the previous task in the chain.""" + which are satisfied by the previous task/s in the chain.""" def __init__(self, name, tolerant=False, parents=None): super(Flow, self).__init__(name, tolerant, parents) @@ -41,31 +41,35 @@ class Flow(ordered_flow.Flow): def _fetch_task_inputs(self, task): inputs = {} - if self.results: - (_last_task, last_results) = self.results[-1] - for k in task.requires(): - if last_results and k in last_results: - inputs[k] = last_results[k] + for r in _convert_to_set(task.requires()): + # Find the last task that provided this. + for (last_task, last_results) in reversed(self.results): + if r not in _convert_to_set(last_task.provides()): + continue + if last_results and r in last_results: + inputs[r] = last_results[r] + else: + inputs[r] = None + # Some task said they had it, get the next requirement. + break return inputs def _validate_provides(self, task): - requires = _convert_to_set(task.requires()) - last_provides = set() - last_provider = None - if self._tasks: - last_provider = self._tasks[-1] - last_provides = _convert_to_set(last_provider.provides()) + # Ensure that some previous task provides this input. + missing_requires = [] + for r in _convert_to_set(task.requires()): + found_provider = False + for prev_task in reversed(self._tasks): + if r in _convert_to_set(prev_task.provides()): + found_provider = True + break + if not found_provider: + missing_requires.append(r) # Ensure that the last task provides all the needed input for this # task to run correctly. - req_diff = requires.difference(last_provides) - if req_diff: - if last_provider is None: - msg = ("There is no previous task providing the outputs %s" - " for %s to correctly execute.") % (req_diff, task) - else: - msg = ("%s does not provide the needed outputs %s for %s to" - " correctly execute.") - msg = msg % (last_provider, req_diff, task) + if len(missing_requires): + msg = ("There is no previous task providing the outputs %s" + " for %s to correctly execute.") % (missing_requires, task) raise exc.InvalidStateException(msg) def add(self, task):