diff --git a/taskflow/patterns/linear_workflow.py b/taskflow/patterns/linear_workflow.py index 98de8febe..6658f777f 100644 --- a/taskflow/patterns/linear_workflow.py +++ b/taskflow/patterns/linear_workflow.py @@ -18,12 +18,9 @@ import collections as dict_provider import copy +import functools import logging -# OrderedDict is only in 2.7 or greater :-( -if not hasattr(dict_provider, 'OrderedDict'): - import ordereddict as dict_provider - from taskflow.openstack.common import excutils from taskflow import exceptions as exc from taskflow import states @@ -38,16 +35,14 @@ class Workflow(object): def __init__(self, name, tolerant=False, parents=None): # The tasks which have been applied will be collected here so that they # can be reverted in the correct order on failure. - self.reversions = [] + self._reversions = [] self.name = name # If this chain can ignore individual task reversion failure then this # should be set to true, instead of the default value of false. self.tolerant = tolerant - # Ordered dicts are used so that we can nicely refer to the tasks by - # name and easily fetch there results but also allow for the running - # of said tasks to happen in a linear order. - self.tasks = dict_provider.OrderedDict() - self.results = dict_provider.OrderedDict() + # Tasks and there results are stored here... + self.tasks = [] + self.results = [] # If this workflow has a parent workflow/s which need to be reverted if # this workflow fails then please include them here to allow this child # to call the parents... @@ -69,71 +64,83 @@ class Workflow(object): # The state of this flow. self.state = states.PENDING - def __setitem__(self, name, task): - self.tasks[name] = task - - def __getitem__(self, name): - return self.results[name] + def __str__(self): + return "%s: %s" % (self.__class__.__name__, id(self)) def run(self, context, *args, **kwargs): + if self.state != states.PENDING: + raise exc.InvalidStateException("Unable to run linear flow when " + "in state %s" % (self.state)) + + if self.result_fetcher: + result_fetcher = functools.partial(self.result_fetcher, context) + else: + result_fetcher = None + self.state = states.STARTED - for (name, task) in self.tasks.iteritems(): + for task in self.tasks: try: - self._on_task_start(context, task, name) - # See if we have already ran this... + # See if we have already ran this. result = None has_result = False - if self.result_fetcher: - (has_result, result) = self.result_fetcher(context, - name, self) + if result_fetcher: + (has_result, result) = result_fetcher(context, self, task) + if not has_result: + self.state = state.RUNNING + else: + self.state = state.RESUMING + self._on_task_start(context, task) if not has_result: result = task.apply(context, *args, **kwargs) - # Keep a pristine copy of the result in the results table + # Keep a pristine copy of the result # so that if said result is altered by other further states - # the one here will not be. - self.results[name] = copy.deepcopy(result) - self._on_task_finish(context, task, name, result) - self.state = states.SUCCESS + # the one here will not be. This ensures that if rollback + # occurs that the task gets exactly the result it returned + # and not a modified one. + self.results.append((task, copy.deepcopy(result))) + self._on_task_finish(context, task, result) except Exception as ex: + self.state = states.FAILURE with excutils.save_and_reraise_exception(): - self.state = states.FAILURE try: - self._on_task_error(context, task, name) + self._on_task_error(context, task) except Exception: LOG.exception("Dropping exception catched when" " notifying about existing task" " exception.") self.state = states.REVERTING - self.rollback(context, - exc.TaskException(task, name, self, ex)) + self.rollback(context, exc.TaskException(task, self, ex)) + self.state = states.FAILURE + # Only gets here if everything went successfully. + self.state = states.SUCCESS - def _on_task_error(self, context, task, name): + def _on_task_error(self, context, task): # Notify any listeners that the task has errored. for i in self.listeners: - i.notify(context, states.FAILURE, self, task, name) + i.notify(context, states.FAILURE, self, task) - def _on_task_start(self, context, task, name): + def _on_task_start(self, context, task): # Notify any listeners that we are about to start the given task. for i in self.listeners: - i.notify(context, states.STARTED, self, task, name) + i.notify(context, states.STARTED, self, task) - def _on_task_finish(self, context, task, name, result): + def _on_task_finish(self, context, task, result): # Notify any listeners that we are finishing the given task. - self.reversions.append((name, task)) + self._reversions.append((task, result)) for i in self.listeners: - i.notify(context, states.SUCCESS, self, task, name, result=result) + i.notify(context, states.SUCCESS, self, task, result=result) def rollback(self, context, cause): - for (i, (name, task)) in enumerate(reversed(self.reversions)): + for (i, (task, result)) in enumerate(reversed(self._reversions)): try: - task.revert(context, self.results[name], cause) + task.revert(context, result, cause) except Exception: # Ex: WARN: Failed rolling back stage 1 (validate_request) of # chain validation due to Y exception. - msg = ("Failed rolling back stage %(index)s (%(name)s)" + msg = ("Failed rolling back stage %(index)s (%(task)s)" " of workflow %(workflow)s, due to inner exception.") - LOG.warn(msg % {'index': (i + 1), 'stage': name, - 'workflow': self.name}) + LOG.warn(msg % {'index': (i + 1), 'task': task, + 'workflow': self}) if not self.tolerant: # NOTE(harlowja): LOG a msg AND re-raise the exception if # the chain does not tolerate exceptions happening in the diff --git a/tools/pip-requires b/tools/pip-requires index 2f2e7a9f4..c15e57adf 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -1,2 +1 @@ oslo.cfg>=1.1.0 -ordereddict