Merge "Add support for conditional execution"

This commit is contained in:
Jenkins
2015-07-09 21:25:17 +00:00
committed by Gerrit Code Review
12 changed files with 403 additions and 62 deletions

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 20 KiB

After

Width:  |  Height:  |  Size: 22 KiB

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 18 KiB

After

Width:  |  Height:  |  Size: 20 KiB

View File

@@ -124,6 +124,11 @@ or if needed will wait for all of the atoms it depends on to complete.
An engine running a task also transitions the task to the ``PENDING`` state An engine running a task also transitions the task to the ``PENDING`` state
after it was reverted and its containing flow was restarted or retried. after it was reverted and its containing flow was restarted or retried.
**IGNORE** - When a conditional decision has been made to skip (not
execute) the task the engine will transition the task to
the ``IGNORE`` state.
**RUNNING** - When an engine running the task starts to execute the task, the **RUNNING** - When an engine running the task starts to execute the task, the
engine will transition the task to the ``RUNNING`` state, and the task will engine will transition the task to the ``RUNNING`` state, and the task will
stay in this state until the tasks :py:meth:`~taskflow.task.BaseTask.execute` stay in this state until the tasks :py:meth:`~taskflow.task.BaseTask.execute`
@@ -171,6 +176,10 @@ flow that the retry is associated with by consulting its
An engine running a retry also transitions the retry to the ``PENDING`` state An engine running a retry also transitions the retry to the ``PENDING`` state
after it was reverted and its associated flow was restarted or retried. after it was reverted and its associated flow was restarted or retried.
**IGNORE** - When a conditional decision has been made to skip (not
execute) the retry the engine will transition the retry to
the ``IGNORE`` state.
**RUNNING** - When an engine starts to execute the retry, the engine **RUNNING** - When an engine starts to execute the retry, the engine
transitions the retry to the ``RUNNING`` state, and the retry stays in this transitions the retry to the ``RUNNING`` state, and the retry stays in this
state until its :py:meth:`~taskflow.retry.Retry.execute` method returns. state until its :py:meth:`~taskflow.retry.Retry.execute` method returns.

View File

