From fc2a057ac7d39b87157de1f53ba7bbad2bd62aa3 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sat, 12 Oct 2013 23:19:12 -0700 Subject: [PATCH] Beef up the action engine comments Also make a few minor tweaks to the underlying functions and relax the networkx requirement slightly. Change-Id: Ibbfa7f786d3c33956f7ca39b7ea9423696859e45 --- requirements.txt | 2 +- taskflow/engines/action_engine/engine.py | 44 ++++++++++++++++--- taskflow/engines/action_engine/task_action.py | 2 +- 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/requirements.txt b/requirements.txt index 03a3957f..e4e1c2c9 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ six SQLAlchemy>=0.7.8,<=0.7.99 alembic>=0.4.1 # Very nice graph library -networkx>=1.8.1 +networkx>=1.8 Babel>=1.3 # Used for backend storage engine loading. stevedore>=0.10 diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 660c0777..11e7435b 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -40,7 +40,15 @@ from taskflow.utils import threading_utils class ActionEngine(base.EngineBase): """Generic action-based engine. - Converts the flow to recursive structure of actions. + This engine flattens the flow (and any subflows) into a execution graph + which contains the full runtime definition to be executed and then uses + this graph in combination with the action classes & storage to attempt to + run your flow (and any subflows & contained tasks) to completion. + + During this process it is permissible and valid to have a task or multiple + tasks in the execution graph fail, which will cause the process of + reversion to commence. See the valid states in the states module to learn + more about what other states the tasks & flow being ran can go through. """ _graph_action = None @@ -72,6 +80,13 @@ class ActionEngine(base.EngineBase): return "%s: %s" % (reflection.get_class_name(self), id(self)) def suspend(self): + """Attempts to suspend the engine. + + If the engine is currently running tasks then this will attempt to + suspend future work from being started (currently active tasks can + not currently be preempted) and move the engine into a suspend state + which can then later be resumed from. + """ self._change_state(states.SUSPENDING) def get_graph(self): @@ -80,6 +95,7 @@ class ActionEngine(base.EngineBase): @lock_utils.locked def run(self): + """Runs the flow in the engine to completion.""" self.compile() external_provides = set(self.storage.fetch_all().keys()) missing = self._flow.requires - external_provides @@ -122,7 +138,11 @@ class ActionEngine(base.EngineBase): old_state=old_state) self.notifier.notify(state, details) - def on_task_state_change(self, task_action, state, result=None): + def _on_task_state_change(self, task_action, state, result=None): + """Notifies the engine that the following task action has completed + a given state with a given result. This is a *internal* to the action + engine and its associated action classes, not for use externally. + """ if isinstance(result, misc.Failure): self._failures[task_action.uuid] = result details = dict(engine=self, @@ -131,7 +151,12 @@ class ActionEngine(base.EngineBase): result=result) self.task_notifier.notify(state, details) + @lock_utils.locked def compile(self): + """Compiles the contained flow into a structure which the engine can + use to run or if this can not be done then an exception is thrown + indicating why this compilation could not be achieved. + """ if self._root is not None: return @@ -180,13 +205,13 @@ class ActionEngine(base.EngineBase): class SingleThreadedActionEngine(ActionEngine): - # This one attempts to run in a serial manner. + # NOTE(harlowja): This one attempts to run in a serial manner. _graph_action = graph_action.SequentialGraphAction _storage_cls = t_storage.Storage class MultiThreadedActionEngine(ActionEngine): - # This one attempts to run in a parallel manner. + # NOTE(harlowja): This one attempts to run in a parallel manner. _graph_action = graph_action.ParallelGraphAction _storage_cls = t_storage.ThreadSafeStorage @@ -198,8 +223,11 @@ class MultiThreadedActionEngine(ActionEngine): @lock_utils.locked def run(self): if self._executor is None: - self._executor = futures.ThreadPoolExecutor( - threading_utils.get_optimal_thread_count()) + # NOTE(harlowja): since no executor was provided we have to create + # one, and also ensure that we shutdown the one we create to + # ensure that we don't leak threads. + thread_count = threading_utils.get_optimal_thread_count() + self._executor = futures.ThreadPoolExecutor(thread_count) owns_executor = True else: owns_executor = False @@ -216,4 +244,8 @@ class MultiThreadedActionEngine(ActionEngine): @property def executor(self): + """Returns the current executor, if no executor is provided on + construction then this executor will change each time the engine + is ran. + """ return self._executor diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index ffd2b521..bee0266f 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -65,7 +65,7 @@ class TaskAction(base.Action): engine.storage.set_task_state(self.uuid, state) if progress is not None: engine.storage.set_task_progress(self.uuid, progress) - engine.on_task_state_change(self, state, result=result) + engine._on_task_state_change(self, state, result=result) def _on_update_progress(self, task, event_data, progress, **kwargs): """Update task progress value that stored in engine."""