diff --git a/taskflow/atom.py b/taskflow/atom.py index fba04a975..a5ff4bc11 100644 --- a/taskflow/atom.py +++ b/taskflow/atom.py @@ -194,6 +194,33 @@ class Atom(object): this atom produces. """ + priority = 0 + """A numeric priority that instances of this class will have when running, + used when there are multiple *parallel* candidates to execute and/or + revert. During this situation the candidate list will be stably sorted + based on this priority attribute which will result in atoms with higher + priorities executing (or reverting) before atoms with lower + priorities (higher being defined as a number bigger, or greater tha + an atom with a lower priority number). By default all atoms have the same + priority (zero). + + For example when the following is combined into a + graph (where each node in the denoted graph is some task):: + + a -> b + b -> c + b -> e + b -> f + + When ``b`` finishes there will then be three candidates that can run + ``(c, e, f)`` and they may run in any order. What this priority does is + sort those three by their priority before submitting them to be + worked on (so that instead of say a random run order they will now be + ran by there sorted order). This is also true when reverting (in that the + sort order of the potential nodes will be used to determine the + submission order). + """ + def __init__(self, name=None, provides=None, inject=None): self.name = name self.version = (1, 0) diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index 9013cd8a9..4ef658a3a 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -108,9 +108,14 @@ class MachineBuilder(object): timeout = WAITING_TIMEOUT # Cache some local functions/methods... - do_schedule = self._scheduler.schedule do_complete = self._completer.complete + def do_schedule(next_nodes): + return self._scheduler.schedule( + sorted(next_nodes, + key=lambda node: getattr(node, 'priority', 0), + reverse=True)) + def is_runnable(): # Checks if the storage says the flow is still runnable... return self._storage.get_flow_state() == st.RUNNING diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index c56d75692..5f5b2ad68 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -461,6 +461,32 @@ class EngineParallelFlowTest(utils.EngineTestBase): engine = self._make_engine(flow) self.assertRaises(exc.Empty, engine.run) + def test_parallel_flow_with_priority(self): + flow = uf.Flow('p-1') + for i in range(0, 10): + t = utils.ProgressingTask(name='task%s' % i) + t.priority = i + flow.add(t) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = [ + 'task9.t RUNNING', + 'task8.t RUNNING', + 'task7.t RUNNING', + 'task6.t RUNNING', + 'task5.t RUNNING', + 'task4.t RUNNING', + 'task3.t RUNNING', + 'task2.t RUNNING', + 'task1.t RUNNING', + 'task0.t RUNNING', + ] + # NOTE(harlowja): chop off the gathering of SUCCESS states, since we + # don't care if thats in order... + gotten = capturer.values[0:10] + self.assertEqual(expected, gotten) + def test_parallel_flow_one_task(self): flow = uf.Flow('p-1').add( utils.ProgressingTask(name='task1', provides='a')