Allow provided flow to be empty

If someone really wants to provide a flow to run that is
empty that is there prerogative so it doesn't seem that valuable
to blow up if they do this.

Change-Id: I0ad89b0ade85a64f6ec107e2686454ef6dc97353
This commit is contained in:
Joshua Harlow
2015-08-02 20:48:12 -07:00
committed by Joshua Harlow
parent fb064a9f0f
commit 010b3fda21
3 changed files with 16 additions and 15 deletions

View File

@@ -329,12 +329,6 @@ class PatternCompiler(object):
if dup_names:
raise exc.Duplicate(
"Atoms with duplicate names found: %s" % (sorted(dup_names)))
atoms = iter_utils.count(
node for node, node_attrs in graph.nodes_iter(data=True)
if node_attrs['kind'] in ATOMS)
if atoms == 0:
raise exc.Empty("Root container '%s' (%s) is empty"
% (self._root, type(self._root)))
self._history.clear()
@fasteners.locked

View File

@@ -43,7 +43,7 @@ class PatternCompileTest(test.TestCase):
def test_empty(self):
flo = lf.Flow("test")
self.assertRaises(exc.Empty, compiler.PatternCompiler(flo).compile)
compiler.PatternCompiler(flo).compile()
def test_linear(self):
a, b, c, d = test_utils.make_many(4)

View File

@@ -42,6 +42,13 @@ from taskflow.utils import persistence_utils as p_utils
from taskflow.utils import threading_utils as tu
# Expected engine transitions when empty workflows are ran...
_EMPTY_TRANSITIONS = [
states.RESUMING, states.SCHEDULING, states.WAITING,
states.ANALYZING, states.SUCCESS,
]
class EngineTaskTest(object):
def test_run_task_as_flow(self):
@@ -255,10 +262,10 @@ class EngineMultipleResultsTest(utils.EngineTestBase):
class EngineLinearFlowTest(utils.EngineTestBase):
def test_run_empty_flow(self):
def test_run_empty_linear_flow(self):
flow = lf.Flow('flow-1')
engine = self._make_engine(flow)
self.assertRaises(exc.Empty, engine.run)
self.assertEqual(_EMPTY_TRANSITIONS, list(engine.run_iter()))
def test_overlap_parent_sibling_expected_result(self):
flow = lf.Flow('flow-1')
@@ -456,10 +463,10 @@ class EngineLinearFlowTest(utils.EngineTestBase):
class EngineParallelFlowTest(utils.EngineTestBase):
def test_run_empty_flow(self):
def test_run_empty_unordered_flow(self):
flow = uf.Flow('p-1')
engine = self._make_engine(flow)
self.assertRaises(exc.Empty, engine.run)
self.assertEqual(_EMPTY_TRANSITIONS, list(engine.run_iter()))
def test_parallel_flow_with_priority(self):
flow = uf.Flow('p-1')
@@ -664,16 +671,16 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
class EngineGraphFlowTest(utils.EngineTestBase):
def test_run_empty_flow(self):
def test_run_empty_graph_flow(self):
flow = gf.Flow('g-1')
engine = self._make_engine(flow)
self.assertRaises(exc.Empty, engine.run)
self.assertEqual(_EMPTY_TRANSITIONS, list(engine.run_iter()))
def test_run_nested_empty_flows(self):
def test_run_empty_nested_graph_flows(self):
flow = gf.Flow('g-1').add(lf.Flow('l-1'),
gf.Flow('g-2'))
engine = self._make_engine(flow)
self.assertRaises(exc.Empty, engine.run)
self.assertEqual(_EMPTY_TRANSITIONS, list(engine.run_iter()))
def test_graph_flow_one_task(self):
flow = gf.Flow('g-1').add(