Cleaning up various components.

1. In order to aid future features its useful to split
split apart the linear_flow and have a good base
class that is not connected to a ordered or linear flow.
2. Have the graph flow inherit from the linear flow since
it is performing a linear run using a topological ordering
of the tasks.

Change-Id: I287cd4a666caf09ca3a4cdf1d53814e9c6feb765
This commit is contained in:
Joshua Harlow
2013-06-27 22:45:54 -07:00
committed by Joshua Harlow
parent 2c333387ba
commit 2d5f90fa24
6 changed files with 393 additions and 337 deletions
+146
View File
@@ -0,0 +1,146 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from taskflow import exceptions as exc
from taskflow import states
class Flow(object):
"""The base abstract class of all flow implementations."""
__metaclass__ = abc.ABCMeta
RESETTABLE_STATES = set([
states.INTERRUPTED,
states.SUCCESS,
states.PENDING,
states.FAILURE,
])
SOFT_RESETTABLE_STATES = set([
states.INTERRUPTED,
])
UNINTERRUPTIBLE_STATES = set([
states.FAILURE,
states.SUCCESS,
states.PENDING,
])
RUNNABLE_STATES = set([
states.PENDING,
])
def __init__(self, name, parents=None):
self.name = name
# The state of this flow.
self._state = states.PENDING
# If this flow has a parent flow/s which need to be reverted if
# this flow fails then please include them here to allow this child
# to call the parents...
self.parents = parents
# Any objects that want to listen when a wf/task starts/stops/completes
# or errors should be registered here. This can be used to monitor
# progress and record tasks finishing and other advanced features that
# can be implemented when you can track a flows progress.
self.task_listeners = []
self.listeners = []
@property
def state(self):
"""Provides a read-only view of the flow state."""
return self._state
def _change_state(self, context, new_state):
if self.state != new_state:
old_state = self.state
self._state = new_state
self._on_flow_state_change(context, old_state)
def __str__(self):
return "Flow: %s" % (self.name)
def _on_flow_state_change(self, context, old_state):
# Notify any listeners that the internal state has changed.
for f in self.listeners:
f(context, self, old_state)
def _on_task_error(self, context, task, cause):
# Notify any listeners that the task has errored.
for f in self.task_listeners:
f(context, states.FAILURE, self, task, result=cause)
def _on_task_start(self, context, task):
# Notify any listeners that we are about to start the given task.
for f in self.task_listeners:
f(context, states.STARTED, self, task)
def _on_task_finish(self, context, task, result):
# Notify any listeners that we are finishing the given task.
for f in self.task_listeners:
f(context, states.SUCCESS, self, task, result=result)
@abc.abstractmethod
def add(self, task):
"""Adds a given task to this flow."""
raise NotImplementedError()
@abc.abstractmethod
def add_many(self, tasks):
"""Adds many tasks to this flow."""
raise NotImplementedError()
def interrupt(self):
"""Attempts to interrupt the current flow and any tasks that are
currently not running in the flow.
Returns how many tasks were interrupted (if any).
"""
if self.state in self.UNINTERRUPTIBLE_STATES:
raise exc.InvalidStateException(("Can not interrupt when"
" in state %s") % (self.state))
self._change_state(None, states.INTERRUPTED)
def reset(self):
"""Fully resets the internal state of this flow, allowing for the flow
to be ran again. *Listeners are also reset*"""
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not reset when"
" in state %s") % (self.state))
self.task_listeners = []
self.listeners = []
self._change_state(None, states.PENDING)
def soft_reset(self):
"""Partially resets the internal state of this flow, allowing for the
flow to be ran again from an interrupted state *only*"""
if self.state not in self.SOFT_RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not soft reset when"
" in state %s") % (self.state))
self._change_state(None, states.PENDING)
def run(self, context, *args, **kwargs):
"""Executes the workflow."""
if self.state not in self.RUNNABLE_STATES:
raise exc.InvalidStateException("Unable to run flow when "
"in state %s" % (self.state))
@abc.abstractmethod
def rollback(self, context, cause):
"""Performs rollback of this workflow and any attached parent workflows
if present."""
raise NotImplementedError()
+26 -14
View File
@@ -24,16 +24,16 @@ from networkx.classes import digraph
from networkx import exception as g_exc
from taskflow import exceptions as exc
from taskflow.patterns import ordered_flow
from taskflow.patterns import linear_flow
from taskflow import utils
LOG = logging.getLogger(__name__)
class Flow(ordered_flow.Flow):
"""A flow which will analyze the attached tasks input requirements and
determine who provides said input and order the task so that said providing
task will be ran before."""
class Flow(linear_flow.Flow):
"""A extension of the linear flow which will run the associated tasks in
a linear topological ordering (and reverse using the same linear
topological order)"""
def __init__(self, name, parents=None, allow_same_inputs=True):
super(Flow, self).__init__(name, parents)
@@ -42,14 +42,26 @@ class Flow(ordered_flow.Flow):
self._allow_same_inputs = allow_same_inputs
def add(self, task):
# Do something with the task, either store it for later
# or add it to the graph right now...
#
# Only insert the node to start, connect all the edges
# together later after all nodes have been added.
# together later after all nodes have been added since if we try
# to infer the edges at this stage we likely will fail finding
# dependencies from nodes that don't exist.
assert isinstance(task, collections.Callable)
self._graph.add_node(task)
self._connected = False
if not self._graph.has_node(task):
self._graph.add_node(task)
self._connected = False
def add_many(self, tasks):
for t in tasks:
self.add(t)
def __str__(self):
lines = ["GraphFlow: %s" % (self.name)]
lines.append(" Number of tasks: %s" % (self._graph.number_of_nodes()))
lines.append(" Number of dependencies: %s"
% (self._graph.number_of_edges()))
lines.append(" State: %s" % (self.state))
return "\n".join(lines)
def _fetch_task_inputs(self, task):
@@ -87,8 +99,8 @@ class Flow(ordered_flow.Flow):
return dict(map(collapse_functor, task_inputs.iteritems()))
def order(self):
self.connect()
def _ordering(self):
self._connect()
try:
return dag.topological_sort(self._graph)
except g_exc.NetworkXUnfeasible:
@@ -97,7 +109,7 @@ class Flow(ordered_flow.Flow):
"flow which will satisfy the "
"tasks needed inputs and outputs.")
def connect(self):
def _connect(self):
"""Connects the nodes & edges of the graph together."""
if self._connected or len(self._graph) == 0:
return
+219 -25
View File
@@ -17,38 +17,59 @@
# under the License.
import collections
import copy
import functools
import logging
from taskflow.openstack.common import excutils
from taskflow import exceptions as exc
from taskflow.patterns import ordered_flow
from taskflow import states
from taskflow import utils
from taskflow.patterns import base
class Flow(ordered_flow.Flow):
"""A linear chain of tasks that can be applied as one unit or
rolled back as one unit. Each task in the chain may have requirements
LOG = logging.getLogger(__name__)
class Flow(base.Flow):
""""A linear chain of tasks that can be applied in order as one unit and
rolled back as one unit using the reverse order that the tasks have
been applied in.
Note(harlowja): Each task in the chain must have requirements
which are satisfied by the previous task/s in the chain."""
def __init__(self, name, parents=None):
super(Flow, self).__init__(name, parents)
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self._accumulator = utils.RollbackAccumulator()
# This should be a functor that returns whether a given task has
# already ran by returning a pair of (has_result, was_error, result).
#
# NOTE(harlowja): This allows for resumption by skipping tasks which
# have already occurred. The previous return value is needed due to
# the contract we have with tasks that they will be given the value
# they returned if reversion is triggered.
self.result_fetcher = None
# Tasks results are stored here...
self.results = []
# The last task index in the order we left off at before being
# interrupted (or failing).
self._left_off_at = 0
# All tasks to run are collected here.
self._tasks = []
def _fetch_task_inputs(self, task):
would_like = set(utils.get_attr(task, 'requires', []))
would_like.update(utils.get_attr(task, 'optional', []))
def add_many(self, tasks):
for t in tasks:
self.add(t)
inputs = {}
for n in would_like:
# Find the last task that provided this.
for (last_task, last_results) in reversed(self.results):
if n not in utils.get_attr(last_task, 'provides', []):
continue
if last_results and n in last_results:
inputs[n] = last_results[n]
else:
inputs[n] = None
# Some task said they had it, get the next requirement.
break
return inputs
def add(self, task):
"""Adds a given task to this flow."""
assert isinstance(task, collections.Callable)
self._validate_provides(task)
self._tasks.append(task)
def _validate_provides(self, task):
# Ensure that some previous task provides this input.
@@ -68,10 +89,183 @@ class Flow(ordered_flow.Flow):
" for %s to correctly execute.") % (missing_requires, task)
raise exc.InvalidStateException(msg)
def add(self, task):
assert isinstance(task, collections.Callable)
self._validate_provides(task)
self._tasks.append(task)
def __str__(self):
lines = ["LinearFlow: %s" % (self.name)]
lines.append(" Number of tasks: %s" % (len(self._tasks)))
lines.append(" Last index: %s" % (self._left_off_at))
lines.append(" State: %s" % (self.state))
return "\n".join(lines)
def order(self):
def _ordering(self):
return list(self._tasks)
def _fetch_task_inputs(self, task):
"""Retrieves and additional kwargs inputs to provide to the task when
said task is being applied."""
would_like = set(utils.get_attr(task, 'requires', []))
would_like.update(utils.get_attr(task, 'optional', []))
inputs = {}
for n in would_like:
# Find the last task that provided this.
for (last_task, last_results) in reversed(self.results):
if n not in utils.get_attr(last_task, 'provides', []):
continue
if last_results and n in last_results:
inputs[n] = last_results[n]
else:
inputs[n] = None
# Some task said they had it, get the next requirement.
break
return inputs
def run(self, context, *args, **kwargs):
super(Flow, self).run(context, *args, **kwargs)
if self.result_fetcher:
result_fetcher = functools.partial(self.result_fetcher, context)
else:
result_fetcher = None
self._change_state(context, states.STARTED)
try:
task_order = self._ordering()
if self._left_off_at > 0:
task_order = task_order[self._left_off_at:]
except Exception:
with excutils.save_and_reraise_exception():
try:
self._change_state(context, states.FAILURE)
except Exception:
LOG.exception("Dropping exception catched when"
" notifying about ordering failure.")
def run_task(task, failed=False, result=None, simulate_run=False):
try:
self._on_task_start(context, task)
# Add the task to be rolled back *immediately* so that even if
# the task fails while producing results it will be given a
# chance to rollback.
rb = utils.RollbackTask(context, task, result=None)
self._accumulator.add(rb)
if not simulate_run:
inputs = self._fetch_task_inputs(task)
if not inputs:
inputs = {}
inputs.update(kwargs)
result = task(context, *args, **inputs)
else:
if failed:
if not result:
# If no exception or exception message was provided
# or captured from the previous run then we need to
# form one for this task.
result = "%s failed running." % (task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
# Adjust the task result in the accumulator before
# notifying others that the task has finished to
# avoid the case where a listener might throw an
# exception.
#
# Note(harlowja): Keep the original result in the
# accumulator only and give a duplicated copy to
# avoid the original result being altered by other
# tasks.
#
# This is due to python being by reference (which means
# some task could alter this result intentionally or not
# intentionally).
rb.result = result
# Alter the index we have ran at.
self._left_off_at += 1
result_copy = copy.deepcopy(result)
self.results.append((task, result_copy))
self._on_task_finish(context, task, result_copy)
except Exception as e:
cause = utils.FlowFailure(task, self, e)
with excutils.save_and_reraise_exception():
try:
self._on_task_error(context, task, e)
except Exception:
LOG.exception("Dropping exception catched when"
" notifying about task failure.")
self.rollback(context, cause)
last_task = 0
was_interrupted = False
if result_fetcher:
self._change_state(context, states.RESUMING)
for (i, task) in enumerate(task_order):
if self.state == states.INTERRUPTED:
was_interrupted = True
break
(has_result, was_error, result) = result_fetcher(self, task)
if not has_result:
break
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that
# would have happened in a normal flow).
last_task = i + 1
run_task(task, failed=was_error, result=result,
simulate_run=True)
if was_interrupted:
return
self._change_state(context, states.RUNNING)
for task in task_order[last_task:]:
if self.state == states.INTERRUPTED:
was_interrupted = True
break
run_task(task)
if not was_interrupted:
# Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
def reset(self):
super(Flow, self).reset()
self.results = []
self.result_fetcher = None
self._accumulator.reset()
self._left_off_at = 0
def rollback(self, context, cause):
# Performs basic task by task rollback by going through the reverse
# order that tasks have finished and asking said task to undo whatever
# it has done. If this flow has any parent flows then they will
# also be called to rollback any tasks said parents contain.
#
# Note(harlowja): if a flow can more simply revert a whole set of
# tasks via a simpler command then it can override this method to
# accomplish that.
#
# For example, if each task was creating a file in a directory, then
# it's easier to just remove the directory than to ask each task to
# delete its file individually.
try:
self._change_state(context, states.REVERTING)
except Exception:
LOG.exception("Dropping exception catched when"
" changing state to reverting while performing"
" reconcilation on a tasks exception.")
try:
self._accumulator.rollback(cause)
finally:
try:
self._change_state(context, states.FAILURE)
except Exception:
LOG.exception("Dropping exception catched when"
" changing state to failure while performing"
" reconcilation on a tasks exception.")
if self.parents:
# Rollback any parents flows if they exist...
for p in self.parents:
p.rollback(context, cause)
-295
View File
@@ -1,295 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import copy
import functools
import logging
from taskflow.openstack.common import excutils
from taskflow import exceptions as exc
from taskflow import states
from taskflow import utils
LOG = logging.getLogger(__name__)
class Flow(object):
"""A set tasks that can be applied as one unit or rolled back as one
unit using an ordered arrangements of said tasks where reversion is by
default handled by reversing through the tasks applied."""
__metaclass__ = abc.ABCMeta
def __init__(self, name, parents=None):
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self._accumulator = utils.RollbackAccumulator()
self.name = name
# If this flow has a parent flow/s which need to be reverted if
# this flow fails then please include them here to allow this child
# to call the parents...
self.parents = parents
# This should be a functor that returns whether a given task has
# already ran by returning a pair of (has_result, was_error, result).
#
# NOTE(harlowja): This allows for resumption by skipping tasks which
# have already occurred. The previous return value is needed due to
# the contract we have with tasks that they will be given the value
# they returned if reversion is triggered.
self.result_fetcher = None
# Any objects that want to listen when a wf/task starts/stops/completes
# or errors should be registered here. This can be used to monitor
# progress and record tasks finishing (so that it becomes possible to
# store the result of a task in some persistent or semi-persistent
# storage backend).
self.task_listeners = []
self.listeners = []
# The state of this flow.
self._state = states.PENDING
# Tasks results are stored here...
self.results = []
# The last task index in the order we left off at before being
# interrupted (or failing).
self._left_off_at = 0
@property
def state(self):
return self._state
@abc.abstractmethod
def add(self, task):
"""Adds a given task to this flow."""
raise NotImplementedError()
def __str__(self):
return "Flow: %s" % (self.name)
@abc.abstractmethod
def order(self):
"""Returns the order in which the tasks should be ran
as a iterable list."""
raise NotImplementedError()
def _fetch_task_inputs(self, _task):
"""Retrieves and additional kwargs inputs to provide to the task when
said task is being applied."""
return None
def run(self, context, *args, **kwargs):
if self.state != states.PENDING:
raise exc.InvalidStateException("Unable to run flow when "
"in state %s" % (self.state))
if self.result_fetcher:
result_fetcher = functools.partial(self.result_fetcher, context)
else:
result_fetcher = None
self._change_state(context, states.STARTED)
try:
task_order = self.order()
if self._left_off_at > 0:
task_order = task_order[self._left_off_at:]
except Exception:
with excutils.save_and_reraise_exception():
try:
self._change_state(context, states.FAILURE)
except Exception:
LOG.exception("Dropping exception catched when"
" notifying about ordering failure.")
def run_task(task, failed=False, result=None, simulate_run=False):
try:
self._on_task_start(context, task)
# Add the task to be rolled back *immediately* so that even if
# the task fails while producing results it will be given a
# chance to rollback.
rb = utils.RollbackTask(context, task, result=None)
self._accumulator.add(rb)
if not simulate_run:
inputs = self._fetch_task_inputs(task)
if not inputs:
inputs = {}
inputs.update(kwargs)
result = task(context, *args, **inputs)
else:
if failed:
if not result:
# If no exception or exception message was provided
# or captured from the previous run then we need to
# form one for this task.
result = "%s failed running." % (task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
# Adjust the task result in the accumulator before
# notifying others that the task has finished to
# avoid the case where a listener might throw an
# exception.
#
# Note(harlowja): Keep the original result in the
# accumulator only and give a duplicated copy to
# avoid the original result being altered by other
# tasks.
#
# This is due to python being by reference (which means
# some task could alter this result intentionally or not
# intentionally).
rb.result = result
# Alter the index we have ran at.
self._left_off_at += 1
result_copy = copy.deepcopy(result)
self.results.append((task, result_copy))
self._on_task_finish(context, task, result_copy)
except Exception as e:
cause = utils.FlowFailure(task, self, e)
with excutils.save_and_reraise_exception():
try:
self._on_task_error(context, task, e)
except Exception:
LOG.exception("Dropping exception catched when"
" notifying about task failure.")
self.rollback(context, cause)
last_task = 0
was_interrupted = False
if result_fetcher:
self._change_state(context, states.RESUMING)
for (i, task) in enumerate(task_order):
if self.state == states.INTERRUPTED:
was_interrupted = True
break
(has_result, was_error, result) = result_fetcher(self, task)
if not has_result:
break
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that
# would have happened in a normal flow).
last_task = i + 1
run_task(task, failed=was_error, result=result,
simulate_run=True)
if was_interrupted:
return
self._change_state(context, states.RUNNING)
for task in task_order[last_task:]:
if self.state == states.INTERRUPTED:
was_interrupted = True
break
run_task(task)
if not was_interrupted:
# Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
def reset(self):
# Reset (hard) alters the local state and does clear out other member
# variables state.
if self.state not in (states.INTERRUPTED, states.SUCCESS,
states.FAILURE, states.PENDING):
raise exc.InvalidStateException(("Can not reset when"
" in state %s") % (self.state))
self._state = states.PENDING
self.results = []
self.task_listeners = []
self.listeners = []
self.result_fetcher = None
self._accumulator.reset()
self._left_off_at = 0
def soft_reset(self):
# Soft reset only alters the local state and does not clear out any
# other member variables state.
if self.state not in (states.INTERRUPTED, states.SUCCESS,
states.PENDING):
raise exc.InvalidStateException(("Can not soft_reset when"
" in state %s") % (self.state))
self._state = states.PENDING
def interrupt(self):
if self.state in (states.FAILURE, states.SUCCESS, states.PENDING):
raise exc.InvalidStateException(("Can not interrupt when"
" in state %s") % (self.state))
self._change_state(None, states.INTERRUPTED)
def _change_state(self, context, new_state):
if self.state != new_state:
old_state = self.state
self._state = new_state
self._on_flow_state_change(context, old_state)
def _on_flow_state_change(self, context, old_state):
# Notify any listeners that the internal state has changed.
for f in self.listeners:
f(context, self, old_state)
def _on_task_error(self, context, task, exc):
# Notify any listeners that the task has errored.
for f in self.task_listeners:
f(context, states.FAILURE, self, task, result=exc)
def _on_task_start(self, context, task):
# Notify any listeners that we are about to start the given task.
for f in self.task_listeners:
f(context, states.STARTED, self, task)
def _on_task_finish(self, context, task, result):
# Notify any listeners that we are finishing the given task.
for f in self.task_listeners:
f(context, states.SUCCESS, self, task, result=result)
def rollback(self, context, cause):
# Performs basic task by task rollback by going through the reverse
# order that tasks have finished and asking said task to undo whatever
# it has done. If this flow has any parent flows then they will
# also be called to rollback any tasks said parents contain.
#
# Note(harlowja): if a flow can more simply revert a whole set of
# tasks via a simpler command then it can override this method to
# accomplish that.
#
# For example, if each task was creating a file in a directory, then
# it's easier to just remove the directory than to ask each task to
# delete its file individually.
try:
self._change_state(context, states.REVERTING)
except Exception:
LOG.exception("Dropping exception catched when"
" changing state to reverting while performing"
" reconcilation on a tasks exception.")
try:
self._accumulator.rollback(cause)
finally:
try:
self._change_state(context, states.FAILURE)
except Exception:
LOG.exception("Dropping exception catched when"
" changing state to failure while performing"
" reconcilation on a tasks exception.")
if self.parents:
# Rollback any parents flows if they exist...
for p in self.parents:
p.rollback(context, cause)
-2
View File
@@ -159,9 +159,7 @@ class GraphFlowTest(unittest2.TestCase):
flo.add(run1)
flo.add(run2)
self.assertRaises(excp.InvalidStateException, flo.connect)
self.assertRaises(excp.InvalidStateException, flo.run, {})
self.assertRaises(excp.InvalidStateException, flo.order)
def test_happy_flow(self):
flo = gw.Flow("test-flow")
+2 -1
View File
@@ -1,7 +1,8 @@
# Packages needed for using this library.
oslo.config>=1.1.0
iso8601
networkx
# Very nice graph library
networkx>=1.5
# Only needed if celery/distributed flows used.
celery
# Only needed if database backend used.