diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py
index 72e97424..f2afe7da 100644
--- a/taskflow/patterns/distributed_flow.py
+++ b/taskflow/patterns/distributed_flow.py
@@ -30,7 +30,7 @@ class Flow(object):
     """A linear chain of independent tasks that can be applied as one unit or
        rolled back as one unit."""

-    def __init__(self, name, tolerant=False, parents=None):
+    def __init__(self, name, parents=None):
         self.name = name
         self.root = None
         self._tasks = []
diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py
index e6992159..df2cc23d 100644
--- a/taskflow/patterns/graph_flow.py
+++ b/taskflow/patterns/graph_flow.py
@@ -34,9 +34,8 @@ class Flow(ordered_flow.Flow):
     determine who provides said input and order the task so that said
     providing task will be ran before."""

-    def __init__(self, name, tolerant=False, parents=None,
-                 allow_same_inputs=True):
-        super(Flow, self).__init__(name, tolerant, parents)
+    def __init__(self, name, parents=None, allow_same_inputs=True):
+        super(Flow, self).__init__(name, parents)
         self._graph = digraph.DiGraph()
         self._connected = False
         self._allow_same_inputs = allow_same_inputs
@@ -53,10 +52,10 @@ class Flow(ordered_flow.Flow):

     def _fetch_task_inputs(self, task):
         inputs = collections.defaultdict(list)
-        for n in task.requires():
+        for n in task.requires:
             for (them, there_result) in self.results:
                 if (not self._graph.has_edge(them, task) or
-                        not n in them.provides()):
+                        not n in them.provides):
                     continue
                 if there_result and n in there_result:
                     inputs[n].append(there_result[n])
@@ -90,9 +89,9 @@ class Flow(ordered_flow.Flow):
         provides_what = collections.defaultdict(list)
         requires_what = collections.defaultdict(list)
         for t in self._graph.nodes_iter():
-            for r in t.requires():
+            for r in t.requires:
                 requires_what[r].append(t)
-            for p in t.provides():
+            for p in t.provides:
                 provides_what[p].append(t)

         def get_providers(node, want_what):
diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py
index 89cb9301..532dade7 100644
--- a/taskflow/patterns/linear_flow.py
+++ b/taskflow/patterns/linear_flow.py
@@ -20,31 +20,21 @@ from taskflow import exceptions as exc
 from taskflow.patterns import ordered_flow


-def _convert_to_set(items):
-    if not items:
-        return set()
-    if isinstance(items, set):
-        return items
-    if isinstance(items, dict):
-        return items.keys()
-    return set(iter(items))
-
-
 class Flow(ordered_flow.Flow):
     """A linear chain of tasks that can be applied as one unit or
        rolled back as one unit. Each task in the chain may have requirements
       which are satisfied by the previous task/s in the chain."""

-    def __init__(self, name, tolerant=False, parents=None):
-        super(Flow, self).__init__(name, tolerant, parents)
+    def __init__(self, name, parents=None):
+        super(Flow, self).__init__(name, parents)
         self._tasks = []

     def _fetch_task_inputs(self, task):
         inputs = {}
-        for r in _convert_to_set(task.requires()):
+        for r in task.requires:
             # Find the last task that provided this.
             for (last_task, last_results) in reversed(self.results):
-                if r not in _convert_to_set(last_task.provides()):
+                if r not in last_task.provides:
                     continue
                 if last_results and r in last_results:
                     inputs[r] = last_results[r]
@@ -57,10 +47,10 @@ class Flow(ordered_flow.Flow):
     def _validate_provides(self, task):
         # Ensure that some previous task provides this input.
         missing_requires = []
-        for r in _convert_to_set(task.requires()):
+        for r in task.requires:
             found_provider = False
             for prev_task in reversed(self._tasks):
-                if r in _convert_to_set(prev_task.provides()):
+                if r in prev_task.provides:
                     found_provider = True
                     break
             if not found_provider:
diff --git a/taskflow/patterns/ordered_flow.py b/taskflow/patterns/ordered_flow.py
index 1c4408f1..9b4bc087 100644
--- a/taskflow/patterns/ordered_flow.py
+++ b/taskflow/patterns/ordered_flow.py
@@ -24,6 +24,7 @@ import logging
 from taskflow.openstack.common import excutils
 from taskflow import exceptions as exc
 from taskflow import states
+from taskflow import utils


 LOG = logging.getLogger(__name__)
@@ -38,6 +39,19 @@ class FlowFailure(object):
         self.exception = exception


+class RollbackTask(object):
+    def __init__(self, context, task, result):
+        self.task = task
+        self.result = result
+        self.context = context
+
+    def __str__(self):
+        return str(self.task)
+
+    def __call__(self, cause):
+        self.task.revert(self.context, self.result, cause)
+
+
 class Flow(object):
     """A set tasks that can be applied as one unit or rolled back as one
     unit using an ordered arrangements of said tasks where reversion is by
@@ -45,14 +59,11 @@

     __metaclass__ = abc.ABCMeta

-    def __init__(self, name, tolerant=False, parents=None):
+    def __init__(self, name, parents=None):
         # The tasks which have been applied will be collected here so that they
         # can be reverted in the correct order on failure.
-        self._reversions = []
+        self._accumulator = utils.RollbackAccumulator()
         self.name = name
-        # If this chain can ignore individual task reversion failure then this
-        # should be set to true, instead of the default value of false.
-        self.tolerant = tolerant
         # If this flow has a parent flow/s which need to be reverted if
         # this flow fails then please include them here to allow this child
         # to call the parents...
@@ -100,24 +111,6 @@ class Flow(object):
         said task is being applied."""
         return None

-    def _perform_reconcilation(self, context, task, excp):
-        # Attempt to reconcile the given exception that occured while applying
-        # the given task and either reconcile said task and its associated
-        # failure, so that the flow can continue or abort and perform
-        # some type of undo of the tasks already completed.
-        cause = FlowFailure(task, self, excp)
-        with excutils.save_and_reraise_exception():
-            try:
-                self._on_task_error(context, task)
-            except Exception:
-                LOG.exception("Dropping exception catched when"
-                              " notifying about existing task"
-                              " exception.")
-            # The default strategy will be to rollback all the contained
-            # tasks by calling there reverting methods, and then calling
-            # any parent flows rollbacks (and so-on).
-            self.rollback(context, cause)
-
     def run(self, context, *args, **kwargs):
         if self.state != states.PENDING:
             raise exc.InvalidStateException("Unable to run flow when "
@@ -129,7 +122,6 @@
             result_fetcher = None

         self._change_state(context, states.STARTED)
-
         try:
             task_order = self.order()
         except Exception:
@@ -140,6 +132,29 @@
                 LOG.exception("Dropping exception catched when"
                               " notifying about ordering failure.")

+        def run_task(task, result=None, simulate_run=False):
+            try:
+                self._on_task_start(context, task)
+                if not simulate_run:
+                    inputs = self._fetch_task_inputs(task)
+                    if not inputs:
+                        inputs = {}
+                    inputs.update(kwargs)
+                    result = task.apply(context, *args, **inputs)
+                # Keep a pristine copy of the result
+                # so that if said result is altered by other further
+                # states the one here will not be. This ensures that
+                # if rollback occurs that the task gets exactly the
+                # result it returned and not a modified one.
+                self.results.append((task, result))
+                self._accumulator.add(RollbackTask(context, task,
+                                                   copy.deepcopy(result)))
+                self._on_task_finish(context, task, result)
+            except Exception as e:
+                cause = FlowFailure(task, self, e)
+                with excutils.save_and_reraise_exception():
+                    self.rollback(context, cause)
+
         last_task = 0
         was_interrupted = False
         if result_fetcher:
@@ -152,20 +167,10 @@
                 if not has_result:
                     break
                 # Fake running the task so that we trigger the same
-                # notifications and state changes (and rollback that would
-                # have happened in a normal flow).
+                # notifications and state changes (and rollback that
+                # would have happened in a normal flow).
                 last_task = i + 1
-                try:
-                    self._on_task_start(context, task)
-                    # Keep a pristine copy of the result
-                    # so that if said result is altered by other further
-                    # states the one here will not be. This ensures that
-                    # if rollback occurs that the task gets exactly the
-                    # result it returned and not a modified one.
-                    self.results.append((task, copy.deepcopy(result)))
-                    self._on_task_finish(context, task, result)
-                except Exception as e:
-                    self._perform_reconcilation(context, task, e)
+                run_task(task, result=result, simulate_run=True)

         if was_interrupted:
             return
@@ -175,27 +180,7 @@
             if self.state == states.INTERRUPTED:
                 was_interrupted = True
                 break
-            try:
-                has_result = False
-                result = None
-                if result_fetcher:
-                    (has_result, result) = result_fetcher(self, task)
-                self._on_task_start(context, task)
-                if not has_result:
-                    inputs = self._fetch_task_inputs(task)
-                    if not inputs:
-                        inputs = {}
-                    inputs.update(kwargs)
-                    result = task.apply(context, *args, **inputs)
-                # Keep a pristine copy of the result
-                # so that if said result is altered by other further states
-                # the one here will not be. This ensures that if rollback
-                # occurs that the task gets exactly the result it returned
-                # and not a modified one.
-                self.results.append((task, copy.deepcopy(result)))
-                self._on_task_finish(context, task, result)
-            except Exception as e:
-                self._perform_reconcilation(context, task, e)
+            run_task(task)

         if not was_interrupted:
             # Only gets here if everything went successfully.
@@ -204,7 +189,7 @@
     def reset(self):
         self._state = states.PENDING
         self.results = []
-        self._reversions = []
+        self._accumulator.reset()

     def interrupt(self):
         self._change_state(None, states.INTERRUPTED)
@@ -232,7 +217,6 @@

     def _on_task_finish(self, context, task, result):
         # Notify any listeners that we are finishing the given task.
-        self._reversions.append((task, result))
         for f in self.task_listeners:
             f(context, states.SUCCESS, self, task, result=result)

@@ -256,27 +240,8 @@
                               " changing state to reverting while performing"
                               " reconcilation on a tasks exception.")

-        def rollback_tasks(reversions, tolerant):
-            for (i, (task, result)) in enumerate(reversions):
-                try:
-                    task.revert(context, result, cause)
-                except Exception:
-                    # Ex: WARN: Failed rolling back stage 1 (validate_request) of
-                    # chain validation due to Y exception.
-                    log_f = LOG.warn
-                    if not tolerant:
-                        log_f = LOG.exception
-                    msg = ("Failed rolling back stage %(index)s (%(task)s)"
-                           " of flow %(flow)s, due to inner exception.")
-                    log_f(msg % {'index': (i + 1), 'task': task, 'flow': self})
-                    if not tolerant:
-                        # NOTE(harlowja): LOG a msg AND re-raise the exception
-                        # if the flow does not tolerate exceptions happening
-                        # in the rollback method.
-                        raise
-
         try:
-            rollback_tasks(reversed(self._reversions), self.tolerant)
+            self._accumulator.rollback(cause)
         finally:
             try:
                 self._change_state(context, states.FAILURE)
diff --git a/taskflow/task.py b/taskflow/task.py
index 32460ffc..70fa6915 100644
--- a/taskflow/task.py
+++ b/taskflow/task.py
@@ -27,24 +27,16 @@ class Task(object):

     def __init__(self, name):
         self.name = name
-        # Identifying items that this task requires to apply.
-        self._requires = set()
-        # Identifying items that this task provides from its apply.
-        self._provides = set()
+        # An *immutable* input 'resource' name set this task depends
+        # on existing before this task can be applied.
+        self.requires = set()
+        # An *immutable* output 'resource' name set this task
+        # produces that other tasks may depend on this task providing.
+        self.provides = set()

     def __str__(self):
         return "Task: %s" % (self.name)

-    def requires(self):
-        """Returns an *immutable* input 'resource' name set this task depends
-        on existing before this task can be applied."""
-        return self._requires
-
-    def provides(self):
-        """Returns an *immutable* output 'resource' name set this task
-        produces that other tasks may depend on this task providing."""
-        return self._provides
-
     @abc.abstractmethod
     def apply(self, context, *args, **kwargs):
         """Activate a given task which will perform some operation and return.
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index 4f56e9fe..ffe424dc 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -37,14 +37,8 @@ def null_functor(*args, **kwargs):  # pylint: disable=W0613
 class ProvidesRequiresTask(task.Task):
     def __init__(self, name, provides, requires):
         super(ProvidesRequiresTask, self).__init__(name)
-        self._provides = provides
-        self._requires = requires
-
-    def requires(self):
-        return self._requires
-
-    def provides(self):
-        return self._provides
+        self.provides = provides
+        self.requires = requires

     def apply(self, context, *args, **kwargs):
         outs = {
@@ -54,6 +48,6 @@
         if not ORDER_KEY in context:
             context[ORDER_KEY] = []
         context[ORDER_KEY].append(self.name)
-        for v in self.provides():
+        for v in self.provides:
             outs[v] = True
         return outs
diff --git a/taskflow/utils.py b/taskflow/utils.py
index 37cc65d4..730bc817 100644
--- a/taskflow/utils.py
+++ b/taskflow/utils.py
@@ -17,9 +17,12 @@
 #    under the License.

 import contextlib
+import logging
 import threading
 import time

+LOG = logging.getLogger(__name__)
+

 def await(check_functor, timeout=None):
     if timeout is not None:
@@ -40,6 +43,46 @@
     return True


+class RollbackAccumulator(object):
+    """A utility class that can help in organizing 'undo' like code
+    so that said code be rolled back on failure (automatically or manually)
+    by activating rollback callables that were inserted during said codes
+    progression."""
+
+    def __init__(self):
+        self._rollbacks = []
+
+    def add(self, *callables):
+        self._rollbacks.extend(callables)
+
+    def reset(self):
+        self._rollbacks = []
+
+    def __len__(self):
+        return len(self._rollbacks)
+
+    def __iter__(self):
+        # Rollbacks happen in the reverse order that they were added.
+        return reversed(self._rollbacks)
+
+    def __enter__(self):
+        return self
+
+    def rollback(self, cause):
+        LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
+        for (i, f) in enumerate(self):
+            LOG.debug("Calling rollback %s: %s", i + 1, f)
+            try:
+                f(cause)
+            except Exception:
+                LOG.exception(("Failed rolling back %s: %s due "
+                               "to inner exception."), i + 1, f)
+
+    def __exit__(self, type, value, tb):
+        if any((value, type, tb)):
+            self.rollback(value)
+
+
 class ReaderWriterLock(object):
     """A simple reader-writer lock.

@@ -114,6 +157,7 @@
         self.readers_ok.notifyAll()
         self.readers_ok.release()

+
 class LazyPluggable(object):
     """A pluggable backend loaded lazily based on some value."""

diff --git a/taskflow/wrappers.py b/taskflow/wrappers.py
index ba9d29b2..c97e9e98 100644
--- a/taskflow/wrappers.py
+++ b/taskflow/wrappers.py
@@ -34,28 +34,20 @@ class FunctorTask(task.Task):
         if not name:
             name = "_".join([apply_functor.__name__, revert_functor.__name__])
         super(FunctorTask, self).__init__(name)
-        self._apply_functor = apply_functor
         self._revert_functor = revert_functor
-        self._requires = set()
-        self._provides = set()
         if provides_what:
-            self._provides.update(provides_what)
+            self.provides.update(provides_what)
         if extract_requires:
             for arg_name in inspect.getargspec(apply_functor).args:
                 # These are automatically given, ignore.
                 if arg_name in AUTO_ARGS:
                     continue
-                self._requires.add(arg_name)
-
-    def requires(self):
-        return set(self._requires)
-
-    def provides(self):
-        return set(self._provides)
+                self.requires.add(arg_name)

     def apply(self, context, *args, **kwargs):
         return self._apply_functor(context, *args, **kwargs)

     def revert(self, context, result, cause):
-        return self._revert_functor(context, result, cause)
+        if self._revert_functor:
+            self._revert_functor(context, result, cause)
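
Below is a rough, standalone usage sketch (not part of the patch itself) of how the new utils.RollbackAccumulator is meant to be driven: each completed step registers an undo callable, and on failure the accumulator replays them in reverse order, logging (rather than re-raising) any errors from individual rollbacks. The step/undo functions here are made up for illustration; inside Flow.run() the registered callables are RollbackTask instances and the cause is a FlowFailure.

from taskflow import utils


def undo_step_one(cause):
    # Compensating action for step one; receives the failure cause.
    print("Undoing step one because: %s" % cause)


def undo_step_two(cause):
    # Compensating action for step two.
    print("Undoing step two because: %s" % cause)


accumulator = utils.RollbackAccumulator()
try:
    # ... step one succeeds ...
    accumulator.add(undo_step_one)
    # ... step two succeeds ...
    accumulator.add(undo_step_two)
    # ... step three fails ...
    raise RuntimeError("step three blew up")
except RuntimeError as e:
    # Mirrors what Flow.rollback() now does with self._accumulator:
    # undo_step_two runs first, then undo_step_one.
    accumulator.rollback(e)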