Graph tests and adjustments releated to.

This commit is contained in:
Joshua Harlow
2013-05-20 15:03:41 -07:00
parent a8d2826519
commit e543bc3ea7
3 changed files with 133 additions and 18 deletions

View File

@@ -49,6 +49,17 @@ class Flow(ordered_flow.Flow):
self._graph.add_node(task)
self._connected = False
def _fetch_task_inputs(self, task):
task_needs = task.requires()
if not task_needs:
return None
inputs = {}
for (_who, there_result) in self.results:
for n in task_needs:
if there_result and n in there_result:
inputs[n] = there_result[n]
return inputs
def run(self, context, *args, **kwargs):
self.connect()
return super(Flow, self).run(context, *args, **kwargs)
@@ -64,23 +75,30 @@ class Flow(ordered_flow.Flow):
"tasks needed inputs and outputs.")
def connect(self):
"""Connects the edges of the graph together."""
if self._connected:
"""Connects the nodes & edges of the graph together."""
if self._connected or len(self._graph) == 0:
return
provides_what = defaultdict(list)
requires_what = defaultdict(list)
for t in self._graph.nodes_iter():
for r in t.requires:
for r in t.requires():
requires_what[r].append(t)
for p in t.provides:
for p in t.provides():
provides_what[p].append(t)
for (i_want, n) in requires_what.items():
for (i_want, who_wants) in requires_what.items():
if i_want not in provides_what:
raise exc.InvalidStateException("Task %s requires input %s "
raise exc.InvalidStateException("Task/s %s requires input %s "
"but no other task produces "
"said output" % (n, i_want))
"said output." % (who_wants,
i_want))
for p in provides_what[i_want]:
# P produces for N so thats why we link P->N and not N->P
self._graph.add_edge(p, n)
for n in who_wants:
why = {
i_want: True,
}
self._graph.add_edge(p, n, why)
self._connected = True

View File

@@ -95,14 +95,14 @@ class Flow(object):
# the given task and either reconcile said task and its associated
# failure, so that the flow can continue or abort and perform
# some type of undo of the tasks already completed.
try:
self._change_state(context, states.REVERTING)
except Exception:
LOG.exception("Dropping exception catched when"
" changing state to reverting while performing"
" reconcilation on a tasks exception.")
cause = exc.TaskException(task, self, excp)
with excutils.save_and_reraise_exception():
try:
self._change_state(context, states.REVERTING)
except Exception:
LOG.exception("Dropping exception catched when"
" changing state to reverting while performing"
" reconcilation on a tasks exception.")
try:
self._on_task_error(context, task)
except Exception:

View File

@@ -19,15 +19,112 @@
import functools
import unittest
from taskflow import exceptions as excp
from taskflow import states
from taskflow import task
from taskflow import wrappers
from taskflow.patterns import graph_workflow as gw
from taskflow.patterns import graph_flow as gw
class GraphWorkflowTest(unittest.TestCase):
def null_functor(*args, **kwargs):
return None
class GraphFlowTest(unittest.TestCase):
def testRevertPath(self):
flo = gw.Flow("test-flow")
reverted = []
def run1(context, *args, **kwargs):
return {
'a': 1,
}
def run1_revert(context, result, cause):
reverted.append('run1')
self.assertEquals(states.REVERTING, cause.flow.state)
self.assertEquals(result, {'a': 1})
def run2(context, a, *args, **kwargs):
raise Exception('Dead')
flo.add(wrappers.FunctorTask(None, run1, run1_revert,
provides_what=['a'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run2, null_functor,
provides_what=['c'],
extract_requires=True))
self.assertEquals(states.PENDING, flo.state)
self.assertRaises(Exception, flo.run, {})
self.assertEquals(states.FAILURE, flo.state)
self.assertEquals(['run1'], reverted)
def testConnectRequirementFailure(self):
def run1(context, *args, **kwargs):
return {
'a': 1,
}
def run2(context, b, c, d, *args, **kwargs):
return None
flo = gw.Flow("test-flow")
flo.add(wrappers.FunctorTask(None, run1, null_functor,
provides_what=['a'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run2, null_functor,
extract_requires=True))
self.assertRaises(excp.InvalidStateException, flo.connect)
self.assertRaises(excp.InvalidStateException, flo.run, {})
self.assertRaises(excp.InvalidStateException, flo.order)
def testHappyPath(self):
flo = gw.Flow("test-flow")
pass
run_order = []
f_args = {}
def run1(context, *args, **kwargs):
run_order.append('ran1')
return {
'a': 1,
}
def run2(context, a, *args, **kwargs):
run_order.append('ran2')
return {
'c': 3,
}
def run3(context, a, *args, **kwargs):
run_order.append('ran3')
return {
'b': 2,
}
def run4(context, b, c, *args, **kwargs):
run_order.append('ran4')
f_args['b'] = b
f_args['c'] = c
flo.add(wrappers.FunctorTask(None, run1, null_functor,
provides_what=['a'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run2, null_functor,
provides_what=['c'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run3, null_functor,
provides_what=['b'],
extract_requires=True))
flo.add(wrappers.FunctorTask(None, run4, null_functor,
extract_requires=True))
flo.run({})
self.assertEquals(['ran1', 'ran2', 'ran3', 'ran4'], sorted(run_order))
self.assertEquals('ran1', run_order[0])
self.assertEquals('ran4', run_order[-1])
self.assertEquals({'b': 2, 'c': 3}, f_args)