Rework how we should be using lists instead of ordereddicts for optimal usage.
This commit is contained in:
parent
05078fdca2
commit
c0cf3f8d47
@ -18,12 +18,9 @@
|
||||
|
||||
import collections as dict_provider
|
||||
import copy
|
||||
import functools
|
||||
import logging
|
||||
|
||||
# OrderedDict is only in 2.7 or greater :-(
|
||||
if not hasattr(dict_provider, 'OrderedDict'):
|
||||
import ordereddict as dict_provider
|
||||
|
||||
from taskflow.openstack.common import excutils
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow import states
|
||||
@ -38,16 +35,14 @@ class Workflow(object):
|
||||
def __init__(self, name, tolerant=False, parents=None):
|
||||
# The tasks which have been applied will be collected here so that they
|
||||
# can be reverted in the correct order on failure.
|
||||
self.reversions = []
|
||||
self._reversions = []
|
||||
self.name = name
|
||||
# If this chain can ignore individual task reversion failure then this
|
||||
# should be set to true, instead of the default value of false.
|
||||
self.tolerant = tolerant
|
||||
# Ordered dicts are used so that we can nicely refer to the tasks by
|
||||
# name and easily fetch there results but also allow for the running
|
||||
# of said tasks to happen in a linear order.
|
||||
self.tasks = dict_provider.OrderedDict()
|
||||
self.results = dict_provider.OrderedDict()
|
||||
# Tasks and there results are stored here...
|
||||
self.tasks = []
|
||||
self.results = []
|
||||
# If this workflow has a parent workflow/s which need to be reverted if
|
||||
# this workflow fails then please include them here to allow this child
|
||||
# to call the parents...
|
||||
@ -69,71 +64,83 @@ class Workflow(object):
|
||||
# The state of this flow.
|
||||
self.state = states.PENDING
|
||||
|
||||
def __setitem__(self, name, task):
|
||||
self.tasks[name] = task
|
||||
|
||||
def __getitem__(self, name):
|
||||
return self.results[name]
|
||||
def __str__(self):
|
||||
return "%s: %s" % (self.__class__.__name__, id(self))
|
||||
|
||||
def run(self, context, *args, **kwargs):
|
||||
if self.state != states.PENDING:
|
||||
raise exc.InvalidStateException("Unable to run linear flow when "
|
||||
"in state %s" % (self.state))
|
||||
|
||||
if self.result_fetcher:
|
||||
result_fetcher = functools.partial(self.result_fetcher, context)
|
||||
else:
|
||||
result_fetcher = None
|
||||
|
||||
self.state = states.STARTED
|
||||
for (name, task) in self.tasks.iteritems():
|
||||
for task in self.tasks:
|
||||
try:
|
||||
self._on_task_start(context, task, name)
|
||||
# See if we have already ran this...
|
||||
# See if we have already ran this.
|
||||
result = None
|
||||
has_result = False
|
||||
if self.result_fetcher:
|
||||
(has_result, result) = self.result_fetcher(context,
|
||||
name, self)
|
||||
if result_fetcher:
|
||||
(has_result, result) = result_fetcher(context, self, task)
|
||||
if not has_result:
|
||||
self.state = state.RUNNING
|
||||
else:
|
||||
self.state = state.RESUMING
|
||||
self._on_task_start(context, task)
|
||||
if not has_result:
|
||||
result = task.apply(context, *args, **kwargs)
|
||||
# Keep a pristine copy of the result in the results table
|
||||
# Keep a pristine copy of the result
|
||||
# so that if said result is altered by other further states
|
||||
# the one here will not be.
|
||||
self.results[name] = copy.deepcopy(result)
|
||||
self._on_task_finish(context, task, name, result)
|
||||
self.state = states.SUCCESS
|
||||
# the one here will not be. This ensures that if rollback
|
||||
# occurs that the task gets exactly the result it returned
|
||||
# and not a modified one.
|
||||
self.results.append((task, copy.deepcopy(result)))
|
||||
self._on_task_finish(context, task, result)
|
||||
except Exception as ex:
|
||||
self.state = states.FAILURE
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.state = states.FAILURE
|
||||
try:
|
||||
self._on_task_error(context, task, name)
|
||||
self._on_task_error(context, task)
|
||||
except Exception:
|
||||
LOG.exception("Dropping exception catched when"
|
||||
" notifying about existing task"
|
||||
" exception.")
|
||||
self.state = states.REVERTING
|
||||
self.rollback(context,
|
||||
exc.TaskException(task, name, self, ex))
|
||||
self.rollback(context, exc.TaskException(task, self, ex))
|
||||
self.state = states.FAILURE
|
||||
# Only gets here if everything went successfully.
|
||||
self.state = states.SUCCESS
|
||||
|
||||
def _on_task_error(self, context, task, name):
|
||||
def _on_task_error(self, context, task):
|
||||
# Notify any listeners that the task has errored.
|
||||
for i in self.listeners:
|
||||
i.notify(context, states.FAILURE, self, task, name)
|
||||
i.notify(context, states.FAILURE, self, task)
|
||||
|
||||
def _on_task_start(self, context, task, name):
|
||||
def _on_task_start(self, context, task):
|
||||
# Notify any listeners that we are about to start the given task.
|
||||
for i in self.listeners:
|
||||
i.notify(context, states.STARTED, self, task, name)
|
||||
i.notify(context, states.STARTED, self, task)
|
||||
|
||||
def _on_task_finish(self, context, task, name, result):
|
||||
def _on_task_finish(self, context, task, result):
|
||||
# Notify any listeners that we are finishing the given task.
|
||||
self.reversions.append((name, task))
|
||||
self._reversions.append((task, result))
|
||||
for i in self.listeners:
|
||||
i.notify(context, states.SUCCESS, self, task, name, result=result)
|
||||
i.notify(context, states.SUCCESS, self, task, result=result)
|
||||
|
||||
def rollback(self, context, cause):
|
||||
for (i, (name, task)) in enumerate(reversed(self.reversions)):
|
||||
for (i, (task, result)) in enumerate(reversed(self._reversions)):
|
||||
try:
|
||||
task.revert(context, self.results[name], cause)
|
||||
task.revert(context, result, cause)
|
||||
except Exception:
|
||||
# Ex: WARN: Failed rolling back stage 1 (validate_request) of
|
||||
# chain validation due to Y exception.
|
||||
msg = ("Failed rolling back stage %(index)s (%(name)s)"
|
||||
msg = ("Failed rolling back stage %(index)s (%(task)s)"
|
||||
" of workflow %(workflow)s, due to inner exception.")
|
||||
LOG.warn(msg % {'index': (i + 1), 'stage': name,
|
||||
'workflow': self.name})
|
||||
LOG.warn(msg % {'index': (i + 1), 'task': task,
|
||||
'workflow': self})
|
||||
if not self.tolerant:
|
||||
# NOTE(harlowja): LOG a msg AND re-raise the exception if
|
||||
# the chain does not tolerate exceptions happening in the
|
||||
|
@ -1,2 +1 @@
|
||||
oslo.cfg>=1.1.0
|
||||
ordereddict
|
||||
|
Loading…
x
Reference in New Issue
Block a user