Use rollback accumulator and remove requires()/provides() from being functions

Use a new rollback accumulator to collect which tasks need to be rolled back
and use that in the ordered workflow code. Move the usage of provides/requires
as functions and just let them be attributes of the flow objects.
This commit is contained in:
Joshua Harlow
2013-05-23 16:44:31 -07:00
parent b4f592c524
commit dd9d5c157e
8 changed files with 115 additions and 139 deletions

View File

@@ -30,7 +30,7 @@ class Flow(object):
"""A linear chain of independent tasks that can be applied as one unit or
rolled back as one unit."""
def __init__(self, name, tolerant=False, parents=None):
def __init__(self, name, parents=None):
self.name = name
self.root = None
self._tasks = []

View File

@@ -34,9 +34,8 @@ class Flow(ordered_flow.Flow):
determine who provides said input and order the task so that said providing
task will be ran before."""
def __init__(self, name, tolerant=False, parents=None,
allow_same_inputs=True):
super(Flow, self).__init__(name, tolerant, parents)
def __init__(self, name, parents=None, allow_same_inputs=True):
super(Flow, self).__init__(name, parents)
self._graph = digraph.DiGraph()
self._connected = False
self._allow_same_inputs = allow_same_inputs
@@ -53,10 +52,10 @@ class Flow(ordered_flow.Flow):
def _fetch_task_inputs(self, task):
inputs = collections.defaultdict(list)
for n in task.requires():
for n in task.requires:
for (them, there_result) in self.results:
if (not self._graph.has_edge(them, task) or
not n in them.provides()):
not n in them.provides):
continue
if there_result and n in there_result:
inputs[n].append(there_result[n])
@@ -90,9 +89,9 @@ class Flow(ordered_flow.Flow):
provides_what = collections.defaultdict(list)
requires_what = collections.defaultdict(list)
for t in self._graph.nodes_iter():
for r in t.requires():
for r in t.requires:
requires_what[r].append(t)
for p in t.provides():
for p in t.provides:
provides_what[p].append(t)
def get_providers(node, want_what):

View File

@@ -20,31 +20,21 @@ from taskflow import exceptions as exc
from taskflow.patterns import ordered_flow
def _convert_to_set(items):
if not items:
return set()
if isinstance(items, set):
return items
if isinstance(items, dict):
return items.keys()
return set(iter(items))
class Flow(ordered_flow.Flow):
"""A linear chain of tasks that can be applied as one unit or
rolled back as one unit. Each task in the chain may have requirements
which are satisfied by the previous task/s in the chain."""
def __init__(self, name, tolerant=False, parents=None):
super(Flow, self).__init__(name, tolerant, parents)
def __init__(self, name, parents=None):
super(Flow, self).__init__(name, parents)
self._tasks = []
def _fetch_task_inputs(self, task):
inputs = {}
for r in _convert_to_set(task.requires()):
for r in task.requires:
# Find the last task that provided this.
for (last_task, last_results) in reversed(self.results):
if r not in _convert_to_set(last_task.provides()):
if r not in last_task.provides:
continue
if last_results and r in last_results:
inputs[r] = last_results[r]
@@ -57,10 +47,10 @@ class Flow(ordered_flow.Flow):
def _validate_provides(self, task):
# Ensure that some previous task provides this input.
missing_requires = []
for r in _convert_to_set(task.requires()):
for r in task.requires:
found_provider = False
for prev_task in reversed(self._tasks):
if r in _convert_to_set(prev_task.provides()):
if r in prev_task.provides:
found_provider = True
break
if not found_provider:

View File

@@ -24,6 +24,7 @@ import logging
from taskflow.openstack.common import excutils
from taskflow import exceptions as exc
from taskflow import states
from taskflow import utils
LOG = logging.getLogger(__name__)
@@ -38,6 +39,19 @@ class FlowFailure(object):
self.exception = exception
class RollbackTask(object):
def __init__(self, context, task, result):
self.task = task
self.result = result
self.context = context
def __str__(self):
return str(self.task)
def __call__(self, cause):
self.task.revert(self.context, self.result, cause)
class Flow(object):
"""A set tasks that can be applied as one unit or rolled back as one
unit using an ordered arrangements of said tasks where reversion is by
@@ -45,14 +59,11 @@ class Flow(object):
__metaclass__ = abc.ABCMeta
def __init__(self, name, tolerant=False, parents=None):
def __init__(self, name, parents=None):
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self._reversions = []
self._accumulator = utils.RollbackAccumulator()
self.name = name
# If this chain can ignore individual task reversion failure then this
# should be set to true, instead of the default value of false.
self.tolerant = tolerant
# If this flow has a parent flow/s which need to be reverted if
# this flow fails then please include them here to allow this child
# to call the parents...
@@ -100,24 +111,6 @@ class Flow(object):
said task is being applied."""
return None
def _perform_reconcilation(self, context, task, excp):
# Attempt to reconcile the given exception that occured while applying
# the given task and either reconcile said task and its associated
# failure, so that the flow can continue or abort and perform
# some type of undo of the tasks already completed.
cause = FlowFailure(task, self, excp)
with excutils.save_and_reraise_exception():
try:
self._on_task_error(context, task)
except Exception:
LOG.exception("Dropping exception catched when"
" notifying about existing task"
" exception.")
# The default strategy will be to rollback all the contained
# tasks by calling there reverting methods, and then calling
# any parent flows rollbacks (and so-on).
self.rollback(context, cause)
def run(self, context, *args, **kwargs):
if self.state != states.PENDING:
raise exc.InvalidStateException("Unable to run flow when "
@@ -129,7 +122,6 @@ class Flow(object):
result_fetcher = None
self._change_state(context, states.STARTED)
try:
task_order = self.order()
except Exception:
@@ -140,6 +132,29 @@ class Flow(object):
LOG.exception("Dropping exception catched when"
" notifying about ordering failure.")
def run_task(task, result=None, simulate_run=False):
try:
self._on_task_start(context, task)
if not simulate_run:
inputs = self._fetch_task_inputs(task)
if not inputs:
inputs = {}
inputs.update(kwargs)
result = task.apply(context, *args, **inputs)
# Keep a pristine copy of the result
# so that if said result is altered by other further
# states the one here will not be. This ensures that
# if rollback occurs that the task gets exactly the
# result it returned and not a modified one.
self.results.append((task, result))
self._accumulator.add(RollbackTask(context, task,
copy.deepcopy(result)))
self._on_task_finish(context, task, result)
except Exception as e:
cause = FlowFailure(task, self, e)
with excutils.save_and_reraise_exception():
self.rollback(context, cause)
last_task = 0
was_interrupted = False
if result_fetcher:
@@ -152,20 +167,10 @@ class Flow(object):
if not has_result:
break
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that would
# have happened in a normal flow).
# notifications and state changes (and rollback that
# would have happened in a normal flow).
last_task = i + 1
try:
self._on_task_start(context, task)
# Keep a pristine copy of the result
# so that if said result is altered by other further
# states the one here will not be. This ensures that
# if rollback occurs that the task gets exactly the
# result it returned and not a modified one.
self.results.append((task, copy.deepcopy(result)))
self._on_task_finish(context, task, result)
except Exception as e:
self._perform_reconcilation(context, task, e)
run_task(task, result=result, simulate_run=True)
if was_interrupted:
return
@@ -175,27 +180,7 @@ class Flow(object):
if self.state == states.INTERRUPTED:
was_interrupted = True
break
try:
has_result = False
result = None
if result_fetcher:
(has_result, result) = result_fetcher(self, task)
self._on_task_start(context, task)
if not has_result:
inputs = self._fetch_task_inputs(task)
if not inputs:
inputs = {}
inputs.update(kwargs)
result = task.apply(context, *args, **inputs)
# Keep a pristine copy of the result
# so that if said result is altered by other further states
# the one here will not be. This ensures that if rollback
# occurs that the task gets exactly the result it returned
# and not a modified one.
self.results.append((task, copy.deepcopy(result)))
self._on_task_finish(context, task, result)
except Exception as e:
self._perform_reconcilation(context, task, e)
run_task(task)
if not was_interrupted:
# Only gets here if everything went successfully.
@@ -204,7 +189,7 @@ class Flow(object):
def reset(self):
self._state = states.PENDING
self.results = []
self._reversions = []
self._accumulator.reset()
def interrupt(self):
self._change_state(None, states.INTERRUPTED)
@@ -232,7 +217,6 @@ class Flow(object):
def _on_task_finish(self, context, task, result):
# Notify any listeners that we are finishing the given task.
self._reversions.append((task, result))
for f in self.task_listeners:
f(context, states.SUCCESS, self, task, result=result)
@@ -256,27 +240,8 @@ class Flow(object):
" changing state to reverting while performing"
" reconcilation on a tasks exception.")
def rollback_tasks(reversions, tolerant):
for (i, (task, result)) in enumerate(reversions):
try:
task.revert(context, result, cause)
except Exception:
# Ex: WARN: Failed rolling back stage 1 (validate_request) of
# chain validation due to Y exception.
log_f = LOG.warn
if not tolerant:
log_f = LOG.exception
msg = ("Failed rolling back stage %(index)s (%(task)s)"
" of flow %(flow)s, due to inner exception.")
log_f(msg % {'index': (i + 1), 'task': task, 'flow': self})
if not tolerant:
# NOTE(harlowja): LOG a msg AND re-raise the exception
# if the flow does not tolerate exceptions happening
# in the rollback method.
raise
try:
rollback_tasks(reversed(self._reversions), self.tolerant)
self._accumulator.rollback(cause)
finally:
try:
self._change_state(context, states.FAILURE)

View File

@@ -27,24 +27,16 @@ class Task(object):
def __init__(self, name):
self.name = name
# Identifying items that this task requires to apply.
self._requires = set()
# Identifying items that this task provides from its apply.
self._provides = set()
# An *immutable* input 'resource' name set this task depends
# on existing before this task can be applied.
self.requires = set()
# An *immutable* output 'resource' name set this task
# produces that other tasks may depend on this task providing.
self.provides = set()
def __str__(self):
return "Task: %s" % (self.name)
def requires(self):
"""Returns an *immutable* input 'resource' name set this task depends
on existing before this task can be applied."""
return self._requires
def provides(self):
"""Returns an *immutable* output 'resource' name set this task
produces that other tasks may depend on this task providing."""
return self._provides
@abc.abstractmethod
def apply(self, context, *args, **kwargs):
"""Activate a given task which will perform some operation and return.