@@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import itertools
from networkx.algorithms import traversal from networkx.algorithms import traversal
import six import six
@@ -21,6 +23,60 @@ from taskflow import retry as retry_atom
from taskflow import states as st from taskflow import states as st
class IgnoreDecider(object):
"""Checks any provided edge-deciders and determines if ok to run."""
def __init__(self, atom, edge_deciders):
self._atom = atom
self._edge_deciders = edge_deciders
def check(self, runtime):
"""Returns bool of whether this decider should allow running."""
results = {}
for name in six.iterkeys(self._edge_deciders):
results[name] = runtime.storage.get(name)
for local_decider in six.itervalues(self._edge_deciders):
if not local_decider(history=results):
return False
return True
def affect(self, runtime):
"""If the :py:func:`~.check` returns false, affects associated atoms.
This will alter the associated atom + successor atoms by setting there
state to ``IGNORE`` so that they are ignored in future runtime
activities.
"""
successors_iter = runtime.analyzer.iterate_subgraph(self._atom)
runtime.reset_nodes(itertools.chain([self._atom], successors_iter),
state=st.IGNORE, intention=st.IGNORE)
def check_and_affect(self, runtime):
"""Handles :py:func:`~.check` + :py:func:`~.affect` in right order."""
proceed = self.check(runtime)
if not proceed:
self.affect(runtime)
return proceed
class NoOpDecider(object):
"""No-op decider that says it is always ok to run & has no effect(s)."""
def check(self, runtime):
"""Always good to go."""
return True
def affect(self, runtime):
"""Does nothing."""
def check_and_affect(self, runtime):
"""Handles :py:func:`~.check` + :py:func:`~.affect` in right order.
Does nothing.
"""
return self.check(runtime)
class Analyzer(object): class Analyzer(object):
"""Analyzes a compilation and aids in execution processes. """Analyzes a compilation and aids in execution processes.
@@ -35,18 +91,21 @@ class Analyzer(object):
self._storage = runtime.storage self._storage = runtime.storage
self._execution_graph = runtime.compilation.execution_graph self._execution_graph = runtime.compilation.execution_graph
self._check_atom_transition = runtime.check_atom_transition self._check_atom_transition = runtime.check_atom_transition
self._fetch_edge_deciders = runtime.fetch_edge_deciders
def get_next_nodes(self, node=None): def get_next_nodes(self, node=None):
"""Get next nodes to run (originating from node or all nodes)."""
if node is None: if node is None:
execute = self.browse_nodes_for_execute() execute = self.browse_nodes_for_execute()
revert = self.browse_nodes_for_revert() revert = self.browse_nodes_for_revert()
return execute + revert return execute + revert
state = self.get_state(node) state = self.get_state(node)
intention = self._storage.get_atom_intention(node.name) intention = self._storage.get_atom_intention(node.name)
if state == st.SUCCESS: if state == st.SUCCESS:
if intention == st.REVERT: if intention == st.REVERT:
return [node] return [
(node, NoOpDecider()),
]
elif intention == st.EXECUTE: elif intention == st.EXECUTE:
return self.browse_nodes_for_execute(node) return self.browse_nodes_for_execute(node)
else: else:
@@ -61,70 +120,86 @@ class Analyzer(object):
def browse_nodes_for_execute(self, node=None): def browse_nodes_for_execute(self, node=None):
"""Browse next nodes to execute. """Browse next nodes to execute.
This returns a collection of nodes that are ready to be executed, if This returns a collection of nodes that *may* be ready to be
given a specific node it will only examine the successors of that node, executed, if given a specific node it will only examine the successors
otherwise it will examine the whole graph. of that node, otherwise it will examine the whole graph.
""" """
if node: if node is not None:
nodes = self._execution_graph.successors(node) nodes = self._execution_graph.successors(node)
else: else:
nodes = self._execution_graph.nodes_iter() nodes = self._execution_graph.nodes_iter()
ready_nodes = []
available_nodes = []
for node in nodes: for node in nodes:
if self._is_ready_for_execute(node): is_ready, late_decider = self._get_maybe_ready_for_execute(node)
available_nodes.append(node) if is_ready:
return available_nodes ready_nodes.append((node, late_decider))
return ready_nodes
def browse_nodes_for_revert(self, node=None): def browse_nodes_for_revert(self, node=None):
"""Browse next nodes to revert. """Browse next nodes to revert.
This returns a collection of nodes that are ready to be be reverted, if This returns a collection of nodes that *may* be ready to be be
given a specific node it will only examine the predecessors of that reverted, if given a specific node it will only examine the
node, otherwise it will examine the whole graph. predecessors of that node, otherwise it will examine the whole
graph.
""" """
if node: if node is not None:
nodes = self._execution_graph.predecessors(node) nodes = self._execution_graph.predecessors(node)
else: else:
nodes = self._execution_graph.nodes_iter() nodes = self._execution_graph.nodes_iter()
ready_nodes = []
available_nodes = []
for node in nodes: for node in nodes:
if self._is_ready_for_revert(node): is_ready, late_decider = self._get_maybe_ready_for_revert(node)
available_nodes.append(node) if is_ready:
return available_nodes ready_nodes.append((node, late_decider))
return ready_nodes
def _get_maybe_ready_for_execute(self, atom):
"""Returns if an atom is *likely* ready to be executed."""
def _is_ready_for_execute(self, atom):
"""Checks if atom is ready to be executed."""
state = self.get_state(atom) state = self.get_state(atom)
intention = self._storage.get_atom_intention(atom.name) intention = self._storage.get_atom_intention(atom.name)
transition = self._check_atom_transition(atom, state, st.RUNNING) transition = self._check_atom_transition(atom, state, st.RUNNING)
if not transition or intention != st.EXECUTE: if not transition or intention != st.EXECUTE:
return False return (False, None)
atom_names = [] predecessor_names = []
for prev_atom in self._execution_graph.predecessors(atom): for previous_atom in self._execution_graph.predecessors(atom):
atom_names.append(prev_atom.name) predecessor_names.append(previous_atom.name)
atom_states = self._storage.get_atoms_states(atom_names) predecessor_states = self._storage.get_atoms_states(predecessor_names)
return all(state == st.SUCCESS and intention == st.EXECUTE predecessor_states_iter = six.itervalues(predecessor_states)
for state, intention in six.itervalues(atom_states)) ok_to_run = all(state == st.SUCCESS and intention == st.EXECUTE
for state, intention in predecessor_states_iter)
if not ok_to_run:
return (False, None)
else:
edge_deciders = self._fetch_edge_deciders(atom)
return (True, IgnoreDecider(atom, edge_deciders))
def _get_maybe_ready_for_revert(self, atom):
"""Returns if an atom is *likely* ready to be reverted."""
def _is_ready_for_revert(self, atom):
"""Checks if atom is ready to be reverted."""
state = self.get_state(atom) state = self.get_state(atom)
intention = self._storage.get_atom_intention(atom.name) intention = self._storage.get_atom_intention(atom.name)
transition = self._check_atom_transition(atom, state, st.REVERTING) transition = self._check_atom_transition(atom, state, st.REVERTING)
if not transition or intention not in (st.REVERT, st.RETRY): if not transition or intention not in (st.REVERT, st.RETRY):
return False return (False, None)
atom_names = [] predecessor_names = []
for prev_atom in self._execution_graph.successors(atom): for previous_atom in self._execution_graph.successors(atom):
atom_names.append(prev_atom.name) predecessor_names.append(previous_atom.name)
atom_states = self._storage.get_atoms_states(atom_names) predecessor_states = self._storage.get_atoms_states(predecessor_names)
return all(state in (st.PENDING, st.REVERTED) predecessor_states_iter = six.itervalues(predecessor_states)
for state, intention in six.itervalues(atom_states)) ok_to_run = all(state in (st.PENDING, st.REVERTED)
for state, intention in predecessor_states_iter)
if not ok_to_run:
return (False, None)
else:
return (True, NoOpDecider())
def iterate_subgraph(self, atom): def iterate_subgraph(self, atom):
"""Iterates a subgraph connected to given atom.""" """Iterates a subgraph connected to given atom."""
@@ -142,17 +217,24 @@ class Analyzer(object):
yield node yield node
def iterate_all_nodes(self): def iterate_all_nodes(self):
"""Yields back all nodes in the execution graph."""
for node in self._execution_graph.nodes_iter(): for node in self._execution_graph.nodes_iter():
yield node yield node
def find_atom_retry(self, atom): def find_atom_retry(self, atom):
"""Returns the retry atom associated to the given atom (or none)."""
return self._execution_graph.node[atom].get('retry') return self._execution_graph.node[atom].get('retry')
def is_success(self): def is_success(self):
"""Checks if all nodes in the execution graph are in 'happy' state."""
for atom in self.iterate_all_nodes(): for atom in self.iterate_all_nodes():
if self.get_state(atom) != st.SUCCESS: atom_state = self.get_state(atom)
if atom_state == st.IGNORE:
continue
if atom_state != st.SUCCESS:
return False return False
return True return True
def get_state(self, atom): def get_state(self, atom):
"""Gets the state of a given atom (from the backend storage unit)."""
return self._storage.get_atom_state(atom.name) return self._storage.get_atom_state(atom.name)

