Add atom priority ability
In situations where many atoms can execute at the same time it is sometimes useful to denote that when this situation happens that certain atoms should execute/revert before other atoms (or at least an attempt should be made to do this) instead of being nearly arbitrary. This adds a priority class attribute to the atom class (which can be overridden or changed as needed) which is then used in the runtime state machine to sort on so that atoms with higher priority get submitted (and therefore executed/reverted) first. Closes-Bug: #1507755 Change-Id: I3dcc705959085cba167883c85278e394b5cb1d2b
This commit is contained in:
parent
4677fdb30e
commit
70e58977c9
@ -194,6 +194,33 @@ class Atom(object):
|
||||
this atom produces.
|
||||
"""
|
||||
|
||||
priority = 0
|
||||
"""A numeric priority that instances of this class will have when running,
|
||||
used when there are multiple *parallel* candidates to execute and/or
|
||||
revert. During this situation the candidate list will be stably sorted
|
||||
based on this priority attribute which will result in atoms with higher
|
||||
priorities executing (or reverting) before atoms with lower
|
||||
priorities (higher being defined as a number bigger, or greater tha
|
||||
an atom with a lower priority number). By default all atoms have the same
|
||||
priority (zero).
|
||||
|
||||
For example when the following is combined into a
|
||||
graph (where each node in the denoted graph is some task)::
|
||||
|
||||
a -> b
|
||||
b -> c
|
||||
b -> e
|
||||
b -> f
|
||||
|
||||
When ``b`` finishes there will then be three candidates that can run
|
||||
``(c, e, f)`` and they may run in any order. What this priority does is
|
||||
sort those three by their priority before submitting them to be
|
||||
worked on (so that instead of say a random run order they will now be
|
||||
ran by there sorted order). This is also true when reverting (in that the
|
||||
sort order of the potential nodes will be used to determine the
|
||||
submission order).
|
||||
"""
|
||||
|
||||
def __init__(self, name=None, provides=None, inject=None):
|
||||
self.name = name
|
||||
self.version = (1, 0)
|
||||
|
@ -108,9 +108,14 @@ class MachineBuilder(object):
|
||||
timeout = WAITING_TIMEOUT
|
||||
|
||||
# Cache some local functions/methods...
|
||||
do_schedule = self._scheduler.schedule
|
||||
do_complete = self._completer.complete
|
||||
|
||||
def do_schedule(next_nodes):
|
||||
return self._scheduler.schedule(
|
||||
sorted(next_nodes,
|
||||
key=lambda node: getattr(node, 'priority', 0),
|
||||
reverse=True))
|
||||
|
||||
def is_runnable():
|
||||
# Checks if the storage says the flow is still runnable...
|
||||
return self._storage.get_flow_state() == st.RUNNING
|
||||
|
@ -461,6 +461,32 @@ class EngineParallelFlowTest(utils.EngineTestBase):
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaises(exc.Empty, engine.run)
|
||||
|
||||
def test_parallel_flow_with_priority(self):
|
||||
flow = uf.Flow('p-1')
|
||||
for i in range(0, 10):
|
||||
t = utils.ProgressingTask(name='task%s' % i)
|
||||
t.priority = i
|
||||
flow.add(t)
|
||||
engine = self._make_engine(flow)
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
expected = [
|
||||
'task9.t RUNNING',
|
||||
'task8.t RUNNING',
|
||||
'task7.t RUNNING',
|
||||
'task6.t RUNNING',
|
||||
'task5.t RUNNING',
|
||||
'task4.t RUNNING',
|
||||
'task3.t RUNNING',
|
||||
'task2.t RUNNING',
|
||||
'task1.t RUNNING',
|
||||
'task0.t RUNNING',
|
||||
]
|
||||
# NOTE(harlowja): chop off the gathering of SUCCESS states, since we
|
||||
# don't care if thats in order...
|
||||
gotten = capturer.values[0:10]
|
||||
self.assertEqual(expected, gotten)
|
||||
|
||||
def test_parallel_flow_one_task(self):
|
||||
flow = uf.Flow('p-1').add(
|
||||
utils.ProgressingTask(name='task1', provides='a')
|
||||
|
Loading…
Reference in New Issue
Block a user