View File

@@ -37,14 +37,8 @@ def null_functor(*args, **kwargs): # pylint: disable=W0613
class ProvidesRequiresTask(task.Task):
def __init__(self, name, provides, requires):
super(ProvidesRequiresTask, self).__init__(name)
self._provides = provides
self._requires = requires
def requires(self):
return self._requires
def provides(self):
return self._provides
self.provides = provides
self.requires = requires
def apply(self, context, *args, **kwargs):
outs = {
@@ -54,6 +48,6 @@ class ProvidesRequiresTask(task.Task):
if not ORDER_KEY in context:
context[ORDER_KEY] = []
context[ORDER_KEY].append(self.name)
for v in self.provides():
for v in self.provides:
outs[v] = True
return outs

View File

@@ -17,9 +17,12 @@
# under the License.
import contextlib
import logging
import threading
import time
LOG = logging.getLogger(__name__)
def await(check_functor, timeout=None):
if timeout is not None:
@@ -40,6 +43,46 @@ def await(check_functor, timeout=None):
return True
class RollbackAccumulator(object):
"""A utility class that can help in organizing 'undo' like code
so that said code be rolled back on failure (automatically or manually)
by activating rollback callables that were inserted during said codes
progression."""
def __init__(self):
self._rollbacks = []
def add(self, *callables):
self._rollbacks.extend(callables)
def reset(self):
self._rollbacks = []
def __len__(self):
return len(self._rollbacks)
def __iter__(self):
# Rollbacks happen in the reverse order that they were added.
return reversed(self._rollbacks)
def __enter__(self):
return self
def rollback(self, cause):
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
for (i, f) in enumerate(self):
LOG.debug("Calling rollback %s: %s", i + 1, f)
try:
f(cause)
except Exception:
LOG.exception(("Failed rolling back %s: %s due "
"to inner exception."), i + 1, f)
def __exit__(self, type, value, tb):
if any((value, type, tb)):
self.rollback(value)
class ReaderWriterLock(object):
"""A simple reader-writer lock.
@@ -114,6 +157,7 @@ class ReaderWriterLock(object):
self.readers_ok.notifyAll()
self.readers_ok.release()
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""

View File

@@ -34,28 +34,20 @@ class FunctorTask(task.Task):
if not name:
name = "_".join([apply_functor.__name__, revert_functor.__name__])
super(FunctorTask, self).__init__(name)
self._apply_functor = apply_functor
self._revert_functor = revert_functor
self._requires = set()
self._provides = set()
if provides_what:
self._provides.update(provides_what)
self.provides.update(provides_what)
if extract_requires:
for arg_name in inspect.getargspec(apply_functor).args:
# These are automatically given, ignore.
if arg_name in AUTO_ARGS:
continue
self._requires.add(arg_name)
def requires(self):
return set(self._requires)
def provides(self):
return set(self._provides)
self.requires.add(arg_name)
def apply(self, context, *args, **kwargs):
return self._apply_functor(context, *args, **kwargs)
def revert(self, context, result, cause):
return self._revert_functor(context, result, cause)
if self._revert_functor:
self._revert_functor(context, result, cause)