View File

@@ -94,6 +94,7 @@ class Runner(object):
ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING) ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
def __init__(self, runtime, waiter): def __init__(self, runtime, waiter):
self._runtime = runtime
self._analyzer = runtime.analyzer self._analyzer = runtime.analyzer
self._completer = runtime.completer self._completer = runtime.completer
self._scheduler = runtime.scheduler self._scheduler = runtime.scheduler
@@ -111,13 +112,26 @@ class Runner(object):
if timeout is None: if timeout is None:
timeout = _WAITING_TIMEOUT timeout = _WAITING_TIMEOUT
# Cache some local functions/methods...
do_schedule = self._scheduler.schedule
wait_for_any = self._waiter.wait_for_any
do_complete = self._completer.complete
def iter_next_nodes(target_node=None):
# Yields and filters and tweaks the next nodes to execute...
maybe_nodes = self._analyzer.get_next_nodes(node=target_node)
for node, late_decider in maybe_nodes:
proceed = late_decider.check_and_affect(self._runtime)
if proceed:
yield node
def resume(old_state, new_state, event): def resume(old_state, new_state, event):
# This reaction function just updates the state machines memory # This reaction function just updates the state machines memory
# to include any nodes that need to be executed (from a previous # to include any nodes that need to be executed (from a previous
# attempt, which may be empty if never ran before) and any nodes # attempt, which may be empty if never ran before) and any nodes
# that are now ready to be ran. # that are now ready to be ran.
memory.next_nodes.update(self._completer.resume()) memory.next_nodes.update(self._completer.resume())
memory.next_nodes.update(self._analyzer.get_next_nodes()) memory.next_nodes.update(iter_next_nodes())
return _SCHEDULE return _SCHEDULE
def game_over(old_state, new_state, event): def game_over(old_state, new_state, event):
@@ -127,7 +141,7 @@ class Runner(object):
# it is *always* called before the final state is entered. # it is *always* called before the final state is entered.
if memory.failures: if memory.failures:
return _FAILED return _FAILED
if self._analyzer.get_next_nodes(): if any(1 for node in iter_next_nodes()):
return _SUSPENDED return _SUSPENDED
elif self._analyzer.is_success(): elif self._analyzer.is_success():
return _SUCCESS return _SUCCESS
@@ -141,8 +155,7 @@ class Runner(object):
# that holds this information to stop or suspend); handles failures # that holds this information to stop or suspend); handles failures
# that occur during this process safely... # that occur during this process safely...
if self.runnable() and memory.next_nodes: if self.runnable() and memory.next_nodes:
not_done, failures = self._scheduler.schedule( not_done, failures = do_schedule(memory.next_nodes)
memory.next_nodes)
if not_done: if not_done:
memory.not_done.update(not_done) memory.not_done.update(not_done)
if failures: if failures:
@@ -155,8 +168,7 @@ class Runner(object):
# call sometime in the future, or equivalent that will work in # call sometime in the future, or equivalent that will work in
# py2 and py3. # py2 and py3.
if memory.not_done: if memory.not_done:
done, not_done = self._waiter.wait_for_any(memory.not_done, done, not_done = wait_for_any(memory.not_done, timeout)
timeout)
memory.done.update(done) memory.done.update(done)
memory.not_done = not_done memory.not_done = not_done
return _ANALYZE return _ANALYZE
@@ -173,7 +185,7 @@ class Runner(object):
node = fut.atom node = fut.atom
try: try:
event, result = fut.result() event, result = fut.result()
retain = self._completer.complete(node, event, result) retain = do_complete(node, event, result)
if isinstance(result, failure.Failure): if isinstance(result, failure.Failure):
if retain: if retain:
memory.failures.append(result) memory.failures.append(result)
@@ -196,7 +208,7 @@ class Runner(object):
memory.failures.append(failure.Failure()) memory.failures.append(failure.Failure())
else: else:
try: try:
more_nodes = self._analyzer.get_next_nodes(node) more_nodes = set(iter_next_nodes(target_node=node))
except Exception: except Exception:
memory.failures.append(failure.Failure()) memory.failures.append(failure.Failure())
else: else:

