diff --git a/taskflow/patterns/ordered_flow.py b/taskflow/patterns/ordered_flow.py index 96c9421b..e69b99a5 100644 --- a/taskflow/patterns/ordered_flow.py +++ b/taskflow/patterns/ordered_flow.py @@ -92,6 +92,9 @@ class Flow(object): self._state = states.PENDING # Tasks results are stored here... self.results = [] + # The last task index in the order we left off at before being + # interrupted (or failing). + self._left_off_at = 0 @property def state(self): @@ -129,6 +132,8 @@ class Flow(object): self._change_state(context, states.STARTED) try: task_order = self.order() + if self._left_off_at > 0: + task_order = task_order[self._left_off_at:] except Exception: with excutils.save_and_reraise_exception(): try: @@ -160,6 +165,8 @@ class Flow(object): " object: %s", result) result = exc.InvalidStateException() raise result + # Alter the index we have ran at. + self._left_off_at += 1 # Keep a pristine copy of the result # so that if said result is altered by other further # states the one here will not be. This ensures that @@ -216,15 +223,33 @@ class Flow(object): self._change_state(context, states.SUCCESS) def reset(self): - # Reset all internal state (except our parent workflows). + # Reset (hard) alters the local state and does clear out other member + # variables state. + if self.state not in (states.INTERRUPTED, states.SUCCESS, + states.FAILURE, states.PENDING): + raise exc.InvalidStateException(("Can not reset when" + " in state %s") % (self.state)) self._state = states.PENDING self.results = [] self.task_listeners = [] self.listeners = [] self.result_fetcher = None self._accumulator.reset() + self._left_off_at = 0 + + def soft_reset(self): + # Soft reset only alters the local state and does not clear out any + # other member variables state. + if self.state not in (states.INTERRUPTED, states.SUCCESS, + states.PENDING): + raise exc.InvalidStateException(("Can not soft_reset when" + " in state %s") % (self.state)) + self._state = states.PENDING def interrupt(self): + if self.state in (states.FAILURE, states.SUCCESS, states.PENDING): + raise exc.InvalidStateException(("Can not interrupt when" + " in state %s") % (self.state)) self._change_state(None, states.INTERRUPTED) def _change_state(self, context, new_state):