diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 2883c487..824545ab 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -49,8 +49,7 @@ class Flow(linear_flow.Flow): assert isinstance(task, collections.Callable) r = utils.Runner(task) self._graph.add_node(r, uuid=r.uuid) - self._runners = [] - self._leftoff_at = None + self._reset_internals() return r.uuid def _add_dependency(self, provider, requirer): @@ -64,6 +63,10 @@ class Flow(linear_flow.Flow): lines.append("%s" % (self.state)) return "; ".join(lines) + def _reset_internals(self): + super(Flow, self)._reset_internals() + self._runners = [] + @decorators.locked def remove(self, uuid): runner = None @@ -74,10 +77,8 @@ class Flow(linear_flow.Flow): if not runner: raise ValueError("No runner found with uuid %s" % (uuid)) else: - # Ensure that we reset out internal state after said removal self._graph.remove_node(runner) - self._runners = [] - self._leftoff_at = None + self._reset_internals() def _ordering(self): try: @@ -94,7 +95,7 @@ class Flow(linear_flow.Flow): create said dependency.""" if len(self._graph) == 0: return [] - if self._runners: + if self._connected: return self._runners # Clear out all edges (since we want to do a fresh connection) @@ -137,4 +138,5 @@ class Flow(linear_flow.Flow): r.runs_before = list(reversed(run_stack)) run_stack.append(r) self._runners = run_order + self._connected = True return run_order diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 9a9df158..149842fb 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -70,10 +70,13 @@ class Flow(base.Flow): assert isinstance(task, collections.Callable) r = utils.Runner(task) r.runs_before = list(reversed(self._runners)) + self._runners.append(r) + self._reset_internals() + return r.uuid + + def _reset_internals(self): self._connected = False self._leftoff_at = None - self._runners.append(r) - return r.uuid def _associate_providers(self, runner): # Ensure that some previous task provides this input. @@ -112,10 +115,8 @@ class Flow(base.Flow): if index_removed == -1: raise ValueError("No runner found with uuid %s" % (uuid)) else: - # Ensure that we reset out internal state after said removal. removed = self._runners.pop(index_removed) - self._connected = False - self._leftoff_at = None + self._reset_internals() # Go and remove it from any runner after the removed runner since # those runners may have had an attachment to it. for r in self._runners[index_removed:]: @@ -257,8 +258,7 @@ class Flow(base.Flow): self.results = {} self.resumer = None self._accumulator.reset() - self._leftoff_at = None - self._connected = False + self._reset_internals() @decorators.locked def rollback(self, context, cause):