View File

@@ -23,6 +23,7 @@ from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import runner as ru from taskflow.engines.action_engine import runner as ru
from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc from taskflow.engines.action_engine import scopes as sc
from taskflow import flow as flow_type
from taskflow import states as st from taskflow import states as st
from taskflow import task from taskflow import task
from taskflow.utils import misc from taskflow.utils import misc
@@ -61,6 +62,7 @@ class Runtime(object):
'retry': self.retry_scheduler, 'retry': self.retry_scheduler,
'task': self.task_scheduler, 'task': self.task_scheduler,
} }
execution_graph = self._compilation.execution_graph
for atom in self.analyzer.iterate_all_nodes(): for atom in self.analyzer.iterate_all_nodes():
metadata = {} metadata = {}
walker = sc.ScopeWalker(self.compilation, atom, names_only=True) walker = sc.ScopeWalker(self.compilation, atom, names_only=True)
@@ -72,10 +74,20 @@ class Runtime(object):
check_transition_handler = st.check_retry_transition check_transition_handler = st.check_retry_transition
change_state_handler = change_state_handlers['retry'] change_state_handler = change_state_handlers['retry']
scheduler = schedulers['retry'] scheduler = schedulers['retry']
edge_deciders = {}
for previous_atom in execution_graph.predecessors(atom):
# If there is any link function that says if this connection
# is able to run (or should not) ensure we retain it and use
# it later as needed.
u_v_data = execution_graph.adj[previous_atom][atom]
u_v_decider = u_v_data.get(flow_type.LINK_DECIDER)
if u_v_decider is not None:
edge_deciders[previous_atom.name] = u_v_decider
metadata['scope_walker'] = walker metadata['scope_walker'] = walker
metadata['check_transition_handler'] = check_transition_handler metadata['check_transition_handler'] = check_transition_handler
metadata['change_state_handler'] = change_state_handler metadata['change_state_handler'] = change_state_handler
metadata['scheduler'] = scheduler metadata['scheduler'] = scheduler
metadata['edge_deciders'] = edge_deciders
self._atom_cache[atom.name] = metadata self._atom_cache[atom.name] = metadata
@property @property
@@ -130,6 +142,14 @@ class Runtime(object):
check_transition_handler = metadata['check_transition_handler'] check_transition_handler = metadata['check_transition_handler']
return check_transition_handler(current_state, target_state) return check_transition_handler(current_state, target_state)
def fetch_edge_deciders(self, atom):
"""Fetches the edge deciders for the given atom."""
# This does not check if the name exists (since this is only used
# internally to the engine, and is not exposed to atoms that will
# not exist and therefore doesn't need to handle that case).
metadata = self._atom_cache[atom.name]
return metadata['edge_deciders']
def fetch_scheduler(self, atom): def fetch_scheduler(self, atom):
"""Fetches the cached specific scheduler for the given atom.""" """Fetches the cached specific scheduler for the given atom."""
# This does not check if the name exists (since this is only used # This does not check if the name exists (since this is only used

