Merge "Add atom priority ability"
This commit is contained in:
@@ -194,6 +194,33 @@ class Atom(object):
|
||||
this atom produces.
|
||||
"""
|
||||
|
||||
priority = 0
|
||||
"""A numeric priority that instances of this class will have when running,
|
||||
used when there are multiple *parallel* candidates to execute and/or
|
||||
revert. During this situation the candidate list will be stably sorted
|
||||
based on this priority attribute which will result in atoms with higher
|
||||
priorities executing (or reverting) before atoms with lower
|
||||
priorities (higher being defined as a number bigger, or greater tha
|
||||
an atom with a lower priority number). By default all atoms have the same
|
||||
priority (zero).
|
||||
|
||||
For example when the following is combined into a
|
||||
graph (where each node in the denoted graph is some task)::
|
||||
|
||||
a -> b
|
||||
b -> c
|
||||
b -> e
|
||||
b -> f
|
||||
|
||||
When ``b`` finishes there will then be three candidates that can run
|
||||
``(c, e, f)`` and they may run in any order. What this priority does is
|
||||
sort those three by their priority before submitting them to be
|
||||
worked on (so that instead of say a random run order they will now be
|
||||
ran by there sorted order). This is also true when reverting (in that the
|
||||
sort order of the potential nodes will be used to determine the
|
||||
submission order).
|
||||
"""
|
||||
|
||||
def __init__(self, name=None, provides=None, inject=None):
|
||||
self.name = name
|
||||
self.version = (1, 0)
|
||||
|
@@ -108,9 +108,14 @@ class MachineBuilder(object):
|
||||
timeout = WAITING_TIMEOUT
|
||||
|
||||
# Cache some local functions/methods...
|
||||
do_schedule = self._scheduler.schedule
|
||||
do_complete = self._completer.complete
|
||||
|
||||
def do_schedule(next_nodes):
|
||||
return self._scheduler.schedule(
|
||||
sorted(next_nodes,
|
||||
key=lambda node: getattr(node, 'priority', 0),
|
||||
reverse=True))
|
||||
|
||||
def is_runnable():
|
||||
# Checks if the storage says the flow is still runnable...
|
||||
return self._storage.get_flow_state() == st.RUNNING
|
||||
|
@@ -461,6 +461,32 @@ class EngineParallelFlowTest(utils.EngineTestBase):
|
||||
engine = self._make_engine(flow)
|
||||
self.assertRaises(exc.Empty, engine.run)
|
||||
|
||||
def test_parallel_flow_with_priority(self):
|
||||
flow = uf.Flow('p-1')
|
||||
for i in range(0, 10):
|
||||
t = utils.ProgressingTask(name='task%s' % i)
|
||||
t.priority = i
|
||||
flow.add(t)
|
||||
engine = self._make_engine(flow)
|
||||
with utils.CaptureListener(engine, capture_flow=False) as capturer:
|
||||
engine.run()
|
||||
expected = [
|
||||
'task9.t RUNNING',
|
||||
'task8.t RUNNING',
|
||||
'task7.t RUNNING',
|
||||
'task6.t RUNNING',
|
||||
'task5.t RUNNING',
|
||||
'task4.t RUNNING',
|
||||
'task3.t RUNNING',
|
||||
'task2.t RUNNING',
|
||||
'task1.t RUNNING',
|
||||
'task0.t RUNNING',
|
||||
]
|
||||
# NOTE(harlowja): chop off the gathering of SUCCESS states, since we
|
||||
# don't care if thats in order...
|
||||
gotten = capturer.values[0:10]
|
||||
self.assertEqual(expected, gotten)
|
||||
|
||||
def test_parallel_flow_one_task(self):
|
||||
flow = uf.Flow('p-1').add(
|
||||
utils.ProgressingTask(name='task1', provides='a')
|
||||
|
Reference in New Issue
Block a user