Tweak engine iteration 'close-up shop' runtime path

1. Have the runner yield the final set of failures instead of
   raising them, this allows the same yield syntax to be used
   for all exit points that the runner run_iter() produces and
   now raise failures from the main engine run loop to match this
   change.
2. Use a context manager instead of try/finally to start and
   stop the action engines task executor (teenie niceness...)
3. When the engine run_iter() is used and the generator that is
   returned is closed, instead of breaking from the run loop, which
   can leave running tasks incomplete instead continue running and
   signal to the runner that the engine has suspended itself. This
   ensures that the running atoms are not lost when the generator from
   run_iter() is closed (for whatever reason) before finishing.

Also adds a bunch of useful tests that directly test the runner instead
of the indirect testing that we were doing before.

Fixes bug 1361013

Change-Id: I1b598e26f0b3877c8f7004f87bacdb7f5e9c9897
This commit is contained in:
Joshua Harlow
2014-08-24 21:11:42 -07:00
committed by Joshua Harlow
parent 7c3332e49b
commit bfaa109821
5 changed files with 262 additions and 71 deletions

View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import threading
from taskflow.engines.action_engine import compiler
@@ -30,6 +31,16 @@ from taskflow.utils import misc
from taskflow.utils import reflection
@contextlib.contextmanager
def _start_stop(executor):
# A teenie helper context manager to safely start/stop a executor...
executor.start()
try:
yield executor
finally:
executor.stop()
class ActionEngine(base.EngineBase):
"""Generic action-based engine.
@@ -110,31 +121,38 @@ class ActionEngine(base.EngineBase):
"""
self.compile()
self.prepare()
self._task_executor.start()
state = None
runner = self._runtime.runner
try:
last_state = None
with _start_stop(self._task_executor):
self._change_state(states.RUNNING)
for state in runner.run_iter(timeout=timeout):
try:
try_suspend = yield state
except GeneratorExit:
break
else:
if try_suspend:
try:
closed = False
for (last_state, failures) in runner.run_iter(timeout=timeout):
if failures:
misc.Failure.reraise_if_any(failures)
if closed:
continue
try:
try_suspend = yield last_state
except GeneratorExit:
# The generator was closed, attempt to suspend and
# continue looping until we have cleanly closed up
# shop...
closed = True
self.suspend()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(states.FAILURE)
else:
ignorable_states = getattr(runner, 'ignorable_states', [])
if state and state not in ignorable_states:
self._change_state(state)
if state != states.SUSPENDED and state != states.SUCCESS:
failures = self.storage.get_failures()
misc.Failure.reraise_if_any(failures.values())
finally:
self._task_executor.stop()
else:
if try_suspend:
self.suspend()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(states.FAILURE)
else:
ignorable_states = getattr(runner, 'ignorable_states', [])
if last_state and last_state not in ignorable_states:
self._change_state(last_state)
if last_state not in [states.SUSPENDED, states.SUCCESS]:
failures = self.storage.get_failures()
misc.Failure.reraise_if_any(failures.values())
def _change_state(self, state):
with self._state_lock:

View File