View File

@@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import os
import sys
logging.basicConfig(level=logging.ERROR)
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
os.pardir,
os.pardir))
sys.path.insert(0, top_dir)
from taskflow import engines
from taskflow.patterns import graph_flow as gf
from taskflow.persistence import backends
from taskflow import task
from taskflow.utils import persistence_utils as pu
class DummyTask(task.Task):
def execute(self):
print("Running %s" % self.name)
def allow(history):
print(history)
return False
r = gf.Flow("root")
r_a = DummyTask('r-a')
r_b = DummyTask('r-b')
r.add(r_a, r_b)
r.link(r_a, r_b, decider=allow)
backend = backends.fetch({
'connection': 'memory://',
})
book, flow_detail = pu.temporary_flow_detail(backend=backend)
e = engines.load(r, flow_detail=flow_detail, book=book, backend=backend)
e.compile()
e.prepare()
e.run()
print("---------")
print("After run")
print("---------")
entries = [os.path.join(backend.memory.root_path, child)
for child in backend.memory.ls(backend.memory.root_path)]
while entries:
path = entries.pop()
value = backend.memory[path]
if value:
print("%s -> %s" % (path, value))
else:
print("%s" % (path))
entries.extend(os.path.join(path, child)
for child in backend.memory.ls(path))

View File

@@ -31,6 +31,9 @@ LINK_RETRY = 'retry'
# This key denotes the link was created due to symbol constraints and the # This key denotes the link was created due to symbol constraints and the
# value will be a set of names that the constraint ensures are satisfied. # value will be a set of names that the constraint ensures are satisfied.
LINK_REASONS = 'reasons' LINK_REASONS = 'reasons'
#
# This key denotes a callable that will determine if the target is visited.
LINK_DECIDER = 'decider'
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)

