Merge pull request #16 from harlowja/graph

Graph flow tests and updates.
This commit is contained in:
Joshua Harlow
2013-05-20 19:59:37 -07:00
6 changed files with 223 additions and 17 deletions

View File

@@ -26,10 +26,10 @@ class TaskException(TaskFlowException):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure."""
def __init__(self, task, workflow=None, cause=None):
def __init__(self, task, flow=None, cause=None):
super(TaskException, self).__init__()
self.task = task
self.workflow = workflow
self.flow = flow
self.cause = cause

View File

@@ -49,6 +49,17 @@ class Flow(ordered_flow.Flow):
self._graph.add_node(task)
self._connected = False
def _fetch_task_inputs(self, task):
task_needs = task.requires()
if not task_needs:
return None
inputs = {}
for (_who, there_result) in self.results:
for n in task_needs:
if there_result and n in there_result:
inputs[n] = there_result[n]
return inputs
def run(self, context, *args, **kwargs):
self.connect()
return super(Flow, self).run(context, *args, **kwargs)
@@ -64,23 +75,30 @@ class Flow(ordered_flow.Flow):
"tasks needed inputs and outputs.")
def connect(self):
"""Connects the edges of the graph together."""
if self._connected:
"""Connects the nodes & edges of the graph together."""
if self._connected or len(self._graph) == 0:
return
provides_what = defaultdict(list)
requires_what = defaultdict(list)
for t in self._graph.nodes_iter():
for r in t.requires:
for r in t.requires():
requires_what[r].append(t)
for p in t.provides:
for p in t.provides():
provides_what[p].append(t)
for (i_want, n) in requires_what.items():
for (i_want, who_wants) in requires_what.items():
if i_want not in provides_what:
raise exc.InvalidStateException("Task %s requires input %s "
raise exc.InvalidStateException("Task/s %s requires input %s "
"but no other task produces "
"said output" % (n, i_want))
"said output." % (who_wants,
i_want))
for p in provides_what[i_want]:
# P produces for N so thats why we link P->N and not N->P
self._graph.add_edge(p, n)
for n in who_wants:
why = {
i_want: True,
}
self._graph.add_edge(p, n, why)
self._connected = True

View File

@@ -95,14 +95,14 @@ class Flow(object):
# the given task and either reconcile said task and its associated
# failure, so that the flow can continue or abort and perform
# some type of undo of the tasks already completed.
try:
self._change_state(context, states.REVERTING)
except Exception:
LOG.exception("Dropping exception catched when"
" changing state to reverting while performing"
" reconcilation on a tasks exception.")
cause = exc.TaskException(task, self, excp)
with excutils.save_and_reraise_exception():
try:
self._change_state(context, states.REVERTING)
except Exception:
LOG.exception("Dropping exception catched when"
" changing state to reverting while performing"
" reconcilation on a tasks exception.")
try:
self._on_task_error(context, task)
except Exception:

View File

@@ -0,0 +1,130 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
import unittest
from taskflow import exceptions as excp
from taskflow import states
from taskflow import task
from taskflow import wrappers
from taskflow.patterns import graph_flow as gw
def null_functor(*args, **kwargs):
return None
class GraphFlowTest(unittest.TestCase):
def testRevertPath(self):
flo = gw.Flow("test-flow")
reverted = []
def run1(context, *args, **kwargs):
return {
'a': 1,
}
def run1_revert(context, result, cause):
reverted.append('run1')
self.assertEquals(states.REVERTING, cause.flow.state)
self.assertEquals(result, {'a': 1})
def run2(context, a, *args, **kwargs):
raise Exception('Dead')
flo.add(wrappers.FunctorTask(None, run1, run1_revert,
provides_what=['a'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run2, null_functor,
provides_what=['c'],
extract_requires=True))
self.assertEquals(states.PENDING, flo.state)
self.assertRaises(Exception, flo.run, {})
self.assertEquals(states.FAILURE, flo.state)
self.assertEquals(['run1'], reverted)
def testConnectRequirementFailure(self):
def run1(context, *args, **kwargs):
return {
'a': 1,
}
def run2(context, b, c, d, *args, **kwargs):
return None
flo = gw.Flow("test-flow")
flo.add(wrappers.FunctorTask(None, run1, null_functor,
provides_what=['a'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run2, null_functor,
extract_requires=True))
self.assertRaises(excp.InvalidStateException, flo.connect)
self.assertRaises(excp.InvalidStateException, flo.run, {})
self.assertRaises(excp.InvalidStateException, flo.order)
def testHappyPath(self):
flo = gw.Flow("test-flow")
run_order = []
f_args = {}
def run1(context, *args, **kwargs):
run_order.append('ran1')
return {
'a': 1,
}
def run2(context, a, *args, **kwargs):
run_order.append('ran2')
return {
'c': 3,
}
def run3(context, a, *args, **kwargs):
run_order.append('ran3')
return {
'b': 2,
}
def run4(context, b, c, *args, **kwargs):
run_order.append('ran4')
f_args['b'] = b
f_args['c'] = c
flo.add(wrappers.FunctorTask(None, run1, null_functor,
provides_what=['a'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run2, null_functor,
provides_what=['c'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run3, null_functor,
provides_what=['b'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run4, null_functor,
extract_requires=True))
flo.run({})
self.assertEquals(['ran1', 'ran2', 'ran3', 'ran4'], sorted(run_order))
self.assertEquals('ran1', run_order[0])
self.assertEquals('ran4', run_order[-1])
self.assertEquals({'b': 2, 'c': 3}, f_args)

View File

@@ -60,6 +60,46 @@ class LinearFlowTest(unittest.TestCase):
functools.partial(do_interrupt, token),
null_functor)
def testSadFlowStateChanges(self):
wf = lw.Flow("the-test-action")
flow_changes = []
def flow_listener(context, wf, previous_state):
flow_changes.append(previous_state)
wf.listeners.append(flow_listener)
wf.add(self.makeRevertingTask(1, True))
self.assertEquals(states.PENDING, wf.state)
self.assertRaises(Exception, wf.run, {})
expected_states = [
states.PENDING,
states.STARTED,
states.RUNNING,
states.REVERTING,
]
self.assertEquals(expected_states, flow_changes)
self.assertEquals(states.FAILURE, wf.state)
def testHappyFlowStateChanges(self):
wf = lw.Flow("the-test-action")
flow_changes = []
def flow_listener(context, wf, previous_state):
flow_changes.append(previous_state)
wf.listeners.append(flow_listener)
wf.add(self.makeRevertingTask(1))
self.assertEquals(states.PENDING, wf.state)
wf.run({})
self.assertEquals([states.PENDING, states.STARTED, states.RUNNING],
flow_changes)
self.assertEquals(states.SUCCESS, wf.state)
def testHappyPath(self):
wf = lw.Flow("the-test-action")

View File

@@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import inspect
from taskflow import task
@@ -25,13 +27,29 @@ class FunctorTask(task.Task):
situations where existing functions already are in place and you just want
to wrap them up."""
def __init__(self, name, apply_functor, revert_functor):
def __init__(self, name, apply_functor, revert_functor,
provides_what=None, extract_requires=False):
super(FunctorTask, self).__init__(name)
if not self.name:
self.name = "%s_%s" % (apply_functor.__name__,
revert_functor.__name__)
self._apply_functor = apply_functor
self._revert_functor = revert_functor
self._requires = set()
self._provides = set()
if provides_what:
self._provides.update(provides_what)
if extract_requires:
for a in inspect.getargspec(apply_functor).args:
if a in ('self', 'context',):
continue
self._requires.add(a)
def requires(self):
return set(self._requires)
def provides(self):
return set(self._provides)
def apply(self, context, *args, **kwargs):
return self._apply_functor(context, *args, **kwargs)