Add ability to reset an engine via a reset
method
If an engines work was previously partially completed and it is desired to reset it (and re-run) so that partially completed or ignored (or other) work inside of it can run again make that possible by exposing and documenting a new `reset` method (and use it internally as well). Change-Id: I47f82010a2108d5d8fd5e42ca9f7e5f165e65488
This commit is contained in:
parent
03702c7676
commit
58fbfd0f90
File diff suppressed because one or more lines are too long
Before Width: | Height: | Size: 25 KiB After Width: | Height: | Size: 26 KiB |
@ -50,10 +50,11 @@ Flow
|
||||
:align: center
|
||||
:alt: Flow state transitions
|
||||
|
||||
**PENDING** - A flow starts its execution lifecycle in this state (it has no
|
||||
state prior to being ran by an engine, since flow(s) are just pattern(s)
|
||||
that define the semantics and ordering of their contents and flows gain
|
||||
state only when they are executed).
|
||||
**PENDING** - A flow starts (or
|
||||
via :py:meth:`~taskflow.engines.base.Engine.reset`) its execution lifecycle
|
||||
in this state (it has no state prior to being ran by an engine, since
|
||||
flow(s) are just pattern(s) that define the semantics and ordering of their
|
||||
contents and flows gain state only when they are executed).
|
||||
|
||||
**RUNNING** - In this state the engine running a flow progresses through the
|
||||
flow.
|
||||
|
@ -283,8 +283,19 @@ class ActionEngine(base.Engine):
|
||||
self._storage_ensured = True
|
||||
# Reset everything back to pending (if we were previously reverted).
|
||||
if self.storage.get_flow_state() == states.REVERTED:
|
||||
self._runtime.reset_all()
|
||||
self._change_state(states.PENDING)
|
||||
self.reset()
|
||||
|
||||
@fasteners.locked
|
||||
def reset(self):
|
||||
if not self._storage_ensured:
|
||||
raise exc.InvalidState("Can not reset an engine"
|
||||
" which has not has its storage"
|
||||
" populated")
|
||||
# This transitions *all* contained atoms back into the PENDING state
|
||||
# with an intention to EXECUTE (or dies trying to do that) and then
|
||||
# changes the state of the flow to PENDING so that it can then run...
|
||||
self._runtime.reset_all()
|
||||
self._change_state(states.PENDING)
|
||||
|
||||
@fasteners.locked
|
||||
def compile(self):
|
||||
|
@ -92,13 +92,26 @@ class Engine(object):
|
||||
could not be achieved.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def reset(self):
|
||||
"""Reset back to the ``PENDING`` state.
|
||||
|
||||
If a flow had previously ended up (from a prior engine
|
||||
:py:func:`.run`) in the ``FAILURE``, ``SUCCESS`` or ``REVERTED``
|
||||
states (or for some reason it ended up in an intermediary state) it
|
||||
can be desireable to make it possible to run it again. Calling this
|
||||
method enables that to occur (without causing a state transition
|
||||
failure, which would typically occur if :py:meth:`.run` is called
|
||||
directly without doing a reset).
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
def prepare(self):
|
||||
"""Performs any pre-run, but post-compilation actions.
|
||||
|
||||
NOTE(harlowja): During preparation it is currently assumed that the
|
||||
underlying storage will be initialized, the atoms will be reset and
|
||||
the engine will enter the PENDING state.
|
||||
the engine will enter the ``PENDING`` state.
|
||||
"""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@ -103,6 +103,7 @@ _ALLOWED_FLOW_TRANSITIONS = frozenset((
|
||||
(FAILURE, RUNNING), # see note below
|
||||
|
||||
(REVERTED, PENDING), # try again
|
||||
(SUCCESS, PENDING), # run it again
|
||||
|
||||
(SUSPENDING, SUSPENDED), # suspend finished
|
||||
(SUSPENDING, SUCCESS), # all tasks finished while we were waiting
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
import contextlib
|
||||
import functools
|
||||
import threading
|
||||
|
||||
import futurist
|
||||
import six
|
||||
@ -798,6 +799,138 @@ class EngineMissingDepsTest(utils.EngineTestBase):
|
||||
self.assertIsNotNone(c_e.cause)
|
||||
|
||||
|
||||
class EngineResetTests(utils.EngineTestBase):
|
||||
def test_completed_reset_run_again(self):
|
||||
task1 = utils.ProgressingTask(name='task1')
|
||||
task2 = utils.ProgressingTask(name='task2')
|
||||
task3 = utils.ProgressingTask(name='task3')
|
||||
|
||||
flow = lf.Flow('root')
|
||||
flow.add(task1, task2, task3)
|
||||
|
||||
engine = self._make_engine(flow)
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
expected = [
|
||||
'task1.t RUNNING',
|
||||
'task1.t SUCCESS(5)',
|
||||
|
||||
'task2.t RUNNING',
|
||||
'task2.t SUCCESS(5)',
|
||||
|
||||
'task3.t RUNNING',
|
||||
'task3.t SUCCESS(5)',
|
||||
]
|
||||
self.assertEqual(expected, capturer.values)
|
||||
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
self.assertEqual([], capturer.values)
|
||||
|
||||
engine.reset()
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
self.assertEqual(expected, capturer.values)
|
||||
|
||||
def test_failed_reset_run_again(self):
|
||||
task1 = utils.ProgressingTask(name='task1')
|
||||
task2 = utils.ProgressingTask(name='task2')
|
||||
task3 = utils.FailingTask(name='task3')
|
||||
|
||||
flow = lf.Flow('root')
|
||||
flow.add(task1, task2, task3)
|
||||
engine = self._make_engine(flow)
|
||||
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
# Also allow a WrappedFailure exception so that when this is used
|
||||
# with the WBE engine (as it can't re-raise the original
|
||||
# exception) that we will work correctly....
|
||||
self.assertRaises((RuntimeError, exc.WrappedFailure), engine.run)
|
||||
|
||||
expected = [
|
||||
'task1.t RUNNING',
|
||||
'task1.t SUCCESS(5)',
|
||||
'task2.t RUNNING',
|
||||
'task2.t SUCCESS(5)',
|
||||
'task3.t RUNNING',
|
||||
|
||||
'task3.t FAILURE(Failure: RuntimeError: Woot!)',
|
||||
|
||||
'task3.t REVERTING',
|
||||
'task3.t REVERTED(None)',
|
||||
'task2.t REVERTING',
|
||||
'task2.t REVERTED(None)',
|
||||
'task1.t REVERTING',
|
||||
'task1.t REVERTED(None)',
|
||||
]
|
||||
self.assertEqual(expected, capturer.values)
|
||||
|
||||
engine.reset()
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
self.assertRaises((RuntimeError, exc.WrappedFailure), engine.run)
|
||||
self.assertEqual(expected, capturer.values)
|
||||
|
||||
def test_suspended_reset_run_again(self):
|
||||
task1 = utils.ProgressingTask(name='task1')
|
||||
task2 = utils.ProgressingTask(name='task2')
|
||||
task3 = utils.ProgressingTask(name='task3')
|
||||
|
||||
flow = lf.Flow('root')
|
||||
flow.add(task1, task2, task3)
|
||||
engine = self._make_engine(flow)
|
||||
suspend_at = object()
|
||||
expected_states = [
|
||||
states.RESUMING,
|
||||
states.SCHEDULING,
|
||||
states.WAITING,
|
||||
states.ANALYZING,
|
||||
states.SCHEDULING,
|
||||
states.WAITING,
|
||||
# Stop/suspend here...
|
||||
suspend_at,
|
||||
states.SUSPENDED,
|
||||
]
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
for i, st in enumerate(engine.run_iter()):
|
||||
expected = expected_states[i]
|
||||
if expected is suspend_at:
|
||||
engine.suspend()
|
||||
else:
|
||||
self.assertEqual(expected, st)
|
||||
|
||||
expected = [
|
||||
'task1.t RUNNING',
|
||||
'task1.t SUCCESS(5)',
|
||||
|
||||
'task2.t RUNNING',
|
||||
'task2.t SUCCESS(5)',
|
||||
]
|
||||
self.assertEqual(expected, capturer.values)
|
||||
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
expected = [
|
||||
'task3.t RUNNING',
|
||||
'task3.t SUCCESS(5)',
|
||||
]
|
||||
self.assertEqual(expected, capturer.values)
|
||||
|
||||
engine.reset()
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
expected = [
|
||||
'task1.t RUNNING',
|
||||
'task1.t SUCCESS(5)',
|
||||
|
||||
'task2.t RUNNING',
|
||||
'task2.t SUCCESS(5)',
|
||||
|
||||
'task3.t RUNNING',
|
||||
'task3.t SUCCESS(5)',
|
||||
]
|
||||
self.assertEqual(expected, capturer.values)
|
||||
|
||||
|
||||
class EngineGraphConditionalFlowTest(utils.EngineTestBase):
|
||||
|
||||
def test_graph_flow_conditional(self):
|
||||
@ -829,6 +962,54 @@ class EngineGraphConditionalFlowTest(utils.EngineTestBase):
|
||||
])
|
||||
self.assertEqual(expected, set(capturer.values))
|
||||
|
||||
def test_graph_flow_conditional_ignore_reset(self):
|
||||
allow_execute = threading.Event()
|
||||
flow = gf.Flow('root')
|
||||
|
||||
task1 = utils.ProgressingTask(name='task1')
|
||||
task2 = utils.ProgressingTask(name='task2')
|
||||
task3 = utils.ProgressingTask(name='task3')
|
||||
|
||||
flow.add(task1, task2, task3)
|
||||
flow.link(task1, task2)
|
||||
flow.link(task2, task3, decider=lambda history: allow_execute.is_set())
|
||||
|
||||
engine = self._make_engine(flow)
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
|
||||
expected = set([
|
||||
'task1.t RUNNING',
|
||||
'task1.t SUCCESS(5)',
|
||||
|
||||
'task2.t RUNNING',
|
||||
'task2.t SUCCESS(5)',
|
||||
|
||||
'task3.t IGNORE',
|
||||
])
|
||||
self.assertEqual(expected, set(capturer.values))
|
||||
self.assertEqual(states.IGNORE,
|
||||
engine.storage.get_atom_state('task3'))
|
||||
self.assertEqual(states.IGNORE,
|
||||
engine.storage.get_atom_intention('task3'))
|
||||
|
||||
engine.reset()
|
||||
allow_execute.set()
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
|
||||
expected = set([
|
||||
'task1.t RUNNING',
|
||||
'task1.t SUCCESS(5)',
|
||||
|
||||
'task2.t RUNNING',
|
||||
'task2.t SUCCESS(5)',
|
||||
|
||||
'task3.t RUNNING',
|
||||
'task3.t SUCCESS(5)',
|
||||
])
|
||||
self.assertEqual(expected, set(capturer.values))
|
||||
|
||||
def test_graph_flow_diamond_ignored(self):
|
||||
flow = gf.Flow('root')
|
||||
|
||||
@ -951,6 +1132,7 @@ class SerialEngineTest(EngineTaskTest,
|
||||
EngineOptionalRequirementsTest,
|
||||
EngineGraphFlowTest,
|
||||
EngineMissingDepsTest,
|
||||
EngineResetTests,
|
||||
EngineGraphConditionalFlowTest,
|
||||
EngineCheckingTaskTest,
|
||||
test.TestCase):
|
||||
@ -978,6 +1160,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
EngineOptionalRequirementsTest,
|
||||
EngineGraphFlowTest,
|
||||
EngineResetTests,
|
||||
EngineMissingDepsTest,
|
||||
EngineGraphConditionalFlowTest,
|
||||
EngineCheckingTaskTest,
|
||||
@ -1018,6 +1201,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
EngineOptionalRequirementsTest,
|
||||
EngineGraphFlowTest,
|
||||
EngineResetTests,
|
||||
EngineMissingDepsTest,
|
||||
EngineGraphConditionalFlowTest,
|
||||
EngineCheckingTaskTest,
|
||||
@ -1041,6 +1225,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
EngineOptionalRequirementsTest,
|
||||
EngineGraphFlowTest,
|
||||
EngineResetTests,
|
||||
EngineMissingDepsTest,
|
||||
EngineGraphConditionalFlowTest,
|
||||
test.TestCase):
|
||||
@ -1069,6 +1254,7 @@ class WorkerBasedEngineTest(EngineTaskTest,
|
||||
EngineLinearAndUnorderedExceptionsTest,
|
||||
EngineOptionalRequirementsTest,
|
||||
EngineGraphFlowTest,
|
||||
EngineResetTests,
|
||||
EngineMissingDepsTest,
|
||||
EngineGraphConditionalFlowTest,
|
||||
test.TestCase):
|
||||
|
Loading…
Reference in New Issue
Block a user