View File

@@ -16,6 +16,8 @@
import collections import collections
import six
from taskflow import exceptions as exc from taskflow import exceptions as exc
from taskflow import flow from taskflow import flow
from taskflow.types import graph as gr from taskflow.types import graph as gr
@@ -66,16 +68,20 @@ class Flow(flow.Flow):
#: Extracts the unsatisified symbol requirements of a single node. #: Extracts the unsatisified symbol requirements of a single node.
_unsatisfied_requires = staticmethod(_unsatisfied_requires) _unsatisfied_requires = staticmethod(_unsatisfied_requires)
def link(self, u, v): def link(self, u, v, decider=None):
"""Link existing node u as a runtime dependency of existing node v.""" """Link existing node u as a runtime dependency of existing node v."""
if not self._graph.has_node(u): if not self._graph.has_node(u):
raise ValueError("Node '%s' not found to link from" % (u)) raise ValueError("Node '%s' not found to link from" % (u))
if not self._graph.has_node(v): if not self._graph.has_node(v):
raise ValueError("Node '%s' not found to link to" % (v)) raise ValueError("Node '%s' not found to link to" % (v))
self._swap(self._link(u, v, manual=True)) if decider is not None:
if not six.callable(decider):
raise ValueError("Decider boolean callback must be callable")
self._swap(self._link(u, v, manual=True, decider=decider))
return self return self
def _link(self, u, v, graph=None, reason=None, manual=False): def _link(self, u, v, graph=None,
reason=None, manual=False, decider=None):
mutable_graph = True mutable_graph = True
if graph is None: if graph is None:
graph = self._graph graph = self._graph
@@ -85,6 +91,8 @@ class Flow(flow.Flow):
attrs = graph.get_edge_data(u, v) attrs = graph.get_edge_data(u, v)
if not attrs: if not attrs:
attrs = {} attrs = {}
if decider is not None:
attrs[flow.LINK_DECIDER] = decider
if manual: if manual:
attrs[flow.LINK_MANUAL] = True attrs[flow.LINK_MANUAL] = True
if reason is not None: if reason is not None:
@@ -281,9 +289,9 @@ class TargetedFlow(Flow):
self._subgraph = None self._subgraph = None
return self return self
def link(self, u, v): def link(self, u, v, decider=None):
"""Link existing node u as a runtime dependency of existing node v.""" """Link existing node u as a runtime dependency of existing node v."""
super(TargetedFlow, self).link(u, v) super(TargetedFlow, self).link(u, v, decider=decider)
# reset cached subgraph, in case it was affected # reset cached subgraph, in case it was affected
self._subgraph = None self._subgraph = None
return self return self

View File

