Merge "Tweak engine iteration 'close-up shop' runtime path"
This commit is contained in:
@@ -14,6 +14,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import threading
|
||||
|
||||
from taskflow.engines.action_engine import compiler
|
||||
@@ -30,6 +31,16 @@ from taskflow.utils import misc
|
||||
from taskflow.utils import reflection
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _start_stop(executor):
|
||||
# A teenie helper context manager to safely start/stop a executor...
|
||||
executor.start()
|
||||
try:
|
||||
yield executor
|
||||
finally:
|
||||
executor.stop()
|
||||
|
||||
|
||||
class ActionEngine(base.EngineBase):
|
||||
"""Generic action-based engine.
|
||||
|
||||
@@ -110,31 +121,38 @@ class ActionEngine(base.EngineBase):
|
||||
"""
|
||||
self.compile()
|
||||
self.prepare()
|
||||
self._task_executor.start()
|
||||
state = None
|
||||
runner = self._runtime.runner
|
||||
try:
|
||||
last_state = None
|
||||
with _start_stop(self._task_executor):
|
||||
self._change_state(states.RUNNING)
|
||||
for state in runner.run_iter(timeout=timeout):
|
||||
try:
|
||||
try_suspend = yield state
|
||||
except GeneratorExit:
|
||||
break
|
||||
else:
|
||||
if try_suspend:
|
||||
try:
|
||||
closed = False
|
||||
for (last_state, failures) in runner.run_iter(timeout=timeout):
|
||||
if failures:
|
||||
misc.Failure.reraise_if_any(failures)
|
||||
if closed:
|
||||
continue
|
||||
try:
|
||||
try_suspend = yield last_state
|
||||
except GeneratorExit:
|
||||
# The generator was closed, attempt to suspend and
|
||||
# continue looping until we have cleanly closed up
|
||||
# shop...
|
||||
closed = True
|
||||
self.suspend()
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self._change_state(states.FAILURE)
|
||||
else:
|
||||
ignorable_states = getattr(runner, 'ignorable_states', [])
|
||||
if state and state not in ignorable_states:
|
||||
self._change_state(state)
|
||||
if state != states.SUSPENDED and state != states.SUCCESS:
|
||||
failures = self.storage.get_failures()
|
||||
misc.Failure.reraise_if_any(failures.values())
|
||||
finally:
|
||||
self._task_executor.stop()
|
||||
else:
|
||||
if try_suspend:
|
||||
self.suspend()
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self._change_state(states.FAILURE)
|
||||
else:
|
||||
ignorable_states = getattr(runner, 'ignorable_states', [])
|
||||
if last_state and last_state not in ignorable_states:
|
||||
self._change_state(last_state)
|
||||
if last_state not in [states.SUSPENDED, states.SUCCESS]:
|
||||
failures = self.storage.get_failures()
|
||||
misc.Failure.reraise_if_any(failures.values())
|
||||
|
||||
def _change_state(self, state):
|
||||
with self._state_lock:
|
||||
|
||||
@@ -72,12 +72,12 @@ class Runner(object):
|
||||
timeout = _WAITING_TIMEOUT
|
||||
|
||||
# Prepare flow to be resumed
|
||||
yield st.RESUMING
|
||||
yield (st.RESUMING, [])
|
||||
next_nodes = self._completer.resume()
|
||||
next_nodes.update(self._analyzer.get_next_nodes())
|
||||
|
||||
# Schedule nodes to be worked on
|
||||
yield st.SCHEDULING
|
||||
yield (st.SCHEDULING, [])
|
||||
if self.is_running():
|
||||
not_done, failures = self._scheduler.schedule(next_nodes)
|
||||
else:
|
||||
@@ -90,7 +90,7 @@ class Runner(object):
|
||||
# preempt those tasks (maybe in the future we will be better able to do
|
||||
# this).
|
||||
while not_done:
|
||||
yield st.WAITING
|
||||
yield (st.WAITING, [])
|
||||
|
||||
# TODO(harlowja): maybe we should start doing 'yield from' this
|
||||
# call sometime in the future, or equivalent that will work in
|
||||
@@ -101,7 +101,7 @@ class Runner(object):
|
||||
# failures). If failures occurred just continue processing what
|
||||
# is running (so that we don't leave it abandoned) but do not
|
||||
# schedule anything new.
|
||||
yield st.ANALYZING
|
||||
yield (st.ANALYZING, [])
|
||||
next_nodes = set()
|
||||
for future in done:
|
||||
try:
|
||||
@@ -119,7 +119,7 @@ class Runner(object):
|
||||
else:
|
||||
next_nodes.update(more_nodes)
|
||||
if next_nodes and not failures and self.is_running():
|
||||
yield st.SCHEDULING
|
||||
yield (st.SCHEDULING, [])
|
||||
# Recheck incase someone suspended it.
|
||||
if self.is_running():
|
||||
more_not_done, failures = self._scheduler.schedule(
|
||||
@@ -127,10 +127,10 @@ class Runner(object):
|
||||
not_done.update(more_not_done)
|
||||
|
||||
if failures:
|
||||
misc.Failure.reraise_if_any(failures)
|
||||
if self._analyzer.get_next_nodes():
|
||||
yield st.SUSPENDED
|
||||
yield (st.FAILURE, failures)
|
||||
elif self._analyzer.get_next_nodes():
|
||||
yield (st.SUSPENDED, [])
|
||||
elif self._analyzer.is_success():
|
||||
yield st.SUCCESS
|
||||
yield (st.SUCCESS, [])
|
||||
else:
|
||||
yield st.REVERTED
|
||||
yield (st.REVERTED, [])
|
||||
|
||||
@@ -14,8 +14,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import string
|
||||
|
||||
from taskflow.engines.action_engine import compiler
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.patterns import graph_flow as gf
|
||||
@@ -23,20 +21,12 @@ from taskflow.patterns import linear_flow as lf
|
||||
from taskflow.patterns import unordered_flow as uf
|
||||
from taskflow import retry
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils as t_utils
|
||||
|
||||
|
||||
def _make_many(amount):
|
||||
assert amount <= len(string.ascii_lowercase), 'Not enough letters'
|
||||
tasks = []
|
||||
for i in range(0, amount):
|
||||
tasks.append(t_utils.DummyTask(name=string.ascii_lowercase[i]))
|
||||
return tasks
|
||||
from taskflow.tests import utils as test_utils
|
||||
|
||||
|
||||
class PatternCompileTest(test.TestCase):
|
||||
def test_task(self):
|
||||
task = t_utils.DummyTask(name='a')
|
||||
task = test_utils.DummyTask(name='a')
|
||||
compilation = compiler.PatternCompiler().compile(task)
|
||||
g = compilation.execution_graph
|
||||
self.assertEqual(list(g.nodes()), [task])
|
||||
@@ -54,7 +44,7 @@ class PatternCompileTest(test.TestCase):
|
||||
compiler.PatternCompiler().compile, 42)
|
||||
|
||||
def test_linear(self):
|
||||
a, b, c, d = _make_many(4)
|
||||
a, b, c, d = test_utils.make_many(4)
|
||||
flo = lf.Flow("test")
|
||||
flo.add(a, b, c)
|
||||
sflo = lf.Flow("sub-test")
|
||||
@@ -74,7 +64,7 @@ class PatternCompileTest(test.TestCase):
|
||||
self.assertEqual([a], list(g.no_predecessors_iter()))
|
||||
|
||||
def test_invalid(self):
|
||||
a, b, c = _make_many(3)
|
||||
a, b, c = test_utils.make_many(3)
|
||||
flo = lf.Flow("test")
|
||||
flo.add(a, b, c)
|
||||
flo.add(flo)
|
||||
@@ -82,7 +72,7 @@ class PatternCompileTest(test.TestCase):
|
||||
compiler.PatternCompiler().compile, flo)
|
||||
|
||||
def test_unordered(self):
|
||||
a, b, c, d = _make_many(4)
|
||||
a, b, c, d = test_utils.make_many(4)
|
||||
flo = uf.Flow("test")
|
||||
flo.add(a, b, c, d)
|
||||
compilation = compiler.PatternCompiler().compile(flo)
|
||||
@@ -95,7 +85,7 @@ class PatternCompileTest(test.TestCase):
|
||||
set(g.no_predecessors_iter()))
|
||||
|
||||
def test_linear_nested(self):
|
||||
a, b, c, d = _make_many(4)
|
||||
a, b, c, d = test_utils.make_many(4)
|
||||
flo = lf.Flow("test")
|
||||
flo.add(a, b)
|
||||
flo2 = uf.Flow("test2")
|
||||
@@ -119,7 +109,7 @@ class PatternCompileTest(test.TestCase):
|
||||
self.assertTrue(g.has_edge(b, d))
|
||||
|
||||
def test_unordered_nested(self):
|
||||
a, b, c, d = _make_many(4)
|
||||
a, b, c, d = test_utils.make_many(4)
|
||||
flo = uf.Flow("test")
|
||||
flo.add(a, b)
|
||||
flo2 = lf.Flow("test2")
|
||||
@@ -142,7 +132,7 @@ class PatternCompileTest(test.TestCase):
|
||||
self.assertEqual(1, lb.number_of_edges())
|
||||
|
||||
def test_unordered_nested_in_linear(self):
|
||||
a, b, c, d = _make_many(4)
|
||||
a, b, c, d = test_utils.make_many(4)
|
||||
flo = lf.Flow('lt').add(
|
||||
a,
|
||||
uf.Flow('ut').add(b, c),
|
||||
@@ -159,7 +149,7 @@ class PatternCompileTest(test.TestCase):
|
||||
])
|
||||
|
||||
def test_graph(self):
|
||||
a, b, c, d = _make_many(4)
|
||||
a, b, c, d = test_utils.make_many(4)
|
||||
flo = gf.Flow("test")
|
||||
flo.add(a, b, c, d)
|
||||
|
||||
@@ -169,7 +159,7 @@ class PatternCompileTest(test.TestCase):
|
||||
self.assertEqual(0, g.number_of_edges())
|
||||
|
||||
def test_graph_nested(self):
|
||||
a, b, c, d, e, f, g = _make_many(7)
|
||||
a, b, c, d, e, f, g = test_utils.make_many(7)
|
||||
flo = gf.Flow("test")
|
||||
flo.add(a, b, c, d)
|
||||
|
||||
@@ -186,7 +176,7 @@ class PatternCompileTest(test.TestCase):
|
||||
])
|
||||
|
||||
def test_graph_nested_graph(self):
|
||||
a, b, c, d, e, f, g = _make_many(7)
|
||||
a, b, c, d, e, f, g = test_utils.make_many(7)
|
||||
flo = gf.Flow("test")
|
||||
flo.add(a, b, c, d)
|
||||
|
||||
@@ -200,7 +190,7 @@ class PatternCompileTest(test.TestCase):
|
||||
self.assertEqual(0, g.number_of_edges())
|
||||
|
||||
def test_graph_links(self):
|
||||
a, b, c, d = _make_many(4)
|
||||
a, b, c, d = test_utils.make_many(4)
|
||||
flo = gf.Flow("test")
|
||||
flo.add(a, b, c, d)
|
||||
flo.link(a, b)
|
||||
@@ -219,8 +209,8 @@ class PatternCompileTest(test.TestCase):
|
||||
self.assertItemsEqual([d], g.no_successors_iter())
|
||||
|
||||
def test_graph_dependencies(self):
|
||||
a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
|
||||
b = t_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
|
||||
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
|
||||
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
|
||||
flo = gf.Flow("test").add(a, b)
|
||||
|
||||
compilation = compiler.PatternCompiler().compile(flo)
|
||||
@@ -233,9 +223,9 @@ class PatternCompileTest(test.TestCase):
|
||||
self.assertItemsEqual([b], g.no_successors_iter())
|
||||
|
||||
def test_graph_nested_requires(self):
|
||||
a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
|
||||
b = t_utils.ProvidesRequiresTask('b', provides=[], requires=[])
|
||||
c = t_utils.ProvidesRequiresTask('c', provides=[], requires=['x'])
|
||||
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
|
||||
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
|
||||
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=['x'])
|
||||
flo = gf.Flow("test").add(
|
||||
a,
|
||||
lf.Flow("test2").add(b, c)
|
||||
@@ -252,9 +242,9 @@ class PatternCompileTest(test.TestCase):
|
||||
self.assertItemsEqual([c], g.no_successors_iter())
|
||||
|
||||
def test_graph_nested_provides(self):
|
||||
a = t_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
|
||||
b = t_utils.ProvidesRequiresTask('b', provides=['x'], requires=[])
|
||||
c = t_utils.ProvidesRequiresTask('c', provides=[], requires=[])
|
||||
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
|
||||
b = test_utils.ProvidesRequiresTask('b', provides=['x'], requires=[])
|
||||
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=[])
|
||||
flo = gf.Flow("test").add(
|
||||
a,
|
||||
lf.Flow("test2").add(b, c)
|
||||
@@ -272,8 +262,8 @@ class PatternCompileTest(test.TestCase):
|
||||
|
||||
def test_checks_for_dups(self):
|
||||
flo = gf.Flow("test").add(
|
||||
t_utils.DummyTask(name="a"),
|
||||
t_utils.DummyTask(name="a")
|
||||
test_utils.DummyTask(name="a"),
|
||||
test_utils.DummyTask(name="a")
|
||||
)
|
||||
self.assertRaisesRegexp(exc.Duplicate,
|
||||
'^Atoms with duplicate names',
|
||||
@@ -281,8 +271,8 @@ class PatternCompileTest(test.TestCase):
|
||||
|
||||
def test_checks_for_dups_globally(self):
|
||||
flo = gf.Flow("test").add(
|
||||
gf.Flow("int1").add(t_utils.DummyTask(name="a")),
|
||||
gf.Flow("int2").add(t_utils.DummyTask(name="a")))
|
||||
gf.Flow("int1").add(test_utils.DummyTask(name="a")),
|
||||
gf.Flow("int2").add(test_utils.DummyTask(name="a")))
|
||||
self.assertRaisesRegexp(exc.Duplicate,
|
||||
'^Atoms with duplicate names',
|
||||
compiler.PatternCompiler().compile, flo)
|
||||
@@ -325,7 +315,7 @@ class PatternCompileTest(test.TestCase):
|
||||
|
||||
def test_retry_in_linear_flow_with_tasks(self):
|
||||
c = retry.AlwaysRevert("c")
|
||||
a, b = _make_many(2)
|
||||
a, b = test_utils.make_many(2)
|
||||
flo = lf.Flow("test", c).add(a, b)
|
||||
compilation = compiler.PatternCompiler().compile(flo)
|
||||
g = compilation.execution_graph
|
||||
@@ -343,7 +333,7 @@ class PatternCompileTest(test.TestCase):
|
||||
|
||||
def test_retry_in_unordered_flow_with_tasks(self):
|
||||
c = retry.AlwaysRevert("c")
|
||||
a, b = _make_many(2)
|
||||
a, b = test_utils.make_many(2)
|
||||
flo = uf.Flow("test", c).add(a, b)
|
||||
compilation = compiler.PatternCompiler().compile(flo)
|
||||
g = compilation.execution_graph
|
||||
@@ -361,7 +351,7 @@ class PatternCompileTest(test.TestCase):
|
||||
|
||||
def test_retry_in_graph_flow_with_tasks(self):
|
||||
r = retry.AlwaysRevert("cp")
|
||||
a, b, c = _make_many(3)
|
||||
a, b, c = test_utils.make_many(3)
|
||||
flo = gf.Flow("test", r).add(a, b, c).link(b, c)
|
||||
compilation = compiler.PatternCompiler().compile(flo)
|
||||
g = compilation.execution_graph
|
||||
@@ -382,7 +372,7 @@ class PatternCompileTest(test.TestCase):
|
||||
def test_retries_hierarchy(self):
|
||||
c1 = retry.AlwaysRevert("cp1")
|
||||
c2 = retry.AlwaysRevert("cp2")
|
||||
a, b, c, d = _make_many(4)
|
||||
a, b, c, d = test_utils.make_many(4)
|
||||
flo = lf.Flow("test", c1).add(
|
||||
a,
|
||||
lf.Flow("test", c2).add(b, c),
|
||||
@@ -407,7 +397,7 @@ class PatternCompileTest(test.TestCase):
|
||||
|
||||
def test_retry_subflows_hierarchy(self):
|
||||
c1 = retry.AlwaysRevert("cp1")
|
||||
a, b, c, d = _make_many(4)
|
||||
a, b, c, d = test_utils.make_many(4)
|
||||
flo = lf.Flow("test", c1).add(
|
||||
a,
|
||||
lf.Flow("test").add(b, c),
|
||||
|
||||
169
taskflow/tests/unit/test_action_engine_runner.py
Normal file
169
taskflow/tests/unit/test_action_engine_runner.py
Normal file
@@ -0,0 +1,169 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import six
|
||||
|
||||
from taskflow.engines.action_engine import compiler
|
||||
from taskflow.engines.action_engine import executor
|
||||
from taskflow.engines.action_engine import runtime
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow import states as st
|
||||
from taskflow import storage
|
||||
from taskflow import test
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import persistence_utils as pu
|
||||
|
||||
|
||||
class RunnerTest(test.TestCase):
|
||||
def _make_runtime(self, flow, initial_state=None):
|
||||
compilation = compiler.PatternCompiler().compile(flow)
|
||||
flow_detail = pu.create_flow_detail(flow)
|
||||
store = storage.SingleThreadedStorage(flow_detail)
|
||||
# This ensures the tasks exist in storage...
|
||||
for task in compilation.execution_graph:
|
||||
store.ensure_task(task.name)
|
||||
if initial_state:
|
||||
store.set_flow_state(initial_state)
|
||||
task_notifier = misc.Notifier()
|
||||
task_executor = executor.SerialTaskExecutor()
|
||||
task_executor.start()
|
||||
self.addCleanup(task_executor.stop)
|
||||
return runtime.Runtime(compiler.PatternCompiler().compile(flow),
|
||||
store, task_notifier, task_executor)
|
||||
|
||||
def test_running(self):
|
||||
flow = lf.Flow("root")
|
||||
flow.add(*test_utils.make_many(1))
|
||||
|
||||
rt = self._make_runtime(flow, initial_state=st.RUNNING)
|
||||
self.assertTrue(rt.runner.is_running())
|
||||
|
||||
rt = self._make_runtime(flow, initial_state=st.SUSPENDED)
|
||||
self.assertFalse(rt.runner.is_running())
|
||||
|
||||
def test_run_iterations(self):
|
||||
flow = lf.Flow("root")
|
||||
tasks = test_utils.make_many(
|
||||
1, task_cls=test_utils.TaskNoRequiresNoReturns)
|
||||
flow.add(*tasks)
|
||||
|
||||
rt = self._make_runtime(flow, initial_state=st.RUNNING)
|
||||
self.assertTrue(rt.runner.is_running())
|
||||
|
||||
it = rt.runner.run_iter()
|
||||
state, failures = six.next(it)
|
||||
self.assertEqual(st.RESUMING, state)
|
||||
self.assertEqual(0, len(failures))
|
||||
|
||||
state, failures = six.next(it)
|
||||
self.assertEqual(st.SCHEDULING, state)
|
||||
self.assertEqual(0, len(failures))
|
||||
|
||||
state, failures = six.next(it)
|
||||
self.assertEqual(st.WAITING, state)
|
||||
self.assertEqual(0, len(failures))
|
||||
|
||||
state, failures = six.next(it)
|
||||
self.assertEqual(st.ANALYZING, state)
|
||||
self.assertEqual(0, len(failures))
|
||||
|
||||
state, failures = six.next(it)
|
||||
self.assertEqual(st.SUCCESS, state)
|
||||
self.assertEqual(0, len(failures))
|
||||
|
||||
self.assertRaises(StopIteration, six.next, it)
|
||||
|
||||
def test_run_iterations_reverted(self):
|
||||
flow = lf.Flow("root")
|
||||
tasks = test_utils.make_many(
|
||||
1, task_cls=test_utils.TaskWithFailure)
|
||||
flow.add(*tasks)
|
||||
|
||||
rt = self._make_runtime(flow, initial_state=st.RUNNING)
|
||||
self.assertTrue(rt.runner.is_running())
|
||||
|
||||
transitions = list(rt.runner.run_iter())
|
||||
state, failures = transitions[-1]
|
||||
self.assertEqual(st.REVERTED, state)
|
||||
self.assertEqual([], failures)
|
||||
|
||||
self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name))
|
||||
|
||||
def test_run_iterations_failure(self):
|
||||
flow = lf.Flow("root")
|
||||
tasks = test_utils.make_many(
|
||||
1, task_cls=test_utils.NastyFailingTask)
|
||||
flow.add(*tasks)
|
||||
|
||||
rt = self._make_runtime(flow, initial_state=st.RUNNING)
|
||||
self.assertTrue(rt.runner.is_running())
|
||||
|
||||
transitions = list(rt.runner.run_iter())
|
||||
state, failures = transitions[-1]
|
||||
self.assertEqual(st.FAILURE, state)
|
||||
self.assertEqual(1, len(failures))
|
||||
failure = failures[0]
|
||||
self.assertTrue(failure.check(RuntimeError))
|
||||
|
||||
self.assertEqual(st.FAILURE, rt.storage.get_atom_state(tasks[0].name))
|
||||
|
||||
def test_run_iterations_suspended(self):
|
||||
flow = lf.Flow("root")
|
||||
tasks = test_utils.make_many(
|
||||
2, task_cls=test_utils.TaskNoRequiresNoReturns)
|
||||
flow.add(*tasks)
|
||||
|
||||
rt = self._make_runtime(flow, initial_state=st.RUNNING)
|
||||
self.assertTrue(rt.runner.is_running())
|
||||
|
||||
transitions = []
|
||||
for state, failures in rt.runner.run_iter():
|
||||
transitions.append((state, failures))
|
||||
if state == st.ANALYZING:
|
||||
rt.storage.set_flow_state(st.SUSPENDED)
|
||||
state, failures = transitions[-1]
|
||||
self.assertEqual(st.SUSPENDED, state)
|
||||
self.assertEqual([], failures)
|
||||
|
||||
self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name))
|
||||
self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[1].name))
|
||||
|
||||
def test_run_iterations_suspended_failure(self):
|
||||
flow = lf.Flow("root")
|
||||
sad_tasks = test_utils.make_many(
|
||||
1, task_cls=test_utils.NastyFailingTask)
|
||||
flow.add(*sad_tasks)
|
||||
happy_tasks = test_utils.make_many(
|
||||
1, task_cls=test_utils.TaskNoRequiresNoReturns, offset=1)
|
||||
flow.add(*happy_tasks)
|
||||
|
||||
rt = self._make_runtime(flow, initial_state=st.RUNNING)
|
||||
self.assertTrue(rt.runner.is_running())
|
||||
|
||||
transitions = []
|
||||
for state, failures in rt.runner.run_iter():
|
||||
transitions.append((state, failures))
|
||||
if state == st.ANALYZING:
|
||||
rt.storage.set_flow_state(st.SUSPENDED)
|
||||
state, failures = transitions[-1]
|
||||
self.assertEqual(st.SUSPENDED, state)
|
||||
self.assertEqual([], failures)
|
||||
|
||||
self.assertEqual(st.PENDING,
|
||||
rt.storage.get_atom_state(happy_tasks[0].name))
|
||||
self.assertEqual(st.FAILURE,
|
||||
rt.storage.get_atom_state(sad_tasks[0].name))
|
||||
@@ -15,6 +15,7 @@
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import string
|
||||
import threading
|
||||
|
||||
import six
|
||||
@@ -346,3 +347,16 @@ class WaitForOneFromTask(SaveOrderTask):
|
||||
if name not in self.wait_for or state not in self.wait_states:
|
||||
return
|
||||
self.event.set()
|
||||
|
||||
|
||||
def make_many(amount, task_cls=DummyTask, offset=0):
|
||||
name_pool = string.ascii_lowercase + string.ascii_uppercase
|
||||
tasks = []
|
||||
while amount > 0:
|
||||
if offset >= len(name_pool):
|
||||
raise AssertionError('Name pool size to small (%s < %s)'
|
||||
% (len(name_pool), offset + 1))
|
||||
tasks.append(task_cls(name=name_pool[offset]))
|
||||
offset += 1
|
||||
amount -= 1
|
||||
return tasks
|
||||
|
||||
Reference in New Issue
Block a user