Allow any of the previous tasks to satisfy requirements.
Instead of just checking the previous task, allow any of the previous tasks to provide the requirement for a new task.
This commit is contained in:
@@ -33,7 +33,7 @@ def _convert_to_set(items):
|
||||
class Flow(ordered_flow.Flow):
|
||||
"""A linear chain of tasks that can be applied as one unit or
|
||||
rolled back as one unit. Each task in the chain may have requirements
|
||||
which are satisfied by the previous task in the chain."""
|
||||
which are satisfied by the previous task/s in the chain."""
|
||||
|
||||
def __init__(self, name, tolerant=False, parents=None):
|
||||
super(Flow, self).__init__(name, tolerant, parents)
|
||||
@@ -41,31 +41,35 @@ class Flow(ordered_flow.Flow):
|
||||
|
||||
def _fetch_task_inputs(self, task):
|
||||
inputs = {}
|
||||
if self.results:
|
||||
(_last_task, last_results) = self.results[-1]
|
||||
for k in task.requires():
|
||||
if last_results and k in last_results:
|
||||
inputs[k] = last_results[k]
|
||||
for r in _convert_to_set(task.requires()):
|
||||
# Find the last task that provided this.
|
||||
for (last_task, last_results) in reversed(self.results):
|
||||
if r not in _convert_to_set(last_task.provides()):
|
||||
continue
|
||||
if last_results and r in last_results:
|
||||
inputs[r] = last_results[r]
|
||||
else:
|
||||
inputs[r] = None
|
||||
# Some task said they had it, get the next requirement.
|
||||
break
|
||||
return inputs
|
||||
|
||||
def _validate_provides(self, task):
|
||||
requires = _convert_to_set(task.requires())
|
||||
last_provides = set()
|
||||
last_provider = None
|
||||
if self._tasks:
|
||||
last_provider = self._tasks[-1]
|
||||
last_provides = _convert_to_set(last_provider.provides())
|
||||
# Ensure that some previous task provides this input.
|
||||
missing_requires = []
|
||||
for r in _convert_to_set(task.requires()):
|
||||
found_provider = False
|
||||
for prev_task in reversed(self._tasks):
|
||||
if r in _convert_to_set(prev_task.provides()):
|
||||
found_provider = True
|
||||
break
|
||||
if not found_provider:
|
||||
missing_requires.append(r)
|
||||
# Ensure that the last task provides all the needed input for this
|
||||
# task to run correctly.
|
||||
req_diff = requires.difference(last_provides)
|
||||
if req_diff:
|
||||
if last_provider is None:
|
||||
msg = ("There is no previous task providing the outputs %s"
|
||||
" for %s to correctly execute.") % (req_diff, task)
|
||||
else:
|
||||
msg = ("%s does not provide the needed outputs %s for %s to"
|
||||
" correctly execute.")
|
||||
msg = msg % (last_provider, req_diff, task)
|
||||
if len(missing_requires):
|
||||
msg = ("There is no previous task providing the outputs %s"
|
||||
" for %s to correctly execute.") % (missing_requires, task)
|
||||
raise exc.InvalidStateException(msg)
|
||||
|
||||
def add(self, task):
|
||||
|
||||
Reference in New Issue
Block a user