In some cases it is not desired to reset the whole state of the workflow, but only to soft_reset it so that it can be easily resumed from an interrupted state (for example). This change allows for that to happen, which also involves keeping track of the last task that ran so that, when the flow is resumed via soft_reset, it starts off from that task (instead of starting from the start of all tasks). Change-Id: I034bd6af6445e3da52356328605368319c4ff6f9
		
			
				
	
	
		
			314 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			314 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# -*- coding: utf-8 -*-
 | 
						|
 | 
						|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
 | 
						|
 | 
						|
#    Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
 | 
						|
#
 | 
						|
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
 | 
						|
#    not use this file except in compliance with the License. You may obtain
 | 
						|
#    a copy of the License at
 | 
						|
#
 | 
						|
#         http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
#    Unless required by applicable law or agreed to in writing, software
 | 
						|
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 | 
						|
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 | 
						|
#    License for the specific language governing permissions and limitations
 | 
						|
#    under the License.
 | 
						|
 | 
						|
import abc
 | 
						|
import collections
 | 
						|
import copy
 | 
						|
import functools
 | 
						|
import logging
 | 
						|
import sys
 | 
						|
 | 
						|
from taskflow.openstack.common import excutils
 | 
						|
from taskflow import exceptions as exc
 | 
						|
from taskflow import states
 | 
						|
from taskflow import utils
 | 
						|
 | 
						|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
class FlowFailure(object):
    """Describes a task failure; given to revert so that the cause of the
    failure can be interrogated.

    The constructor snapshots ``sys.exc_info()``, so the object is meant to
    be built while the failing exception is still being handled (i.e. from
    inside the ``except`` block that caught it).
    """

    def __init__(self, task, flow, exception):
        # The task that failed and the flow it was running in.
        self.task = task
        self.flow = flow
        # The exception object supplied by the caller.
        self.exc = exception
        # (type, value, traceback) as reported by the interpreter at
        # construction time.
        self.exc_info = sys.exc_info()
 | 
						|
 | 
						|
 | 
						|
class RollbackTask(object):
    """Bundles an applied task with the context and result needed to revert
    it.

    Instances are callable: calling one with a failure cause asks the wrapped
    task to revert itself, provided the task exposes a callable ``revert``
    attribute.
    """

    def __init__(self, context, task, result):
        # The task that was applied.
        self.task = task
        # The value that will be handed back to the task's revert().
        self.result = result
        # The context that will be handed back to the task's revert().
        self.context = context

    def __str__(self):
        return str(self.task)

    def __call__(self, cause):
        """Reverts the wrapped task if it supports reversion.

        :param cause: passed through unchanged as the last argument of the
                      task's ``revert(context, result, cause)``.
        """
        # NOTE: the builtin callable() is used instead of the
        # collections.Callable alias, which no longer exists on
        # Python >= 3.10.
        revert = getattr(self.task, "revert", None)
        if callable(revert):
            revert(self.context, self.result, cause)
 | 
						|
 | 
						|
 | 
						|