@@ -72,12 +72,12 @@ class Runner(object):
timeout = _WAITING_TIMEOUT
# Prepare flow to be resumed
yield st.RESUMING
yield (st.RESUMING, [])
next_nodes = self._completer.resume()
next_nodes.update(self._analyzer.get_next_nodes())
# Schedule nodes to be worked on
yield st.SCHEDULING
yield (st.SCHEDULING, [])
if self.is_running():
not_done, failures = self._scheduler.schedule(next_nodes)
else:
@@ -90,7 +90,7 @@ class Runner(object):
# preempt those tasks (maybe in the future we will be better able to do
# this).
while not_done:
yield st.WAITING
yield (st.WAITING, [])
# TODO(harlowja): maybe we should start doing 'yield from' this
# call sometime in the future, or equivalent that will work in
@@ -101,7 +101,7 @@ class Runner(object):
# failures). If failures occurred just continue processing what
# is running (so that we don't leave it abandoned) but do not
# schedule anything new.
yield st.ANALYZING
yield (st.ANALYZING, [])
next_nodes = set()
for future in done:
try:
@@ -119,7 +119,7 @@ class Runner(object):
else:
next_nodes.update(more_nodes)
if next_nodes and not failures and self.is_running():
yield st.SCHEDULING
yield (st.SCHEDULING, [])
# Recheck incase someone suspended it.
if self.is_running():
more_not_done, failures = self._scheduler.schedule(
@@ -127,10 +127,10 @@ class Runner(object):
not_done.update(more_not_done)
if failures:
misc.Failure.reraise_if_any(failures)
if self._analyzer.get_next_nodes():
yield st.SUSPENDED
yield (st.FAILURE, failures)
elif self._analyzer.get_next_nodes():
yield (st.SUSPENDED, [])
elif self._analyzer.is_success():
yield st.SUCCESS
yield (st.SUCCESS, [])
else:
yield st.REVERTED
yield (st.REVERTED, [])

View File

@@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import string
from taskflow.engines.action_engine import compiler
from taskflow import exceptions as exc
from taskflow.patterns import graph_flow as gf
@@ -23,20 +21,12 @@ from taskflow.patterns import linear_flow as lf
from taskflow.patterns import unordered_flow as uf
from taskflow import retry
from taskflow import test
from taskflow.tests import utils as t_utils
def _make_many(amount):
assert amount <= len(string.ascii_lowercase), 'Not enough letters'
tasks = []
for i in range(0, amount):
tasks.append(t_utils.DummyTask(name=string.ascii_lowercase[i]))
return tasks
from taskflow.tests import utils as test_utils
class PatternCompileTest(test.TestCase):
def test_task(self):
task = t_utils.DummyTask(name='a')
task = test_utils.DummyTask(name='a')
compilation = compiler.PatternCompiler().compile(task)
g = compilation.execution_graph
self.assertEqual(list(g.nodes()), [task])
@@ -54,7 +44,7 @@ class PatternCompileTest(test.TestCase):
compiler.PatternCompiler().compile, 42)
def test_linear(self):
a, b, c, d = _make_many(4)
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test")
flo.add(a, b, c)
sflo = lf.Flow("sub-test")
@@ -74,7 +64,7 @@ class PatternCompileTest(test.TestCase):
self.assertEqual([a], list(g.no_predecessors_iter()))
def test_invalid(self):
a, b, c = _make_many(3)
a, b, c = test_utils.make_many(3)
flo = lf.Flow("test")
flo.add(a, b, c)
flo.add(flo)
@@ -82,7 +72,7 @@ class PatternCompileTest(test.TestCase):
compiler.PatternCompiler().compile, flo)
def test_unordered(self):
a, b, c, d = _make_many(4)
a, b, c, d = test_utils.make_many(4)
flo = uf.Flow("test")
flo.add(a, b, c, d)
compilation = compiler.PatternCompiler().compile(flo)
@@ -95,7 +85,7 @@ class PatternCompileTest(test.TestCase):
set(g.no_predecessors_iter()))
def test_linear_nested(self):
a, b, c, d = _make_many(4)
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test")
flo.add(a, b)
flo2 = uf.Flow("test2")
@@ -119,7 +109,7 @@ class PatternCompileTest(test.TestCase):
self.assertTrue(g.has_edge(b, d))
def test_unordered_nested(self):
a, b, c, d = _make_many(4)
a, b, c, d = test_utils.make_many(4)
flo = uf.Flow("test")
flo.add(a, b)
flo2 = lf.Flow("test2")
@@ -142,7 +132,7 @@ class PatternCompileTest(test.TestCase):
self.assertEqual(1, lb.number_of_edges())
def test_unordered_nested_in_linear(self):
a, b, c, d = _make_many(4)
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow('lt').add(
a,
uf.Flow('ut').add(b, c),
@@ -159,7 +149,7 @@ class PatternCompileTest(test.TestCase):
])
def test_graph(self):
a, b, c, d = _make_many(4)
a, b, c, d = test_utils.make_many(4)
flo = gf.Flow("test")
flo.add(a, b, c, d)
@@ -169,7 +159,7 @@ class PatternCompileTest(test.TestCase):
self.assertEqual(0, g.number_of_edges())
def test_graph_nested(self):
a, b, c, d, e, f, g = _make_many(7)
a, b, c, d, e, f, g = test_utils.make_many(7)
flo = gf.Flow("test")
flo.add(a, b, c, d)
@@ -186,7 +176,7 @@ class PatternCompileTest(test.TestCase):
])
def test_graph_nested_graph(self):
a, b, c, d, e, f, g = _make_many(7)
a, b, c, d, e, f, g = test_utils.make_many(7)
flo = gf.Flow("test")
flo.add(a, b, c, d)
@@ -200,7 +190,7 @@ class PatternCompileTest(test.TestCase):
self.assertEqual(0, g.number_of_edges())
def test_graph_links(self):
a, b, c, d = _make_many(4)
a, b, c, d = test_utils.make_many(4)
flo = gf.Flow("test")
flo.add(a, b, c, d)
flo.link(a, b)
@@ -219,8 +209,8 @@ class PatternCompileTest(test.TestCase):
self.assertItemsEqual([d], g.no_successors_iter())
def test_graph_dependencies(self):
a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = t_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=['x'])
flo = gf.Flow("test").add(a, b)
compilation = compiler.PatternCompiler().compile(flo)
@@ -233,9 +223,9 @@ class PatternCompileTest(test.TestCase):
self.assertItemsEqual([b], g.no_successors_iter())
def test_graph_nested_requires(self):
a = t_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = t_utils.ProvidesRequiresTask('b', provides=[], requires=[])
c = t_utils.ProvidesRequiresTask('c', provides=[], requires=['x'])
a = test_utils.ProvidesRequiresTask('a', provides=['x'], requires=[])
b = test_utils.ProvidesRequiresTask('b', provides=[], requires=[])
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=['x'])
flo = gf.Flow("test").add(
a,
lf.Flow("test2").add(b, c)
@@ -252,9 +242,9 @@ class PatternCompileTest(test.TestCase):
self.assertItemsEqual([c], g.no_successors_iter())
def test_graph_nested_provides(self):
a = t_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
b = t_utils.ProvidesRequiresTask('b', provides=['x'], requires=[])
c = t_utils.ProvidesRequiresTask('c', provides=[], requires=[])
a = test_utils.ProvidesRequiresTask('a', provides=[], requires=['x'])
b = test_utils.ProvidesRequiresTask('b', provides=['x'], requires=[])
c = test_utils.ProvidesRequiresTask('c', provides=[], requires=[])
flo = gf.Flow("test").add(
a,
lf.Flow("test2").add(b, c)
@@ -272,8 +262,8 @@ class PatternCompileTest(test.TestCase):
def test_checks_for_dups(self):
flo = gf.Flow("test").add(
t_utils.DummyTask(name="a"),
t_utils.DummyTask(name="a")
test_utils.DummyTask(name="a"),
test_utils.DummyTask(name="a")
)
self.assertRaisesRegexp(exc.Duplicate,
'^Atoms with duplicate names',
@@ -281,8 +271,8 @@ class PatternCompileTest(test.TestCase):
def test_checks_for_dups_globally(self):
flo = gf.Flow("test").add(
gf.Flow("int1").add(t_utils.DummyTask(name="a")),
gf.Flow("int2").add(t_utils.DummyTask(name="a")))
gf.Flow("int1").add(test_utils.DummyTask(name="a")),
gf.Flow("int2").add(test_utils.DummyTask(name="a")))
self.assertRaisesRegexp(exc.Duplicate,
'^Atoms with duplicate names',
compiler.PatternCompiler().compile, flo)
@@ -325,7 +315,7 @@ class PatternCompileTest(test.TestCase):
def test_retry_in_linear_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = _make_many(2)
a, b = test_utils.make_many(2)
flo = lf.Flow("test", c).add(a, b)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
@@ -343,7 +333,7 @@ class PatternCompileTest(test.TestCase):
def test_retry_in_unordered_flow_with_tasks(self):
c = retry.AlwaysRevert("c")
a, b = _make_many(2)
a, b = test_utils.make_many(2)
flo = uf.Flow("test", c).add(a, b)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
@@ -361,7 +351,7 @@ class PatternCompileTest(test.TestCase):
def test_retry_in_graph_flow_with_tasks(self):
r = retry.AlwaysRevert("cp")
a, b, c = _make_many(3)
a, b, c = test_utils.make_many(3)
flo = gf.Flow("test", r).add(a, b, c).link(b, c)
compilation = compiler.PatternCompiler().compile(flo)
g = compilation.execution_graph
@@ -382,7 +372,7 @@ class PatternCompileTest(test.TestCase):
def test_retries_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
c2 = retry.AlwaysRevert("cp2")
a, b, c, d = _make_many(4)
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test", c1).add(
a,
lf.Flow("test", c2).add(b, c),
@@ -407,7 +397,7 @@ class PatternCompileTest(test.TestCase):
def test_retry_subflows_hierarchy(self):
c1 = retry.AlwaysRevert("cp1")
a, b, c, d = _make_many(4)
a, b, c, d = test_utils.make_many(4)
flo = lf.Flow("test", c1).add(
a,
lf.Flow("test").add(b, c),

View File

@@ -0,0 +1,169 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from taskflow.engines.action_engine import compiler
from taskflow.engines.action_engine import executor
from taskflow.engines.action_engine import runtime
from taskflow.patterns import linear_flow as lf
from taskflow import states as st
from taskflow import storage
from taskflow import test
from taskflow.tests import utils as test_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils as pu
class RunnerTest(test.TestCase):
def _make_runtime(self, flow, initial_state=None):
compilation = compiler.PatternCompiler().compile(flow)
flow_detail = pu.create_flow_detail(flow)
store = storage.SingleThreadedStorage(flow_detail)
# This ensures the tasks exist in storage...
for task in compilation.execution_graph:
store.ensure_task(task.name)
if initial_state:
store.set_flow_state(initial_state)
task_notifier = misc.Notifier()
task_executor = executor.SerialTaskExecutor()
task_executor.start()
self.addCleanup(task_executor.stop)
return runtime.Runtime(compiler.PatternCompiler().compile(flow),
store, task_notifier, task_executor)
def test_running(self):
flow = lf.Flow("root")
flow.add(*test_utils.make_many(1))
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.is_running())
rt = self._make_runtime(flow, initial_state=st.SUSPENDED)
self.assertFalse(rt.runner.is_running())
def test_run_iterations(self):
flow = lf.Flow("root")
tasks = test_utils.make_many(
1, task_cls=test_utils.TaskNoRequiresNoReturns)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.is_running())
it = rt.runner.run_iter()
state, failures = six.next(it)
self.assertEqual(st.RESUMING, state)
self.assertEqual(0, len(failures))
state, failures = six.next(it)
self.assertEqual(st.SCHEDULING, state)
self.assertEqual(0, len(failures))
state, failures = six.next(it)
self.assertEqual(st.WAITING, state)
self.assertEqual(0, len(failures))
state, failures = six.next(it)
self.assertEqual(st.ANALYZING, state)
self.assertEqual(0, len(failures))
state, failures = six.next(it)
self.assertEqual(st.SUCCESS, state)
self.assertEqual(0, len(failures))
self.assertRaises(StopIteration, six.next, it)
def test_run_iterations_reverted(self):
flow = lf.Flow("root")
tasks = test_utils.make_many(
1, task_cls=test_utils.TaskWithFailure)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.is_running())
transitions = list(rt.runner.run_iter())
state, failures = transitions[-1]
self.assertEqual(st.REVERTED, state)
self.assertEqual([], failures)
self.assertEqual(st.REVERTED, rt.storage.get_atom_state(tasks[0].name))
def test_run_iterations_failure(self):
flow = lf.Flow("root")
tasks = test_utils.make_many(
1, task_cls=test_utils.NastyFailingTask)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.is_running())
transitions = list(rt.runner.run_iter())
state, failures = transitions[-1]
self.assertEqual(st.FAILURE, state)
self.assertEqual(1, len(failures))
failure = failures[0]
self.assertTrue(failure.check(RuntimeError))
self.assertEqual(st.FAILURE, rt.storage.get_atom_state(tasks[0].name))
def test_run_iterations_suspended(self):
flow = lf.Flow("root")
tasks = test_utils.make_many(
2, task_cls=test_utils.TaskNoRequiresNoReturns)
flow.add(*tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.is_running())
transitions = []
for state, failures in rt.runner.run_iter():
transitions.append((state, failures))
if state == st.ANALYZING:
rt.storage.set_flow_state(st.SUSPENDED)
state, failures = transitions[-1]
self.assertEqual(st.SUSPENDED, state)
self.assertEqual([], failures)
self.assertEqual(st.SUCCESS, rt.storage.get_atom_state(tasks[0].name))
self.assertEqual(st.PENDING, rt.storage.get_atom_state(tasks[1].name))
def test_run_iterations_suspended_failure(self):
flow = lf.Flow("root")
sad_tasks = test_utils.make_many(
1, task_cls=test_utils.NastyFailingTask)
flow.add(*sad_tasks)
happy_tasks = test_utils.make_many(
1, task_cls=test_utils.TaskNoRequiresNoReturns, offset=1)
flow.add(*happy_tasks)
rt = self._make_runtime(flow, initial_state=st.RUNNING)
self.assertTrue(rt.runner.is_running())
transitions = []
for state, failures in rt.runner.run_iter():
transitions.append((state, failures))
if state == st.ANALYZING:
rt.storage.set_flow_state(st.SUSPENDED)
state, failures = transitions[-1]
self.assertEqual(st.SUSPENDED, state)
self.assertEqual([], failures)
self.assertEqual(st.PENDING,
rt.storage.get_atom_state(happy_tasks[0].name))
self.assertEqual(st.FAILURE,
rt.storage.get_atom_state(sad_tasks[0].name))

View File

@@ -15,6 +15,7 @@
# under the License.
import contextlib
import string
import threading
import six
@@ -346,3 +347,16 @@ class WaitForOneFromTask(SaveOrderTask):
if name not in self.wait_for or state not in self.wait_states:
return
self.event.set()
def make_many(amount, task_cls=DummyTask, offset=0):
name_pool = string.ascii_lowercase + string.ascii_uppercase
tasks = []
while amount > 0:
if offset >= len(name_pool):
raise AssertionError('Name pool size to small (%s < %s)'
% (len(name_pool), offset + 1))
tasks.append(task_cls(name=name_pool[offset]))
offset += 1
amount -= 1
return tasks