Beef up the action engine comments
Also make a few minor tweaks to the underlying functions and relax the networkx requirement slightly. Change-Id: Ibbfa7f786d3c33956f7ca39b7ea9423696859e45
This commit is contained in:
parent
faf2d155df
commit
fc2a057ac7
@ -6,7 +6,7 @@ six
|
||||
SQLAlchemy>=0.7.8,<=0.7.99
|
||||
alembic>=0.4.1
|
||||
# Very nice graph library
|
||||
networkx>=1.8.1
|
||||
networkx>=1.8
|
||||
Babel>=1.3
|
||||
# Used for backend storage engine loading.
|
||||
stevedore>=0.10
|
||||
|
@ -40,7 +40,15 @@ from taskflow.utils import threading_utils
|
||||
class ActionEngine(base.EngineBase):
|
||||
"""Generic action-based engine.
|
||||
|
||||
Converts the flow to recursive structure of actions.
|
||||
This engine flattens the flow (and any subflows) into a execution graph
|
||||
which contains the full runtime definition to be executed and then uses
|
||||
this graph in combination with the action classes & storage to attempt to
|
||||
run your flow (and any subflows & contained tasks) to completion.
|
||||
|
||||
During this process it is permissible and valid to have a task or multiple
|
||||
tasks in the execution graph fail, which will cause the process of
|
||||
reversion to commence. See the valid states in the states module to learn
|
||||
more about what other states the tasks & flow being ran can go through.
|
||||
"""
|
||||
_graph_action = None
|
||||
|
||||
@ -72,6 +80,13 @@ class ActionEngine(base.EngineBase):
|
||||
return "%s: %s" % (reflection.get_class_name(self), id(self))
|
||||
|
||||
def suspend(self):
|
||||
"""Attempts to suspend the engine.
|
||||
|
||||
If the engine is currently running tasks then this will attempt to
|
||||
suspend future work from being started (currently active tasks can
|
||||
not currently be preempted) and move the engine into a suspend state
|
||||
which can then later be resumed from.
|
||||
"""
|
||||
self._change_state(states.SUSPENDING)
|
||||
|
||||
def get_graph(self):
|
||||
@ -80,6 +95,7 @@ class ActionEngine(base.EngineBase):
|
||||
|
||||
@lock_utils.locked
|
||||
def run(self):
|
||||
"""Runs the flow in the engine to completion."""
|
||||
self.compile()
|
||||
external_provides = set(self.storage.fetch_all().keys())
|
||||
missing = self._flow.requires - external_provides
|
||||
@ -122,7 +138,11 @@ class ActionEngine(base.EngineBase):
|
||||
old_state=old_state)
|
||||
self.notifier.notify(state, details)
|
||||
|
||||
def on_task_state_change(self, task_action, state, result=None):
|
||||
def _on_task_state_change(self, task_action, state, result=None):
|
||||
"""Notifies the engine that the following task action has completed
|
||||
a given state with a given result. This is a *internal* to the action
|
||||
engine and its associated action classes, not for use externally.
|
||||
"""
|
||||
if isinstance(result, misc.Failure):
|
||||
self._failures[task_action.uuid] = result
|
||||
details = dict(engine=self,
|
||||
@ -131,7 +151,12 @@ class ActionEngine(base.EngineBase):
|
||||
result=result)
|
||||
self.task_notifier.notify(state, details)
|
||||
|
||||
@lock_utils.locked
|
||||
def compile(self):
|
||||
"""Compiles the contained flow into a structure which the engine can
|
||||
use to run or if this can not be done then an exception is thrown
|
||||
indicating why this compilation could not be achieved.
|
||||
"""
|
||||
if self._root is not None:
|
||||
return
|
||||
|
||||
@ -180,13 +205,13 @@ class ActionEngine(base.EngineBase):
|
||||
|
||||
|
||||
class SingleThreadedActionEngine(ActionEngine):
|
||||
# This one attempts to run in a serial manner.
|
||||
# NOTE(harlowja): This one attempts to run in a serial manner.
|
||||
_graph_action = graph_action.SequentialGraphAction
|
||||
_storage_cls = t_storage.Storage
|
||||
|
||||
|
||||
class MultiThreadedActionEngine(ActionEngine):
|
||||
# This one attempts to run in a parallel manner.
|
||||
# NOTE(harlowja): This one attempts to run in a parallel manner.
|
||||
_graph_action = graph_action.ParallelGraphAction
|
||||
_storage_cls = t_storage.ThreadSafeStorage
|
||||
|
||||
@ -198,8 +223,11 @@ class MultiThreadedActionEngine(ActionEngine):
|
||||
@lock_utils.locked
|
||||
def run(self):
|
||||
if self._executor is None:
|
||||
self._executor = futures.ThreadPoolExecutor(
|
||||
threading_utils.get_optimal_thread_count())
|
||||
# NOTE(harlowja): since no executor was provided we have to create
|
||||
# one, and also ensure that we shutdown the one we create to
|
||||
# ensure that we don't leak threads.
|
||||
thread_count = threading_utils.get_optimal_thread_count()
|
||||
self._executor = futures.ThreadPoolExecutor(thread_count)
|
||||
owns_executor = True
|
||||
else:
|
||||
owns_executor = False
|
||||
@ -216,4 +244,8 @@ class MultiThreadedActionEngine(ActionEngine):
|
||||
|
||||
@property
|
||||
def executor(self):
|
||||
"""Returns the current executor, if no executor is provided on
|
||||
construction then this executor will change each time the engine
|
||||
is ran.
|
||||
"""
|
||||
return self._executor
|
||||
|
@ -65,7 +65,7 @@ class TaskAction(base.Action):
|
||||
engine.storage.set_task_state(self.uuid, state)
|
||||
if progress is not None:
|
||||
engine.storage.set_task_progress(self.uuid, progress)
|
||||
engine.on_task_state_change(self, state, result=result)
|
||||
engine._on_task_state_change(self, state, result=result)
|
||||
|
||||
def _on_update_progress(self, task, event_data, progress, **kwargs):
|
||||
"""Update task progress value that stored in engine."""
|
||||
|
Loading…
Reference in New Issue
Block a user