class Flow(object):
    """A set of tasks that can be applied as one unit or rolled back as one
    unit using an ordered arrangement of said tasks where reversion is by
    default handled by reversing through the tasks applied."""

    __metaclass__ = abc.ABCMeta

    def __init__(self, name, parents=None):
        # The tasks which have been applied will be collected here so that they
        # can be reverted in the correct order on failure.
        self._accumulator = utils.RollbackAccumulator()
        self.name = name
        # If this flow has a parent flow/s which need to be reverted if
        # this flow fails then please include them here to allow this child
        # to call the parents...
        self.parents = parents
        # This should be a functor that returns whether a given task has
        # already ran by returning a tuple of (has_result, was_error, result).
        # It is invoked as result_fetcher(context, flow, task).
        #
        # NOTE(harlowja): This allows for resumption by skipping tasks which
        # have already occurred. The previous return value is needed due to
        # the contract we have with tasks that they will be given the value
        # they returned if reversion is triggered.
        self.result_fetcher = None
        # Any objects that want to listen when a wf/task starts/stops/completes
        # or errors should be registered here. This can be used to monitor
        # progress and record tasks finishing (so that it becomes possible to
        # store the result of a task in some persistent or semi-persistent
        # storage backend).
        #
        # Task listeners are called as f(context, state, flow, task) with an
        # extra result=... keyword on success/failure; flow listeners are
        # called as f(context, flow, old_state).
        self.task_listeners = []
        self.listeners = []
        # The state of this flow.
        self._state = states.PENDING
        # Tasks results are stored here as (task, result) tuples...
        self.results = []
        # The last task index in the order we left off at before being
        # interrupted (or failing).
        self._left_off_at = 0

    @property
    def state(self):
        """The current state of this flow (read-only)."""
        return self._state

    @abc.abstractmethod
    def add(self, task):
        """Adds a given task to this flow."""
        raise NotImplementedError()

    def __str__(self):
        return "Flow: %s" % (self.name)

    @abc.abstractmethod
    def order(self):
        """Returns the order in which the tasks should be ran
        as a iterable list.

        NOTE: run() slices the returned value, so it must support slicing.
        """
        raise NotImplementedError()

    def _fetch_task_inputs(self, _task):
        """Retrieves any additional kwargs inputs to provide to the task when
        said task is being applied.

        The base implementation provides none; a subclass may return a dict.
        """
        return None

    def run(self, context, *args, **kwargs):
        """Runs (or resumes) the tasks of this flow in order().

        :param context: handed to every task, listener and to the
                        result_fetcher (when one is set).
        :param args: extra positional arguments forwarded to every task.
        :param kwargs: extra keyword arguments forwarded to every task; they
                       take precedence over _fetch_task_inputs() values.
        :raises: InvalidStateException if the flow is not PENDING. Any
                 exception raised while ordering or running a task is
                 propagated after the flow state has been updated (and, for
                 task failures, after rollback() was invoked).
        """
        if self.state != states.PENDING:
            raise exc.InvalidStateException("Unable to run flow when "
                                            "in state %s" % (self.state))

        if self.result_fetcher:
            result_fetcher = functools.partial(self.result_fetcher, context)
        else:
            result_fetcher = None

        # basestring only exists on Python 2; fall back to str elsewhere so
        # that the simulated-failure path does not die with a NameError.
        try:
            string_types = basestring
        except NameError:
            string_types = str

        self._change_state(context, states.STARTED)
        try:
            task_order = self.order()
            if self._left_off_at > 0:
                # Skip the tasks that already ran before a soft_reset().
                task_order = task_order[self._left_off_at:]
        except Exception:
            with excutils.save_and_reraise_exception():
                try:
                    self._change_state(context, states.FAILURE)
                except Exception:
                    LOG.exception("Dropping exception catched when"
                                  " notifying about ordering failure.")

        def run_task(task, failed=False, result=None, simulate_run=False):
            # Runs one task for real, or (when simulate_run is set) replays a
            # previously recorded outcome so that the same notifications,
            # bookkeeping and rollback happen as in a normal run.
            try:
                self._on_task_start(context, task)
                if not simulate_run:
                    # Work on a copy so that the mapping handed out by
                    # _fetch_task_inputs() is never mutated by the merge.
                    inputs = dict(self._fetch_task_inputs(task) or {})
                    inputs.update(kwargs)
                    result = task(context, *args, **inputs)
                else:
                    if failed:
                        if not result:
                            # If no exception or exception message was provided
                            # or captured from the previous run then we need to
                            # form one for this task.
                            result = "%s failed running." % (task)
                        if isinstance(result, string_types):
                            result = exc.InvalidStateException(result)
                        if not isinstance(result, Exception):
                            LOG.warning("Can not raise a non-exception"
                                        " object: %s", result)
                            result = exc.InvalidStateException()
                        raise result
                # Alter the index we have ran at.
                self._left_off_at += 1
                self.results.append((task, result))
                # Add the task result to the accumulator before
                # notifying others that the task has finished to
                # avoid the case where a listener might throw an
                # exception.
                #
                # A pristine (deep) copy of the result is kept for rollback
                # so that if said result is altered by other further
                # states the one here will not be. This ensures that
                # if rollback occurs that the task gets exactly the
                # result it returned and not a modified one.
                self._accumulator.add(RollbackTask(context, task,
                                                   copy.deepcopy(result)))
                self._on_task_finish(context, task, result)
            except Exception as e:
                cause = FlowFailure(task, self, e)
                with excutils.save_and_reraise_exception():
                    try:
                        self._on_task_error(context, task, e)
                    except Exception:
                        LOG.exception("Dropping exception catched when"
                                      " notifying about task failure.")
                    self.rollback(context, cause)

        # Index (into task_order) of the first task that has no recorded
        # result and therefore must really be run.
        last_task = 0
        was_interrupted = False
        if result_fetcher:
            self._change_state(context, states.RESUMING)
            for (i, task) in enumerate(task_order):
                if self.state == states.INTERRUPTED:
                    was_interrupted = True
                    break
                (has_result, was_error, result) = result_fetcher(self, task)
                if not has_result:
                    break
                # Fake running the task so that we trigger the same
                # notifications and state changes (and rollback that
                # would have happened in a normal flow).
                last_task = i + 1
                run_task(task, failed=was_error, result=result,
                         simulate_run=True)

        if was_interrupted:
            return

        self._change_state(context, states.RUNNING)
        for task in task_order[last_task:]:
            if self.state == states.INTERRUPTED:
                was_interrupted = True
                break
            run_task(task)

        if not was_interrupted:
            # Only gets here if everything went successfully.
            self._change_state(context, states.SUCCESS)

    def reset(self):
        """Hard reset: returns the flow to PENDING and clears results,
        listeners, the result fetcher, the rollback accumulator and the
        resumption index.

        :raises: InvalidStateException unless the flow is INTERRUPTED,
                 SUCCESS, FAILURE or PENDING.
        """
        # Reset (hard) alters the local state and does clear out other member
        # variables state.
        if self.state not in (states.INTERRUPTED, states.SUCCESS,
                              states.FAILURE, states.PENDING):
            raise exc.InvalidStateException(("Can not reset when"
                                             " in state %s") % (self.state))
        self._state = states.PENDING
        self.results = []
        self.task_listeners = []
        self.listeners = []
        self.result_fetcher = None
        self._accumulator.reset()
        self._left_off_at = 0

    def soft_reset(self):
        """Soft reset: returns the flow to PENDING while keeping everything
        else (including the index the flow left off at) so that a following
        run() continues instead of starting over.

        :raises: InvalidStateException unless the flow is INTERRUPTED,
                 SUCCESS or PENDING.
        """
        # Soft reset only alters the local state and does not clear out any
        # other member variables state.
        if self.state not in (states.INTERRUPTED, states.SUCCESS,
                              states.PENDING):
            raise exc.InvalidStateException(("Can not soft_reset when"
                                             " in state %s") % (self.state))
        self._state = states.PENDING

    def interrupt(self):
        """Marks the flow as INTERRUPTED; run() checks for this state before
        starting each task and stops when it sees it.

        :raises: InvalidStateException if the flow is FAILURE, SUCCESS or
                 PENDING.
        """
        if self.state in (states.FAILURE, states.SUCCESS, states.PENDING):
            raise exc.InvalidStateException(("Can not interrupt when"
                                             " in state %s") % (self.state))
        # No context is available here, so listeners are notified with None.
        self._change_state(None, states.INTERRUPTED)

    def _change_state(self, context, new_state):
        """Switches to new_state and notifies the flow listeners; does
        nothing when the flow is already in new_state."""
        if self.state != new_state:
            old_state = self.state
            self._state = new_state
            self._on_flow_state_change(context, old_state)

    def _on_flow_state_change(self, context, old_state):
        # Notify any listeners that the internal state has changed.
        for f in self.listeners:
            f(context, self, old_state)

    def _on_task_error(self, context, task, exc):
        # Notify any listeners that the task has errored.
        for f in self.task_listeners:
            f(context, states.FAILURE, self, task, result=exc)

    def _on_task_start(self, context, task):
        # Notify any listeners that we are about to start the given task.
        for f in self.task_listeners:
            f(context, states.STARTED, self, task)

    def _on_task_finish(self, context, task, result):
        # Notify any listeners that we are finishing the given task.
        for f in self.task_listeners:
            f(context, states.SUCCESS, self, task, result=result)

    def rollback(self, context, cause):
        """Reverts the tasks applied so far, then any parent flows.

        :param context: handed to the flow listeners and to the parents.
        :param cause: the failure description (see FlowFailure) handed to the
                      accumulator and to the parents.
        """
        # Performs basic task by task rollback by going through the reverse
        # order that tasks have finished and asking said task to undo whatever
        # it has done. If this flow has any parent flows then they will
        # also be called to rollback any tasks said parents contain.
        #
        # Note(harlowja): if a flow can more simply revert a whole set of
        # tasks via a simpler command then it can override this method to
        # accomplish that.
        #
        # For example, if each task was creating a file in a directory, then
        # it's easier to just remove the directory than to ask each task to
        # delete its file individually.
        try:
            self._change_state(context, states.REVERTING)
        except Exception:
            LOG.exception("Dropping exception catched when"
                          " changing state to reverting while performing"
                          " reconcilation on a tasks exception.")

        try:
            self._accumulator.rollback(cause)
        finally:
            # The flow always ends up in FAILURE, even when reverting itself
            # blew up.
            try:
                self._change_state(context, states.FAILURE)
            except Exception:
                LOG.exception("Dropping exception catched when"
                              " changing state to failure while performing"
                              " reconcilation on a tasks exception.")
        if self.parents:
            # Rollback any parents flows if they exist...
            for p in self.parents:
                p.rollback(context, cause)
 |