From f7daa45d69876072368111b2c55ae5f3c2593287 Mon Sep 17 00:00:00 2001
From: Joshua Harlow
Date: Wed, 18 Sep 2013 12:32:15 -0700
Subject: [PATCH] Reintegrate parallel action

Fixes: bug 1221505
Fixes: bug 1225759

Change-Id: Id4c915d36d0da679b313dba8421ac621aeb7c818
---
 taskflow/engines/action_engine/engine.py       |  13 ++-
 .../engines/action_engine/graph_action.py      | 106 ++++++++++++++++++
 2 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index bc0e544c..6e880713 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -40,6 +40,7 @@ class ActionEngine(object):
 
     Converts the flow to recursive structure of actions.
     """
+    _graph_action = None
 
     def __init__(self, flow, storage):
         self._failures = []
@@ -100,20 +101,23 @@ class ActionEngine(object):
         self.task_notifier.notify(state, details)
 
     def _translate_flow_to_action(self):
-        # Flatten the flow into just 1 graph.
+        assert self._graph_action is not None, ('Graph action class must be'
+                                                ' specified')
         task_graph = flow_utils.flatten(self._flow)
-        ga = graph_action.SequentialGraphAction(task_graph)
+        ga = self._graph_action(task_graph)
         for n in task_graph.nodes_iter():
             ga.add(n, task_action.TaskAction(n, self))
         return ga
 
-    @decorators.locked
     def compile(self):
         if self._root is None:
             self._root = self._translate_flow_to_action()
 
 
 class SingleThreadedActionEngine(ActionEngine):
+    # This one attempts to run in a serial manner.
+    _graph_action = graph_action.SequentialGraphAction
+
     def __init__(self, flow, flow_detail=None, book=None, backend=None):
         if flow_detail is None:
             flow_detail = p_utils.create_flow_detail(flow,
@@ -124,6 +128,9 @@ class SingleThreadedActionEngine(ActionEngine):
 
 
 class MultiThreadedActionEngine(ActionEngine):
+    # This one attempts to run in a parallel manner.
+    _graph_action = graph_action.ParallelGraphAction
+
     def __init__(self, flow, flow_detail=None, book=None,
                  backend=None, executor=None):
         if flow_detail is None:
diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py
index ac4d7f58..6930d276 100644
--- a/taskflow/engines/action_engine/graph_action.py
+++ b/taskflow/engines/action_engine/graph_action.py
@@ -16,7 +16,17 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import collections
+import logging
+import threading
+
+from concurrent import futures
+
 from taskflow.engines.action_engine import base_action as base
+from taskflow import exceptions as exc
+from taskflow.utils import misc
+
+LOG = logging.getLogger(__name__)
 
 
 class GraphAction(base.Action):
@@ -79,3 +89,99 @@ class SequentialGraphAction(GraphAction):
             action = self._action_mapping[node]
             action.revert(engine)  # raises on failure
             to_revert += self._resolve_dependencies(node, deps_counter, True)
+
+
+class ParallelGraphAction(SequentialGraphAction):
+    def execute(self, engine):
+        """This action executes the provided graph in parallel by selecting
+        nodes that can run (those whose dependencies are satisfied or
+        those with no dependencies) and submitting them to the executor
+        to be run; after running, this process is repeated until no
+        more nodes can be run (or a failure has occurred and all nodes
+        were stopped from running further).
+        """
+        # A deque provides thread-safe append/pop/popleft operations.
+        all_futures = collections.deque()
+        executor = engine.executor
+        has_failed = threading.Event()
+        deps_lock = threading.RLock()
+        deps_counter = self._get_nodes_dependencies_count()
+
+        def submit_followups(node):
+            # Mutating the deps_counter isn't thread-safe.
+            with deps_lock:
+                to_execute = self._resolve_dependencies(node, deps_counter)
+                submit_count = 0
+                for n in to_execute:
+                    try:
+                        all_futures.append(executor.submit(run_node, n))
+                        submit_count += 1
+                    except RuntimeError:
+                        # Someone shut down the executor while we are
+                        # still using it; get out as quickly as we can...
+                        has_failed.set()
+                        break
+                return submit_count
+
+        def run_node(node):
+            if has_failed.is_set():
+                # Someone failed, don't even bother running.
+                return
+            action = self._action_mapping[node]
+            try:
+                action.execute(engine)
+            except Exception:
+                # Make sure others don't continue working (although they
+                # may already be working, but you can't stop that anyway).
+                has_failed.set()
+                raise
+            if has_failed.is_set():
+                # Someone else failed, don't even bother submitting any
+                # followup jobs.
+                return
+            # NOTE(harlowja): the future itself will not return until after
+            # it submits followup tasks; this keeps the parent thread waiting
+            # for more results, since the all_futures deque will not be
+            # empty until everyone stops submitting followups.
+            submitted = submit_followups(node)
+            LOG.debug("After running %s, %s followup actions were submitted",
+                      node, submitted)
+
+        # Nothing to execute in the first place.
+        if not deps_counter:
+            return
+
+        # Ensure that we obtain the lock, just in case the functions
+        # submitted immediately start submitting their own jobs (which
+        # could happen if they are very quick).
+        with deps_lock:
+            to_execute = self._browse_nodes_to_execute(deps_counter)
+            for n in to_execute:
+                try:
+                    all_futures.append(executor.submit(run_node, n))
+                except RuntimeError:
+                    # Someone shut down the executor while we are still
+                    # using it; get out as quickly as we can...
+                    break
+
+        # Keep consuming the futures until there are no more futures
+        # to consume so that we can collect their failures. Notice that
+        # results are not captured, as results of tasks go into storage
+        # and do not get returned here.
+        failures = []
+        while all_futures:
+            # Take in FIFO order, not in LIFO order.
+            f = all_futures.popleft()
+            try:
+                f.result()
+            except futures.CancelledError:
+                # TODO(harlowja): can we use the cancellation feature to
+                # actually achieve cancellation in taskflow??
+                pass
+            except Exception:
+                failures.append(misc.Failure())
+        if len(failures) > 1:
+            raise exc.LinkedException.link([fail.exc_info
+                                            for fail in failures])
+        elif len(failures) == 1:
+            failures[0].reraise()
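
Reviewer note: for context on the pattern that ParallelGraphAction implements,
the sketch below reproduces the same execute-then-submit-followups loop outside
of taskflow. It is a minimal illustration, assuming a toy graph represented as
a dict that maps each node to the set of nodes it depends on; run_parallel and
its run callback are hypothetical names for this note only, not taskflow APIs,
and multiple failures are simplified to reraising the first one instead of
linking them with exc.LinkedException.

import collections
import threading

from concurrent import futures


def run_parallel(graph, run, max_workers=2):
    # ``graph`` maps each node to the set of nodes it depends on; ``run``
    # is invoked once per node after all of its dependencies have finished.
    # Count unmet dependencies per node (the patch derives the equivalent
    # information via _get_nodes_dependencies_count()).
    remaining = dict((n, len(deps)) for n, deps in graph.items())
    dependents = collections.defaultdict(list)
    for n, deps in graph.items():
        for d in deps:
            dependents[d].append(n)

    deps_lock = threading.RLock()
    has_failed = threading.Event()
    all_futures = collections.deque()

    def run_node(executor, node):
        if has_failed.is_set():
            # Someone else failed, don't bother running.
            return
        try:
            run(node)
        except Exception:
            has_failed.set()
            raise
        # Enqueue followups whose dependency counts just reached zero;
        # this happens before this future resolves, so the consuming
        # loop below sees a non-empty deque while work remains.
        with deps_lock:
            for follower in dependents[node]:
                remaining[follower] -= 1
                if remaining[follower] == 0 and not has_failed.is_set():
                    all_futures.append(
                        executor.submit(run_node, executor, follower))

    with futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        with deps_lock:
            # Seed the executor with every node that has no dependencies.
            for n, count in remaining.items():
                if count == 0:
                    all_futures.append(executor.submit(run_node, executor, n))
        failures = []
        while all_futures:
            # Consume in FIFO order so earlier submissions surface first.
            try:
                all_futures.popleft().result()
            except Exception as exc_value:
                failures.append(exc_value)
    if failures:
        # The patch links multiple failures with exc.LinkedException;
        # this sketch simply reraises the first one.
        raise failures[0]


# Example usage: b and c depend on a; d depends on both b and c.
graph = {
    'a': set(),
    'b': set(['a']),
    'c': set(['a']),
    'd': set(['b', 'c']),
}


def announce(node):
    print('ran %s' % node)


run_parallel(graph, announce)

As in the patch, a future does not resolve until its followups have been
enqueued, so the FIFO consuming loop never observes an empty deque while work
is still outstanding, and a shared threading.Event stops new submissions after
the first failure.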