diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 00ce34aa..fd045015 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -54,8 +54,9 @@ class Flow(base.Flow): # the contract we have with tasks that they will be given the value # they returned if reversion is triggered. self.result_fetcher = None - # Tasks results are stored here... - self.results = [] + # Tasks results are stored here. Lookup is by the uuid that was + # returned from the add function. + self.results = {} # The last index in the order we left off at before being # interrupted (or failing). self._left_off_at = 0 @@ -173,7 +174,7 @@ class Flow(base.Flow): runner.result = result # Alter the index we have ran at. self._left_off_at += 1 - self.results.append((runner.task, copy.deepcopy(result))) + self.results[runner.uuid] = copy.deepcopy(result) self._on_task_finish(context, runner.task, result) except Exception as e: cause = utils.FlowFailure(runner.task, self, e) @@ -225,7 +226,7 @@ class Flow(base.Flow): @decorators.locked def reset(self): super(Flow, self).reset() - self.results = [] + self.results = {} self.result_fetcher = None self._accumulator.reset() self._left_off_at = 0 diff --git a/taskflow/tests/unit/test_linear_flow.py b/taskflow/tests/unit/test_linear_flow.py index 6d5f36c9..c2cccb71 100644 --- a/taskflow/tests/unit/test_linear_flow.py +++ b/taskflow/tests/unit/test_linear_flow.py @@ -57,6 +57,19 @@ class LinearFlowTest(unittest2.TestCase): return do_interrupt + def test_result_access(self): + wf = lw.Flow("the-test-action") + + @decorators.task + def do_apply1(context): + return [1, 2] + + result_id = wf.add(do_apply1) + ctx = {} + wf.run(ctx) + self.assertTrue(result_id in wf.results) + self.assertEquals([1, 2], wf.results[result_id]) + def test_functor_flow(self): wf = lw.Flow("the-test-action")