@@ -40,10 +40,11 @@ REVERTING = REVERTING
SUCCESS = SUCCESS SUCCESS = SUCCESS
RUNNING = RUNNING RUNNING = RUNNING
RETRYING = 'RETRYING' RETRYING = 'RETRYING'
IGNORE = 'IGNORE'
# Atom intentions. # Atom intentions.
EXECUTE = 'EXECUTE' EXECUTE = 'EXECUTE'
IGNORE = 'IGNORE' IGNORE = IGNORE
REVERT = 'REVERT' REVERT = 'REVERT'
RETRY = 'RETRY' RETRY = 'RETRY'
INTENTIONS = (EXECUTE, IGNORE, REVERT, RETRY) INTENTIONS = (EXECUTE, IGNORE, REVERT, RETRY)
@@ -160,6 +161,7 @@ def check_flow_transition(old_state, new_state):
_ALLOWED_TASK_TRANSITIONS = frozenset(( _ALLOWED_TASK_TRANSITIONS = frozenset((
(PENDING, RUNNING), # run it! (PENDING, RUNNING), # run it!
(PENDING, IGNORE), # skip it!
(RUNNING, SUCCESS), # the task finished successfully (RUNNING, SUCCESS), # the task finished successfully
(RUNNING, FAILURE), # the task failed (RUNNING, FAILURE), # the task failed
@@ -171,6 +173,7 @@ _ALLOWED_TASK_TRANSITIONS = frozenset((
(REVERTING, FAILURE), # revert failed (REVERTING, FAILURE), # revert failed
(REVERTED, PENDING), # try again (REVERTED, PENDING), # try again
(IGNORE, PENDING), # try again
)) ))

View File

@@ -15,7 +15,9 @@
# under the License. # under the License.
import contextlib import contextlib
import functools
import six
import testtools import testtools
import taskflow.engines import taskflow.engines
@@ -772,6 +774,126 @@ class EngineMissingDepsTest(utils.EngineTestBase):
self.assertIsNotNone(c_e.cause) self.assertIsNotNone(c_e.cause)
class EngineGraphConditionalFlowTest(utils.EngineTestBase):
def test_graph_flow_conditional(self):
flow = gf.Flow('root')
task1 = utils.ProgressingTask(name='task1')
task2 = utils.ProgressingTask(name='task2')
task2_2 = utils.ProgressingTask(name='task2_2')
task3 = utils.ProgressingTask(name='task3')
flow.add(task1, task2, task2_2, task3)
flow.link(task1, task2, decider=lambda history: False)
flow.link(task2, task2_2)
flow.link(task1, task3, decider=lambda history: True)
engine = self._make_engine(flow)
with utils.CaptureListener(engine, capture_flow=False) as capturer:
engine.run()
expected = set([
'task1.t RUNNING',
'task1.t SUCCESS(5)',
'task2.t IGNORE',
'task2_2.t IGNORE',
'task3.t RUNNING',
'task3.t SUCCESS(5)',
])
self.assertEqual(expected, set(capturer.values))
def test_graph_flow_diamond_ignored(self):
flow = gf.Flow('root')
task1 = utils.ProgressingTask(name='task1')
task2 = utils.ProgressingTask(name='task2')
task3 = utils.ProgressingTask(name='task3')
task4 = utils.ProgressingTask(name='task4')
flow.add(task1, task2, task3, task4)
flow.link(task1, task2)
flow.link(task2, task4, decider=lambda history: False)
flow.link(task1, task3)
flow.link(task3, task4, decider=lambda history: True)
engine = self._make_engine(flow)
with utils.CaptureListener(engine, capture_flow=False) as capturer:
engine.run()
expected = set([
'task1.t RUNNING',
'task1.t SUCCESS(5)',
'task2.t RUNNING',
'task2.t SUCCESS(5)',
'task3.t RUNNING',
'task3.t SUCCESS(5)',
'task4.t IGNORE',
])
self.assertEqual(expected, set(capturer.values))
self.assertEqual(states.IGNORE,
engine.storage.get_atom_state('task4'))
self.assertEqual(states.IGNORE,
engine.storage.get_atom_intention('task4'))
def test_graph_flow_conditional_history(self):
def even_odd_decider(history, allowed):
total = sum(six.itervalues(history))
if total == allowed:
return True
return False
flow = gf.Flow('root')
task1 = utils.TaskMultiArgOneReturn(name='task1')
task2 = utils.ProgressingTask(name='task2')
task2_2 = utils.ProgressingTask(name='task2_2')
task3 = utils.ProgressingTask(name='task3')
task3_3 = utils.ProgressingTask(name='task3_3')
flow.add(task1, task2, task2_2, task3, task3_3)
flow.link(task1, task2,
decider=functools.partial(even_odd_decider, allowed=2))
flow.link(task2, task2_2)
flow.link(task1, task3,
decider=functools.partial(even_odd_decider, allowed=1))
flow.link(task3, task3_3)
engine = self._make_engine(flow)
engine.storage.inject({'x': 0, 'y': 1, 'z': 1})
with utils.CaptureListener(engine, capture_flow=False) as capturer:
engine.run()
expected = set([
'task1.t RUNNING', 'task1.t SUCCESS(2)',
'task3.t IGNORE', 'task3_3.t IGNORE',
'task2.t RUNNING', 'task2.t SUCCESS(5)',
'task2_2.t RUNNING', 'task2_2.t SUCCESS(5)',
])
self.assertEqual(expected, set(capturer.values))
engine = self._make_engine(flow)
engine.storage.inject({'x': 0, 'y': 0, 'z': 1})
with utils.CaptureListener(engine, capture_flow=False) as capturer:
engine.run()
expected = set([
'task1.t RUNNING', 'task1.t SUCCESS(1)',
'task2.t IGNORE', 'task2_2.t IGNORE',
'task3.t RUNNING', 'task3.t SUCCESS(5)',
'task3_3.t RUNNING', 'task3_3.t SUCCESS(5)',
])
self.assertEqual(expected, set(capturer.values))
class EngineCheckingTaskTest(utils.EngineTestBase): class EngineCheckingTaskTest(utils.EngineTestBase):
# FIXME: this test uses a inner class that workers/process engines can't # FIXME: this test uses a inner class that workers/process engines can't
# get to, so we need to do something better to make this test useful for # get to, so we need to do something better to make this test useful for
@@ -805,6 +927,7 @@ class SerialEngineTest(EngineTaskTest,
EngineOptionalRequirementsTest, EngineOptionalRequirementsTest,
EngineGraphFlowTest, EngineGraphFlowTest,
EngineMissingDepsTest, EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
EngineCheckingTaskTest, EngineCheckingTaskTest,
test.TestCase): test.TestCase):
def _make_engine(self, flow, def _make_engine(self, flow,
@@ -832,6 +955,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest,
EngineOptionalRequirementsTest, EngineOptionalRequirementsTest,
EngineGraphFlowTest, EngineGraphFlowTest,
EngineMissingDepsTest, EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
EngineCheckingTaskTest, EngineCheckingTaskTest,
test.TestCase): test.TestCase):
_EXECUTOR_WORKERS = 2 _EXECUTOR_WORKERS = 2
@@ -871,6 +995,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest,
EngineOptionalRequirementsTest, EngineOptionalRequirementsTest,
EngineGraphFlowTest, EngineGraphFlowTest,
EngineMissingDepsTest, EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
EngineCheckingTaskTest, EngineCheckingTaskTest,
test.TestCase): test.TestCase):
@@ -893,6 +1018,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
EngineOptionalRequirementsTest, EngineOptionalRequirementsTest,
EngineGraphFlowTest, EngineGraphFlowTest,
EngineMissingDepsTest, EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
test.TestCase): test.TestCase):
_EXECUTOR_WORKERS = 2 _EXECUTOR_WORKERS = 2
@@ -920,6 +1046,7 @@ class WorkerBasedEngineTest(EngineTaskTest,
EngineOptionalRequirementsTest, EngineOptionalRequirementsTest,
EngineGraphFlowTest, EngineGraphFlowTest,
EngineMissingDepsTest, EngineMissingDepsTest,
EngineGraphConditionalFlowTest,
test.TestCase): test.TestCase):
def setUp(self): def setUp(self):
super(WorkerBasedEngineTest, self).setUp() super(WorkerBasedEngineTest, self).setUp()

View File

@@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import mock
import optparse import optparse
import os import os
import sys import sys
@@ -37,10 +39,10 @@ from taskflow.types import fsm
# actually be running it...). # actually be running it...).
class DummyRuntime(object): class DummyRuntime(object):
def __init__(self): def __init__(self):
self.analyzer = None self.analyzer = mock.MagicMock()
self.completer = None self.completer = mock.MagicMock()
self.scheduler = None self.scheduler = mock.MagicMock()
self.storage = None self.storage = mock.MagicMock()
def clean_event(name): def clean_event(name):
@@ -130,7 +132,7 @@ def main():
list(states._ALLOWED_RETRY_TRANSITIONS)) list(states._ALLOWED_RETRY_TRANSITIONS))
elif options.engines: elif options.engines:
source_type = "Engines" source_type = "Engines"
r = runner.Runner(DummyRuntime(), None) r = runner.Runner(DummyRuntime(), mock.MagicMock())
source, memory = r.build() source, memory = r.build()
internal_states.extend(runner._META_STATES) internal_states.extend(runner._META_STATES)
ordering = 'out' ordering = 'out'