From d8a62ee24ea023901db41ae85a72b518e1c9e302 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 14 May 2013 15:44:41 -0700 Subject: [PATCH] Start adding in a more generic DAG based workflow. --- taskflow/patterns/__init__.py | 200 +++++++++++++++++++++++++++ taskflow/patterns/graph_workflow.py | 81 +++++++++++ taskflow/patterns/linear_workflow.py | 163 ++-------------------- taskflow/task.py | 10 ++ tools/pip-requires | 2 +- 5 files changed, 300 insertions(+), 156 deletions(-) create mode 100644 taskflow/patterns/graph_workflow.py diff --git a/taskflow/patterns/__init__.py b/taskflow/patterns/__init__.py index 830dd2e7..0657139c 100644 --- a/taskflow/patterns/__init__.py +++ b/taskflow/patterns/__init__.py @@ -15,3 +15,203 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import abc +import logging + +from taskflow.openstack.common import excutils +from taskflow import exceptions as exc +from taskflow import states + +LOG = logging.getLogger(__name__) + + +class OrderedWorkflow(object): + """A set of tasks that can be applied as one unit or rolled back as one + unit using an ordered arrangement of said tasks where reversion can be + handled by reversing through the tasks applied.""" + + __metaclass__ = abc.ABCMeta + + def __init__(self, name, tolerant=False, parents=None): + # The tasks which have been applied will be collected here so that they + # can be reverted in the correct order on failure. + self._reversions = [] + self.name = name + # If this chain can ignore individual task reversion failure then this + # should be set to true, instead of the default value of false. + self.tolerant = tolerant + # If this workflow has a parent workflow/s which need to be reverted if + # this workflow fails then please include them here to allow this child + # to call the parents... + self.parents = parents + # This should be a functor that returns whether a given task has + # already ran by returning a pair of (has_result, result). + # + # NOTE(harlowja): This allows for resumption by skipping tasks which + # have already occurred. The previous return value is needed due to + # the contract we have with tasks that they will be given the value + # they returned if reversion is triggered. + self.result_fetcher = None + # Any objects that want to listen when a task starts/stops/completes + # or errors should be registered here. This can be used to monitor + # progress and record tasks finishing (so that it becomes possible to + # store the result of a task in some persistent or semi-persistent + # storage backend). + self.listeners = [] + # The state of this flow. + self.state = states.PENDING + # Task results are stored here... 
+ + @abc.abstractmethod + def add(self, task): + raise NotImplementedError() + + def __str__(self): + return "%s: %s" % (self.__class__.__name__, id(self)) + + @abc.abstractmethod + def order(self): + raise NotImplementedError() + + def _fetch_inputs(self, task): + return {} + + def run(self, context, *args, **kwargs): + if self.state != states.PENDING: + raise exc.InvalidStateException("Unable to run workflow when " + "in state %s" % (self.state)) + + if self.result_fetcher: + result_fetcher = functools.partial(self.result_fetcher, context) + else: + result_fetcher = None + + self._change_state(context, states.STARTED) + + # TODO(harlowja): we can likely add in custom reconciliation strategies + # here or around here... + def do_rollback_for(task, ex): + self._change_state(context, states.REVERTING) + with excutils.save_and_reraise_exception(): + try: + self._on_task_error(context, task) + except Exception: + LOG.exception("Dropping exception caught when" + " notifying about existing task" + " exception.") + self.rollback(context, exc.TaskException(task, self, ex)) + self._change_state(context, states.FAILURE) + + self._change_state(context, states.RESUMING) + last_task = 0 + if result_fetcher: + for (i, task) in enumerate(self.tasks): + (has_result, result) = result_fetcher(context, self, task) + if not has_result: + break + # Fake running the task so that we trigger the same + # notifications and state changes (and rollback that would + # have happened in a normal flow). + last_task = i + 1 + try: + self._on_task_start(context, task) + # Keep a pristine copy of the result + # so that if said result is altered by other further + # states the one here will not be. This ensures that + # if rollback occurs that the task gets exactly the + # result it returned and not a modified one. 
+ self.results.append((task, copy.deepcopy(result))) + self._on_task_finish(context, task, result) + except Exception as ex: + do_rollback_for(task, ex) + + self._change_state(context, states.RUNNING) + for task in self.order(): + try: + has_result = False + result = None + if result_fetcher: + (has_result, result) = result_fetcher(context, self, task) + self._on_task_start(context, task) + if not has_result: + inputs = self._fetch_inputs(task) + inputs.update(kwargs) + result = task.apply(context, *args, **inputs) + # Keep a pristine copy of the result + # so that if said result is altered by other further states + # the one here will not be. This ensures that if rollback + # occurs that the task gets exactly the result it returned + # and not a modified one. + self.results.append((task, copy.deepcopy(result))) + self._on_task_finish(context, task, result) + except Exception as ex: + do_rollback_for(task, ex) + + # Only gets here if everything went successfully. + self._change_state(context, states.SUCCESS) + + def _change_state(self, context, new_state): + if self.state != new_state: + self.state = new_state + self._on_flow_state_change(context) + + def _on_flow_state_change(self, context): + # Notify any listeners that the internal state has changed. + for i in self.listeners: + i.notify(context, self) + + def _on_task_error(self, context, task): + # Notify any listeners that the task has errored. + for i in self.listeners: + i.notify(context, states.FAILURE, self, task) + + def _on_task_start(self, context, task): + # Notify any listeners that we are about to start the given task. + for i in self.listeners: + i.notify(context, states.STARTED, self, task) + + def _on_task_finish(self, context, task, result): + # Notify any listeners that we are finishing the given task. 
+ self._reversions.append((task, result)) + for i in self.listeners: + i.notify(context, states.SUCCESS, self, task, result=result) + + def rollback(self, context, cause): + for (i, (task, result)) in enumerate(reversed(self._reversions)): + try: + task.revert(context, result, cause) + except Exception: + # Ex: WARN: Failed rolling back stage 1 (validate_request) of + # chain validation due to Y exception. + msg = ("Failed rolling back stage %(index)s (%(task)s)" + " of workflow %(workflow)s, due to inner exception.") + LOG.warn(msg % {'index': (i + 1), 'task': task, + 'workflow': self}) + if not self.tolerant: + # NOTE(harlowja): LOG a msg AND re-raise the exception if + # the chain does not tolerate exceptions happening in the + # rollback method. + raise + if self.parents: + # Rollback any parent workflows if they exist... + for p in self.parents: + p.rollback(context, cause) diff --git a/taskflow/patterns/graph_workflow.py b/taskflow/patterns/graph_workflow.py new file mode 100644 index 00000000..d9864b80 --- /dev/null +++ b/taskflow/patterns/graph_workflow.py @@ -0,0 +1,81 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +from collections import defaultdict + +import logging + +from networkx import exception as g_exc +from networkx.algorithms import dag +from networkx.classes import digraph + +from taskflow import exceptions as exc +from taskflow import patterns + +LOG = logging.getLogger(__name__) + + +class Workflow(patterns.OrderedWorkflow): + def __init__(self, name, tolerant=False, parents=None): + super(Workflow, self).__init__(name, tolerant, parents) + self._graph = digraph.DiGraph() + self._connected = False + + def add(self, task): + # Do something with the task, either store it for later + # or add it to the graph right now... + # + # Only insert the node to start, connect all the edges + # together later after all nodes have been added. + self._graph.add_node(task) + + def run(self, context, *args, **kwargs): + self.connect() + return super(Workflow, self).run(context, *args, **kwargs) + + def order(self): + self.connect() + try: + return dag.topological_sort(self._graph) + except g_exc.NetworkXUnfeasible: + raise exc.InvalidStateException("Unable to correctly determine " + "the path through the provided " + "workflow which will satisfy the " + "tasks needed inputs and outputs.") + + def connect(self): + """Connects the edges of the graph together.""" + if self._connected: + return + provides_what = defaultdict(list) + requires_what = defaultdict(list) + for t in self._graph.nodes_iter(): + for r in t.requires: + requires_what[r].append(t) + for p in t.provides: + provides_what[p].append(t) + for (i_want, n) in requires_what.items(): + if i_want not in provides_what: + raise exc.InvalidStateException("Task %s requires input %s " + "but no other task produces " + "said output" % (n, i_want)) + for p in provides_what[i_want]: + # P produces for N so that's why we link P->N and not N->P + self._graph.add_edge(p, n) + + self._connected = True diff --git a/taskflow/patterns/linear_workflow.py b/taskflow/patterns/linear_workflow.py index 6e97b800..c68fbb9a 100644 --- 
a/taskflow/patterns/linear_workflow.py +++ b/taskflow/patterns/linear_workflow.py @@ -16,166 +16,19 @@ # License for the specific language governing permissions and limitations # under the License. -import copy -import functools -import logging - -from taskflow.openstack.common import excutils -from taskflow import exceptions as exc -from taskflow import states - -LOG = logging.getLogger(__name__) +from taskflow import patterns -class Workflow(object): +class Workflow(patterns.OrderedWorkflow): """A linear chain of independent tasks that can be applied as one unit or rolled back as one unit.""" def __init__(self, name, tolerant=False, parents=None): - # The tasks which have been applied will be collected here so that they - # can be reverted in the correct order on failure. - self._reversions = [] - self.name = name - # If this chain can ignore individual task reversion failure then this - # should be set to true, instead of the default value of false. - self.tolerant = tolerant - # Tasks and there results are stored here... - self.tasks = [] - self.results = [] - # If this workflow has a parent workflow/s which need to be reverted if - # this workflow fails then please include them here to allow this child - # to call the parents... - self.parents = parents - # This should be a functor that returns whether a given task has - # already ran by returning a pair of (has_result, result). - # - # NOTE(harlowja): This allows for resumption by skipping tasks which - # have already occurred. The previous return value is needed due to - # the contract we have with tasks that they will be given the value - # they returned if reversion is triggered. - self.result_fetcher = None - # Any objects that want to listen when a task starts/stops/completes - # or errors should be registered here. This can be used to monitor - # progress and record tasks finishing (so that it becomes possible to - # store the result of a task in some persistent or semi-persistent - # storage backend). 
- self.listeners = [] - # The state of this flow. - self.state = states.PENDING + super(Workflow, self).__init__(name, tolerant, parents) + self._tasks = [] - def __str__(self): - return "%s: %s" % (self.__class__.__name__, id(self)) + def add(self, task): + self._tasks.append(task) - def run(self, context, *args, **kwargs): - if self.state != states.PENDING: - raise exc.InvalidStateException("Unable to run linear flow when " - "in state %s" % (self.state)) - - if self.result_fetcher: - result_fetcher = functools.partial(self.result_fetcher, context) - else: - result_fetcher = None - - self._change_state(context, states.STARTED) - - # TODO(harlowja): we can likely add in custom reconcilation strategies - # here or around here... - def do_rollback_for(task, ex): - self._change_state(context, states.REVERTING) - with excutils.save_and_reraise_exception(): - try: - self._on_task_error(context, task) - except Exception: - LOG.exception("Dropping exception catched when" - " notifying about existing task" - " exception.") - self.rollback(context, exc.TaskException(task, self, ex)) - self._change_state(context, states.FAILURE) - - self._change_state(context, states.RESUMING) - last_task = 0 - if result_fetcher: - for (i, task) in enumerate(self.tasks): - (has_result, result) = result_fetcher(context, self, task) - if not has_result: - break - # Fake running the task so that we trigger the same - # notifications and state changes (and rollback that would - # have happened in a normal flow). - last_task = i + 1 - try: - self._on_task_start(context, task) - # Keep a pristine copy of the result - # so that if said result is altered by other further - # states the one here will not be. This ensures that - # if rollback occurs that the task gets exactly the - # result it returned and not a modified one. 
- self.results.append((task, copy.deepcopy(result))) - self._on_task_finish(context, task, result) - except Exception as ex: - do_rollback_for(task, ex) - - self._change_state(context, states.RUNNING) - for task in self.tasks[last_task:]: - try: - self._on_task_start(context, task) - result = task.apply(context, *args, **kwargs) - # Keep a pristine copy of the result - # so that if said result is altered by other further states - # the one here will not be. This ensures that if rollback - # occurs that the task gets exactly the result it returned - # and not a modified one. - self.results.append((task, copy.deepcopy(result))) - self._on_task_finish(context, task, result) - except Exception as ex: - do_rollback_for(task, ex) - - # Only gets here if everything went successfully. - self._change_state(context, states.SUCCESS) - - def _change_state(self, context, new_state): - if self.state != new_state: - self.state = new_state - self._on_flow_state_change(context) - - def _on_flow_state_change(self, context): - # Notify any listeners that the internal state has changed. - for i in self.listeners: - i.notify(context, self) - - def _on_task_error(self, context, task): - # Notify any listeners that the task has errored. - for i in self.listeners: - i.notify(context, states.FAILURE, self, task) - - def _on_task_start(self, context, task): - # Notify any listeners that we are about to start the given task. - for i in self.listeners: - i.notify(context, states.STARTED, self, task) - - def _on_task_finish(self, context, task, result): - # Notify any listeners that we are finishing the given task. 
- self._reversions.append((task, result)) - for i in self.listeners: - i.notify(context, states.SUCCESS, self, task, result=result) - - def rollback(self, context, cause): - for (i, (task, result)) in enumerate(reversed(self._reversions)): - try: - task.revert(context, result, cause) - except Exception: - # Ex: WARN: Failed rolling back stage 1 (validate_request) of - # chain validation due to Y exception. - msg = ("Failed rolling back stage %(index)s (%(task)s)" - " of workflow %(workflow)s, due to inner exception.") - LOG.warn(msg % {'index': (i + 1), 'task': task, - 'workflow': self}) - if not self.tolerant: - # NOTE(harlowja): LOG a msg AND re-raise the exception if - # the chain does not tolerate exceptions happening in the - # rollback method. - raise - if self.parents: - # Rollback any parents workflows if they exist... - for p in self.parents: - p.rollback(context, cause) + def order(self): + return list(self._tasks) diff --git a/taskflow/task.py b/taskflow/task.py index 6e67147f..4cfdcf95 100644 --- a/taskflow/task.py +++ b/taskflow/task.py @@ -33,6 +33,16 @@ class Task(object): def __str__(self): return self.name + def requires(self): + """Return any input 'resource' names this task depends on existing + before this task can be applied.""" + return set() + + def provides(self): + """Return any output 'resource' names this task produces that other + tasks may depend on this task providing.""" + return set() + @abc.abstractmethod def apply(self, context, *args, **kwargs): """Activate a given task which will perform some operation and return. diff --git a/tools/pip-requires b/tools/pip-requires index 483a09a7..1d75a515 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -1,3 +1,3 @@ # Packages needed for using this library. - oslo.config>=1.1.0 +networkx