From 70e58977c944d23f2133cd1b3d6f3970f101e7ab Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 7 Aug 2015 14:46:24 -0700 Subject: [PATCH] Add atom priority ability In situations where many atoms can execute at the same time it is sometimes useful to denote that when this situation happens that certain atoms should execute/revert before other atoms (or at least an attempt should be made to do this) instead of being nearly arbitrary. This adds a priority class attribute to the atom class (which can be overridden or changed as needed) which is then used in the runtime state machine to sort on so that atoms with higher priority get submitted (and therefore executed/reverted) first. Closes-Bug: #1507755 Change-Id: I3dcc705959085cba167883c85278e394b5cb1d2b --- taskflow/atom.py | 27 +++++++++++++++++++++++ taskflow/engines/action_engine/builder.py | 7 +++++- taskflow/tests/unit/test_engines.py | 26 ++++++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) diff --git a/taskflow/atom.py b/taskflow/atom.py index fba04a975..a5ff4bc11 100644 --- a/taskflow/atom.py +++ b/taskflow/atom.py @@ -194,6 +194,33 @@ class Atom(object): this atom produces. """ + priority = 0 + """A numeric priority that instances of this class will have when running, + used when there are multiple *parallel* candidates to execute and/or + revert. During this situation the candidate list will be stably sorted + based on this priority attribute which will result in atoms with higher + priorities executing (or reverting) before atoms with lower + priorities (higher being defined as a number bigger, or greater tha + an atom with a lower priority number). By default all atoms have the same + priority (zero). + + For example when the following is combined into a + graph (where each node in the denoted graph is some task):: + + a -> b + b -> c + b -> e + b -> f + + When ``b`` finishes there will then be three candidates that can run + ``(c, e, f)`` and they may run in any order. What this priority does is + sort those three by their priority before submitting them to be + worked on (so that instead of say a random run order they will now be + ran by there sorted order). This is also true when reverting (in that the + sort order of the potential nodes will be used to determine the + submission order). + """ + def __init__(self, name=None, provides=None, inject=None): self.name = name self.version = (1, 0) diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index 9013cd8a9..4ef658a3a 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -108,9 +108,14 @@ class MachineBuilder(object): timeout = WAITING_TIMEOUT # Cache some local functions/methods... - do_schedule = self._scheduler.schedule do_complete = self._completer.complete + def do_schedule(next_nodes): + return self._scheduler.schedule( + sorted(next_nodes, + key=lambda node: getattr(node, 'priority', 0), + reverse=True)) + def is_runnable(): # Checks if the storage says the flow is still runnable... return self._storage.get_flow_state() == st.RUNNING diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index c56d75692..5f5b2ad68 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -461,6 +461,32 @@ class EngineParallelFlowTest(utils.EngineTestBase): engine = self._make_engine(flow) self.assertRaises(exc.Empty, engine.run) + def test_parallel_flow_with_priority(self): + flow = uf.Flow('p-1') + for i in range(0, 10): + t = utils.ProgressingTask(name='task%s' % i) + t.priority = i + flow.add(t) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = [ + 'task9.t RUNNING', + 'task8.t RUNNING', + 'task7.t RUNNING', + 'task6.t RUNNING', + 'task5.t RUNNING', + 'task4.t RUNNING', + 'task3.t RUNNING', + 'task2.t RUNNING', + 'task1.t RUNNING', + 'task0.t RUNNING', + ] + # NOTE(harlowja): chop off the gathering of SUCCESS states, since we + # don't care if thats in order... + gotten = capturer.values[0:10] + self.assertEqual(expected, gotten) + def test_parallel_flow_one_task(self): flow = uf.Flow('p-1').add( utils.ProgressingTask(name='task1', provides='a')