diff --git a/doc/source/img/retry_states.svg b/doc/source/img/retry_states.svg index 1a25bda3..d6801b19 100644 --- a/doc/source/img/retry_states.svg +++ b/doc/source/img/retry_states.svg @@ -3,6 +3,6 @@ - -Retries statesPENDINGRUNNINGSUCCESSFAILURERETRYINGREVERTINGREVERTEDstart + +Retries statesPENDINGIGNORERUNNINGSUCCESSFAILURERETRYINGREVERTINGREVERTEDstart diff --git a/doc/source/img/task_states.svg b/doc/source/img/task_states.svg index dbb48c60..9c27c843 100644 --- a/doc/source/img/task_states.svg +++ b/doc/source/img/task_states.svg @@ -3,6 +3,6 @@ - -Tasks statesPENDINGRUNNINGSUCCESSFAILUREREVERTINGREVERTEDstart + +Tasks statesPENDINGIGNORERUNNINGFAILURESUCCESSREVERTINGREVERTEDstart diff --git a/doc/source/states.rst b/doc/source/states.rst index 36035480..01e9da59 100644 --- a/doc/source/states.rst +++ b/doc/source/states.rst @@ -124,6 +124,11 @@ or if needed will wait for all of the atoms it depends on to complete. An engine running a task also transitions the task to the ``PENDING`` state after it was reverted and its containing flow was restarted or retried. + +**IGNORE** - When a conditional decision has been made to skip (not +execute) the task the engine will transition the task to +the ``IGNORE`` state. + **RUNNING** - When an engine running the task starts to execute the task, the engine will transition the task to the ``RUNNING`` state, and the task will stay in this state until the tasks :py:meth:`~taskflow.task.BaseTask.execute` @@ -171,6 +176,10 @@ flow that the retry is associated with by consulting its An engine running a retry also transitions the retry to the ``PENDING`` state after it was reverted and its associated flow was restarted or retried. +**IGNORE** - When a conditional decision has been made to skip (not +execute) the retry the engine will transition the retry to +the ``IGNORE`` state. + **RUNNING** - When an engine starts to execute the retry, the engine transitions the retry to the ``RUNNING`` state, and the retry stays in this state until its :py:meth:`~taskflow.retry.Retry.execute` method returns. diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index bef7b8b5..909d6751 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -14,6 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. +import itertools + from networkx.algorithms import traversal import six @@ -21,6 +23,60 @@ from taskflow import retry as retry_atom from taskflow import states as st +class IgnoreDecider(object): + """Checks any provided edge-deciders and determines if ok to run.""" + + def __init__(self, atom, edge_deciders): + self._atom = atom + self._edge_deciders = edge_deciders + + def check(self, runtime): + """Returns bool of whether this decider should allow running.""" + results = {} + for name in six.iterkeys(self._edge_deciders): + results[name] = runtime.storage.get(name) + for local_decider in six.itervalues(self._edge_deciders): + if not local_decider(history=results): + return False + return True + + def affect(self, runtime): + """If the :py:func:`~.check` returns false, affects associated atoms. + + This will alter the associated atom + successor atoms by setting there + state to ``IGNORE`` so that they are ignored in future runtime + activities. + """ + successors_iter = runtime.analyzer.iterate_subgraph(self._atom) + runtime.reset_nodes(itertools.chain([self._atom], successors_iter), + state=st.IGNORE, intention=st.IGNORE) + + def check_and_affect(self, runtime): + """Handles :py:func:`~.check` + :py:func:`~.affect` in right order.""" + proceed = self.check(runtime) + if not proceed: + self.affect(runtime) + return proceed + + +class NoOpDecider(object): + """No-op decider that says it is always ok to run & has no effect(s).""" + + def check(self, runtime): + """Always good to go.""" + return True + + def affect(self, runtime): + """Does nothing.""" + + def check_and_affect(self, runtime): + """Handles :py:func:`~.check` + :py:func:`~.affect` in right order. + + Does nothing. + """ + return self.check(runtime) + + class Analyzer(object): """Analyzes a compilation and aids in execution processes. @@ -35,18 +91,21 @@ class Analyzer(object): self._storage = runtime.storage self._execution_graph = runtime.compilation.execution_graph self._check_atom_transition = runtime.check_atom_transition + self._fetch_edge_deciders = runtime.fetch_edge_deciders def get_next_nodes(self, node=None): + """Get next nodes to run (originating from node or all nodes).""" if node is None: execute = self.browse_nodes_for_execute() revert = self.browse_nodes_for_revert() return execute + revert - state = self.get_state(node) intention = self._storage.get_atom_intention(node.name) if state == st.SUCCESS: if intention == st.REVERT: - return [node] + return [ + (node, NoOpDecider()), + ] elif intention == st.EXECUTE: return self.browse_nodes_for_execute(node) else: @@ -61,70 +120,86 @@ class Analyzer(object): def browse_nodes_for_execute(self, node=None): """Browse next nodes to execute. - This returns a collection of nodes that are ready to be executed, if - given a specific node it will only examine the successors of that node, - otherwise it will examine the whole graph. + This returns a collection of nodes that *may* be ready to be + executed, if given a specific node it will only examine the successors + of that node, otherwise it will examine the whole graph. """ - if node: + if node is not None: nodes = self._execution_graph.successors(node) else: nodes = self._execution_graph.nodes_iter() - - available_nodes = [] + ready_nodes = [] for node in nodes: - if self._is_ready_for_execute(node): - available_nodes.append(node) - return available_nodes + is_ready, late_decider = self._get_maybe_ready_for_execute(node) + if is_ready: + ready_nodes.append((node, late_decider)) + return ready_nodes def browse_nodes_for_revert(self, node=None): """Browse next nodes to revert. - This returns a collection of nodes that are ready to be be reverted, if - given a specific node it will only examine the predecessors of that - node, otherwise it will examine the whole graph. + This returns a collection of nodes that *may* be ready to be be + reverted, if given a specific node it will only examine the + predecessors of that node, otherwise it will examine the whole + graph. """ - if node: + if node is not None: nodes = self._execution_graph.predecessors(node) else: nodes = self._execution_graph.nodes_iter() - - available_nodes = [] + ready_nodes = [] for node in nodes: - if self._is_ready_for_revert(node): - available_nodes.append(node) - return available_nodes + is_ready, late_decider = self._get_maybe_ready_for_revert(node) + if is_ready: + ready_nodes.append((node, late_decider)) + return ready_nodes + + def _get_maybe_ready_for_execute(self, atom): + """Returns if an atom is *likely* ready to be executed.""" - def _is_ready_for_execute(self, atom): - """Checks if atom is ready to be executed.""" state = self.get_state(atom) intention = self._storage.get_atom_intention(atom.name) transition = self._check_atom_transition(atom, state, st.RUNNING) if not transition or intention != st.EXECUTE: - return False + return (False, None) - atom_names = [] - for prev_atom in self._execution_graph.predecessors(atom): - atom_names.append(prev_atom.name) + predecessor_names = [] + for previous_atom in self._execution_graph.predecessors(atom): + predecessor_names.append(previous_atom.name) - atom_states = self._storage.get_atoms_states(atom_names) - return all(state == st.SUCCESS and intention == st.EXECUTE - for state, intention in six.itervalues(atom_states)) + predecessor_states = self._storage.get_atoms_states(predecessor_names) + predecessor_states_iter = six.itervalues(predecessor_states) + ok_to_run = all(state == st.SUCCESS and intention == st.EXECUTE + for state, intention in predecessor_states_iter) + + if not ok_to_run: + return (False, None) + else: + edge_deciders = self._fetch_edge_deciders(atom) + return (True, IgnoreDecider(atom, edge_deciders)) + + def _get_maybe_ready_for_revert(self, atom): + """Returns if an atom is *likely* ready to be reverted.""" - def _is_ready_for_revert(self, atom): - """Checks if atom is ready to be reverted.""" state = self.get_state(atom) intention = self._storage.get_atom_intention(atom.name) transition = self._check_atom_transition(atom, state, st.REVERTING) if not transition or intention not in (st.REVERT, st.RETRY): - return False + return (False, None) - atom_names = [] - for prev_atom in self._execution_graph.successors(atom): - atom_names.append(prev_atom.name) + predecessor_names = [] + for previous_atom in self._execution_graph.successors(atom): + predecessor_names.append(previous_atom.name) - atom_states = self._storage.get_atoms_states(atom_names) - return all(state in (st.PENDING, st.REVERTED) - for state, intention in six.itervalues(atom_states)) + predecessor_states = self._storage.get_atoms_states(predecessor_names) + predecessor_states_iter = six.itervalues(predecessor_states) + ok_to_run = all(state in (st.PENDING, st.REVERTED) + for state, intention in predecessor_states_iter) + + if not ok_to_run: + return (False, None) + else: + return (True, NoOpDecider()) def iterate_subgraph(self, atom): """Iterates a subgraph connected to given atom.""" @@ -142,17 +217,24 @@ class Analyzer(object): yield node def iterate_all_nodes(self): + """Yields back all nodes in the execution graph.""" for node in self._execution_graph.nodes_iter(): yield node def find_atom_retry(self, atom): + """Returns the retry atom associated to the given atom (or none).""" return self._execution_graph.node[atom].get('retry') def is_success(self): + """Checks if all nodes in the execution graph are in 'happy' state.""" for atom in self.iterate_all_nodes(): - if self.get_state(atom) != st.SUCCESS: + atom_state = self.get_state(atom) + if atom_state == st.IGNORE: + continue + if atom_state != st.SUCCESS: return False return True def get_state(self, atom): + """Gets the state of a given atom (from the backend storage unit).""" return self._storage.get_atom_state(atom.name) diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py index 8d637c1c..9b6043a3 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/runner.py @@ -94,6 +94,7 @@ class Runner(object): ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING) def __init__(self, runtime, waiter): + self._runtime = runtime self._analyzer = runtime.analyzer self._completer = runtime.completer self._scheduler = runtime.scheduler @@ -111,13 +112,26 @@ class Runner(object): if timeout is None: timeout = _WAITING_TIMEOUT + # Cache some local functions/methods... + do_schedule = self._scheduler.schedule + wait_for_any = self._waiter.wait_for_any + do_complete = self._completer.complete + + def iter_next_nodes(target_node=None): + # Yields and filters and tweaks the next nodes to execute... + maybe_nodes = self._analyzer.get_next_nodes(node=target_node) + for node, late_decider in maybe_nodes: + proceed = late_decider.check_and_affect(self._runtime) + if proceed: + yield node + def resume(old_state, new_state, event): # This reaction function just updates the state machines memory # to include any nodes that need to be executed (from a previous # attempt, which may be empty if never ran before) and any nodes # that are now ready to be ran. memory.next_nodes.update(self._completer.resume()) - memory.next_nodes.update(self._analyzer.get_next_nodes()) + memory.next_nodes.update(iter_next_nodes()) return _SCHEDULE def game_over(old_state, new_state, event): @@ -127,7 +141,7 @@ class Runner(object): # it is *always* called before the final state is entered. if memory.failures: return _FAILED - if self._analyzer.get_next_nodes(): + if any(1 for node in iter_next_nodes()): return _SUSPENDED elif self._analyzer.is_success(): return _SUCCESS @@ -141,8 +155,7 @@ class Runner(object): # that holds this information to stop or suspend); handles failures # that occur during this process safely... if self.runnable() and memory.next_nodes: - not_done, failures = self._scheduler.schedule( - memory.next_nodes) + not_done, failures = do_schedule(memory.next_nodes) if not_done: memory.not_done.update(not_done) if failures: @@ -155,8 +168,7 @@ class Runner(object): # call sometime in the future, or equivalent that will work in # py2 and py3. if memory.not_done: - done, not_done = self._waiter.wait_for_any(memory.not_done, - timeout) + done, not_done = wait_for_any(memory.not_done, timeout) memory.done.update(done) memory.not_done = not_done return _ANALYZE @@ -173,7 +185,7 @@ class Runner(object): node = fut.atom try: event, result = fut.result() - retain = self._completer.complete(node, event, result) + retain = do_complete(node, event, result) if isinstance(result, failure.Failure): if retain: memory.failures.append(result) @@ -196,7 +208,7 @@ class Runner(object): memory.failures.append(failure.Failure()) else: try: - more_nodes = self._analyzer.get_next_nodes(node) + more_nodes = set(iter_next_nodes(target_node=node)) except Exception: memory.failures.append(failure.Failure()) else: diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 0439da17..0cf303c6 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -23,6 +23,7 @@ from taskflow.engines.action_engine import completer as co from taskflow.engines.action_engine import runner as ru from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc +from taskflow import flow as flow_type from taskflow import states as st from taskflow import task from taskflow.utils import misc @@ -61,6 +62,7 @@ class Runtime(object): 'retry': self.retry_scheduler, 'task': self.task_scheduler, } + execution_graph = self._compilation.execution_graph for atom in self.analyzer.iterate_all_nodes(): metadata = {} walker = sc.ScopeWalker(self.compilation, atom, names_only=True) @@ -72,10 +74,20 @@ class Runtime(object): check_transition_handler = st.check_retry_transition change_state_handler = change_state_handlers['retry'] scheduler = schedulers['retry'] + edge_deciders = {} + for previous_atom in execution_graph.predecessors(atom): + # If there is any link function that says if this connection + # is able to run (or should not) ensure we retain it and use + # it later as needed. + u_v_data = execution_graph.adj[previous_atom][atom] + u_v_decider = u_v_data.get(flow_type.LINK_DECIDER) + if u_v_decider is not None: + edge_deciders[previous_atom.name] = u_v_decider metadata['scope_walker'] = walker metadata['check_transition_handler'] = check_transition_handler metadata['change_state_handler'] = change_state_handler metadata['scheduler'] = scheduler + metadata['edge_deciders'] = edge_deciders self._atom_cache[atom.name] = metadata @property @@ -130,6 +142,14 @@ class Runtime(object): check_transition_handler = metadata['check_transition_handler'] return check_transition_handler(current_state, target_state) + def fetch_edge_deciders(self, atom): + """Fetches the edge deciders for the given atom.""" + # This does not check if the name exists (since this is only used + # internally to the engine, and is not exposed to atoms that will + # not exist and therefore doesn't need to handle that case). + metadata = self._atom_cache[atom.name] + return metadata['edge_deciders'] + def fetch_scheduler(self, atom): """Fetches the cached specific scheduler for the given atom.""" # This does not check if the name exists (since this is only used diff --git a/taskflow/examples/switch_graph_flow.py b/taskflow/examples/switch_graph_flow.py new file mode 100644 index 00000000..273763cd --- /dev/null +++ b/taskflow/examples/switch_graph_flow.py @@ -0,0 +1,75 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import os +import sys + +logging.basicConfig(level=logging.ERROR) + +top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), + os.pardir, + os.pardir)) +sys.path.insert(0, top_dir) + +from taskflow import engines +from taskflow.patterns import graph_flow as gf +from taskflow.persistence import backends +from taskflow import task +from taskflow.utils import persistence_utils as pu + + +class DummyTask(task.Task): + def execute(self): + print("Running %s" % self.name) + + +def allow(history): + print(history) + return False + + +r = gf.Flow("root") +r_a = DummyTask('r-a') +r_b = DummyTask('r-b') +r.add(r_a, r_b) +r.link(r_a, r_b, decider=allow) + +backend = backends.fetch({ + 'connection': 'memory://', +}) +book, flow_detail = pu.temporary_flow_detail(backend=backend) + +e = engines.load(r, flow_detail=flow_detail, book=book, backend=backend) +e.compile() +e.prepare() +e.run() + + +print("---------") +print("After run") +print("---------") +entries = [os.path.join(backend.memory.root_path, child) + for child in backend.memory.ls(backend.memory.root_path)] +while entries: + path = entries.pop() + value = backend.memory[path] + if value: + print("%s -> %s" % (path, value)) + else: + print("%s" % (path)) + entries.extend(os.path.join(path, child) + for child in backend.memory.ls(path)) diff --git a/taskflow/flow.py b/taskflow/flow.py index 5eb05825..56786d4d 100644 --- a/taskflow/flow.py +++ b/taskflow/flow.py @@ -31,6 +31,9 @@ LINK_RETRY = 'retry' # This key denotes the link was created due to symbol constraints and the # value will be a set of names that the constraint ensures are satisfied. LINK_REASONS = 'reasons' +# +# This key denotes a callable that will determine if the target is visited. +LINK_DECIDER = 'decider' @six.add_metaclass(abc.ABCMeta) diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 7d407c8c..50a4d61d 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -16,6 +16,8 @@ import collections +import six + from taskflow import exceptions as exc from taskflow import flow from taskflow.types import graph as gr @@ -66,16 +68,20 @@ class Flow(flow.Flow): #: Extracts the unsatisified symbol requirements of a single node. _unsatisfied_requires = staticmethod(_unsatisfied_requires) - def link(self, u, v): + def link(self, u, v, decider=None): """Link existing node u as a runtime dependency of existing node v.""" if not self._graph.has_node(u): raise ValueError("Node '%s' not found to link from" % (u)) if not self._graph.has_node(v): raise ValueError("Node '%s' not found to link to" % (v)) - self._swap(self._link(u, v, manual=True)) + if decider is not None: + if not six.callable(decider): + raise ValueError("Decider boolean callback must be callable") + self._swap(self._link(u, v, manual=True, decider=decider)) return self - def _link(self, u, v, graph=None, reason=None, manual=False): + def _link(self, u, v, graph=None, + reason=None, manual=False, decider=None): mutable_graph = True if graph is None: graph = self._graph @@ -85,6 +91,8 @@ class Flow(flow.Flow): attrs = graph.get_edge_data(u, v) if not attrs: attrs = {} + if decider is not None: + attrs[flow.LINK_DECIDER] = decider if manual: attrs[flow.LINK_MANUAL] = True if reason is not None: @@ -281,9 +289,9 @@ class TargetedFlow(Flow): self._subgraph = None return self - def link(self, u, v): + def link(self, u, v, decider=None): """Link existing node u as a runtime dependency of existing node v.""" - super(TargetedFlow, self).link(u, v) + super(TargetedFlow, self).link(u, v, decider=decider) # reset cached subgraph, in case it was affected self._subgraph = None return self diff --git a/taskflow/states.py b/taskflow/states.py index 265d6b2c..cbef58c7 100644 --- a/taskflow/states.py +++ b/taskflow/states.py @@ -40,10 +40,11 @@ REVERTING = REVERTING SUCCESS = SUCCESS RUNNING = RUNNING RETRYING = 'RETRYING' +IGNORE = 'IGNORE' # Atom intentions. EXECUTE = 'EXECUTE' -IGNORE = 'IGNORE' +IGNORE = IGNORE REVERT = 'REVERT' RETRY = 'RETRY' INTENTIONS = (EXECUTE, IGNORE, REVERT, RETRY) @@ -160,6 +161,7 @@ def check_flow_transition(old_state, new_state): _ALLOWED_TASK_TRANSITIONS = frozenset(( (PENDING, RUNNING), # run it! + (PENDING, IGNORE), # skip it! (RUNNING, SUCCESS), # the task finished successfully (RUNNING, FAILURE), # the task failed @@ -171,6 +173,7 @@ _ALLOWED_TASK_TRANSITIONS = frozenset(( (REVERTING, FAILURE), # revert failed (REVERTED, PENDING), # try again + (IGNORE, PENDING), # try again )) diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index 4e38dfa5..ed073e6a 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -15,7 +15,9 @@ # under the License. import contextlib +import functools +import six import testtools import taskflow.engines @@ -772,6 +774,126 @@ class EngineMissingDepsTest(utils.EngineTestBase): self.assertIsNotNone(c_e.cause) +class EngineGraphConditionalFlowTest(utils.EngineTestBase): + + def test_graph_flow_conditional(self): + flow = gf.Flow('root') + + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task2_2 = utils.ProgressingTask(name='task2_2') + task3 = utils.ProgressingTask(name='task3') + + flow.add(task1, task2, task2_2, task3) + flow.link(task1, task2, decider=lambda history: False) + flow.link(task2, task2_2) + flow.link(task1, task3, decider=lambda history: True) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t IGNORE', + 'task2_2.t IGNORE', + + 'task3.t RUNNING', + 'task3.t SUCCESS(5)', + ]) + self.assertEqual(expected, set(capturer.values)) + + def test_graph_flow_diamond_ignored(self): + flow = gf.Flow('root') + + task1 = utils.ProgressingTask(name='task1') + task2 = utils.ProgressingTask(name='task2') + task3 = utils.ProgressingTask(name='task3') + task4 = utils.ProgressingTask(name='task4') + + flow.add(task1, task2, task3, task4) + flow.link(task1, task2) + flow.link(task2, task4, decider=lambda history: False) + flow.link(task1, task3) + flow.link(task3, task4, decider=lambda history: True) + + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + + 'task2.t RUNNING', + 'task2.t SUCCESS(5)', + + 'task3.t RUNNING', + 'task3.t SUCCESS(5)', + + 'task4.t IGNORE', + ]) + self.assertEqual(expected, set(capturer.values)) + self.assertEqual(states.IGNORE, + engine.storage.get_atom_state('task4')) + self.assertEqual(states.IGNORE, + engine.storage.get_atom_intention('task4')) + + def test_graph_flow_conditional_history(self): + + def even_odd_decider(history, allowed): + total = sum(six.itervalues(history)) + if total == allowed: + return True + return False + + flow = gf.Flow('root') + + task1 = utils.TaskMultiArgOneReturn(name='task1') + task2 = utils.ProgressingTask(name='task2') + task2_2 = utils.ProgressingTask(name='task2_2') + task3 = utils.ProgressingTask(name='task3') + task3_3 = utils.ProgressingTask(name='task3_3') + + flow.add(task1, task2, task2_2, task3, task3_3) + flow.link(task1, task2, + decider=functools.partial(even_odd_decider, allowed=2)) + flow.link(task2, task2_2) + + flow.link(task1, task3, + decider=functools.partial(even_odd_decider, allowed=1)) + flow.link(task3, task3_3) + + engine = self._make_engine(flow) + engine.storage.inject({'x': 0, 'y': 1, 'z': 1}) + + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', 'task1.t SUCCESS(2)', + 'task3.t IGNORE', 'task3_3.t IGNORE', + 'task2.t RUNNING', 'task2.t SUCCESS(5)', + 'task2_2.t RUNNING', 'task2_2.t SUCCESS(5)', + ]) + self.assertEqual(expected, set(capturer.values)) + + engine = self._make_engine(flow) + engine.storage.inject({'x': 0, 'y': 0, 'z': 1}) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + + expected = set([ + 'task1.t RUNNING', 'task1.t SUCCESS(1)', + 'task2.t IGNORE', 'task2_2.t IGNORE', + 'task3.t RUNNING', 'task3.t SUCCESS(5)', + 'task3_3.t RUNNING', 'task3_3.t SUCCESS(5)', + ]) + self.assertEqual(expected, set(capturer.values)) + + class EngineCheckingTaskTest(utils.EngineTestBase): # FIXME: this test uses a inner class that workers/process engines can't # get to, so we need to do something better to make this test useful for @@ -805,6 +927,7 @@ class SerialEngineTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, EngineCheckingTaskTest, test.TestCase): def _make_engine(self, flow, @@ -832,6 +955,7 @@ class ParallelEngineWithThreadsTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, EngineCheckingTaskTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -871,6 +995,7 @@ class ParallelEngineWithEventletTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, EngineCheckingTaskTest, test.TestCase): @@ -893,6 +1018,7 @@ class ParallelEngineWithProcessTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, test.TestCase): _EXECUTOR_WORKERS = 2 @@ -920,6 +1046,7 @@ class WorkerBasedEngineTest(EngineTaskTest, EngineOptionalRequirementsTest, EngineGraphFlowTest, EngineMissingDepsTest, + EngineGraphConditionalFlowTest, test.TestCase): def setUp(self): super(WorkerBasedEngineTest, self).setUp() diff --git a/tools/state_graph.py b/tools/state_graph.py index 997920da..c37cd703 100755 --- a/tools/state_graph.py +++ b/tools/state_graph.py @@ -14,6 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. +import mock + import optparse import os import sys @@ -37,10 +39,10 @@ from taskflow.types import fsm # actually be running it...). class DummyRuntime(object): def __init__(self): - self.analyzer = None - self.completer = None - self.scheduler = None - self.storage = None + self.analyzer = mock.MagicMock() + self.completer = mock.MagicMock() + self.scheduler = mock.MagicMock() + self.storage = mock.MagicMock() def clean_event(name): @@ -130,7 +132,7 @@ def main(): list(states._ALLOWED_RETRY_TRANSITIONS)) elif options.engines: source_type = "Engines" - r = runner.Runner(DummyRuntime(), None) + r = runner.Runner(DummyRuntime(), mock.MagicMock()) source, memory = r.build() internal_states.extend(runner._META_STATES) ordering = 'out'