From c19758dd4172919db0e2273a9907574a7b27b23a Mon Sep 17 00:00:00 2001 From: "Ivan A. Melnikov" Date: Thu, 19 Dec 2013 10:24:41 +0200 Subject: [PATCH] Refactor engine internals In this change we: - introduce single graph action, that works with futures - split methods of TaskAction into schedule/complete parts, with schedule* returns future; - task executor became entity that manages Executor, instead of engine + graph action. Change-Id: Ic0704534391abdd566bf2d87adabef43a9c88cfb --- taskflow/engines/action_engine/engine.py | 42 +--- taskflow/engines/action_engine/executor.py | 9 +- .../engines/action_engine/graph_action.py | 181 ++++++------------ taskflow/engines/action_engine/task_action.py | 32 ++-- taskflow/tests/unit/test_action_engine.py | 4 +- 5 files changed, 92 insertions(+), 176 deletions(-) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index ac430956..349f28b7 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -18,8 +18,6 @@ import threading -from concurrent import futures - from taskflow.engines.action_engine import executor from taskflow.engines.action_engine import graph_action from taskflow.engines.action_engine import task_action @@ -34,7 +32,6 @@ from taskflow.utils import flow_utils from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import reflection -from taskflow.utils import threading_utils class ActionEngine(base.EngineBase): @@ -192,46 +189,19 @@ class ActionEngine(base.EngineBase): class SingleThreadedActionEngine(ActionEngine): # NOTE(harlowja): This one attempts to run in a serial manner. - _graph_action_cls = graph_action.SequentialGraphAction + _graph_action_cls = graph_action.FutureGraphAction _storage_cls = t_storage.Storage class MultiThreadedActionEngine(ActionEngine): # NOTE(harlowja): This one attempts to run in a parallel manner. 
- _graph_action_cls = graph_action.ParallelGraphAction + _graph_action_cls = graph_action.FutureGraphAction _storage_cls = t_storage.ThreadSafeStorage + def _task_executor_cls(self): + return executor.ParallelTaskExecutor(self._executor) + def __init__(self, flow, flow_detail, backend, conf): + self._executor = conf.get('executor', None) super(MultiThreadedActionEngine, self).__init__( flow, flow_detail, backend, conf) - self._executor = conf.get('executor', None) - - @lock_utils.locked - def run(self): - if self._executor is None: - # NOTE(harlowja): since no executor was provided we have to create - # one, and also ensure that we shutdown the one we create to - # ensure that we don't leak threads. - thread_count = threading_utils.get_optimal_thread_count() - self._executor = futures.ThreadPoolExecutor(thread_count) - owns_executor = True - else: - owns_executor = False - - try: - ActionEngine.run(self) - finally: - # Don't forget to shutdown the executor!! - if owns_executor: - try: - self._executor.shutdown(wait=True) - finally: - self._executor = None - - @property - def executor(self): - """Returns the current executor, if no executor is provided on - construction then this executor will change each time the engine - is ran. 
- """ - return self._executor diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 3526bfc0..f7bc0e00 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -22,6 +22,7 @@ import six from concurrent import futures +from taskflow.utils import eventlet_utils as eu from taskflow.utils import misc from taskflow.utils import threading_utils @@ -83,7 +84,7 @@ class TaskExecutorBase(object): """Schedules task reversion""" @abc.abstractmethod - def wait_for_any(self, fs): + def wait_for_any(self, fs, timeout=None): """Wait for futures returned by this executor to complete""" def start(self): @@ -114,7 +115,7 @@ class SerialTaskExecutor(TaskExecutorBase): _revert_task(task, arguments, result, failures, progress_callback)) - def wait_for_any(self, fs): + def wait_for_any(self, fs, timeout=None): # NOTE(imelnikov): this executor returns only done futures return fs, [] @@ -140,8 +141,8 @@ class ParallelTaskExecutor(TaskExecutorBase): _revert_task, task, arguments, result, failures, progress_callback) - def wait_for_any(self, fs): - return futures.wait(fs, return_when=futures.FIRST_COMPLETED) + def wait_for_any(self, fs, timeout=None): + return eu.wait_for_any(fs, timeout) def start(self): if self._own_executor: diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index b1fd9542..e67688ef 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -16,11 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. 
-import collections import logging -import threading - -from concurrent import futures from taskflow import states as st from taskflow.utils import misc @@ -67,130 +63,75 @@ class GraphAction(object): return deps_counter -class SequentialGraphAction(GraphAction): +_WAITING_TIMEOUT = 60 # in seconds + + +class FutureGraphAction(GraphAction): + """Graph action built around futures returned by task action. + + This graph action schedules all tasks it can for execution and then + waits on returned futures. If task executor is able to execute tasks + in parallel, this enables parallel flow run and reversion. + """ def execute(self, engine): - deps_counter = self._get_nodes_dependencies_count() - to_execute = self._browse_nodes_to_execute(deps_counter) + was_suspended = self._run(engine, lambda: engine.is_running, + engine.task_action.schedule_execution, + engine.task_action.complete_execution, + revert=False) - while to_execute and engine.is_running: - node = to_execute.pop() - engine.task_action.execute(node) - to_execute += self._resolve_dependencies(node, deps_counter) - - if to_execute: - return st.SUSPENDED - return st.SUCCESS def revert(self, engine): - deps_counter = self._get_nodes_dependencies_count(True) - to_revert = self._browse_nodes_to_execute(deps_counter) + was_suspended = self._run(engine, lambda: engine.is_reverting, + engine.task_action.schedule_reversion, + engine.task_action.complete_reversion, + revert=True) + return st.SUSPENDED if was_suspended else st.REVERTED - while to_revert and engine.is_reverting: - node = to_revert.pop() - engine.task_action.revert(node) - to_revert += self._resolve_dependencies(node, deps_counter, True) + def _run(self, engine, running, schedule_node, complete_node, revert): + deps_counter = self._get_nodes_dependencies_count(revert) + not_done = [] - if to_revert: - return st.SUSPENDED - return st.REVERTED - - -class ParallelGraphAction(SequentialGraphAction): - def
execute(self, engine): - """This action executes the provided graph in parallel by selecting - nodes which can run (those which have there dependencies satisfied - or those with no dependencies) and submitting them to the executor - to be ran, and then after running this process will be repeated until - no more nodes can be ran (or a failure has a occurred and all nodes - were stopped from further running). - """ - # A deque is a thread safe push/pop/popleft/append implementation - all_futures = collections.deque() - executor = engine.executor - has_failed = threading.Event() - deps_lock = threading.RLock() - deps_counter = self._get_nodes_dependencies_count() - was_suspended = threading.Event() - - def submit_followups(node): - # Mutating the deps_counter isn't thread safe. - with deps_lock: - to_execute = self._resolve_dependencies(node, deps_counter) - submit_count = 0 - for n in to_execute: - try: - all_futures.append(executor.submit(run_node, n)) - submit_count += 1 - except RuntimeError: - # Someone shutdown the executor while we are still - # using it, get out as quickly as we can... - has_failed.set() - break - return submit_count - - def run_node(node): - if has_failed.is_set(): - # Someone failed, don't even bother running. - return - try: - if engine.is_running: - engine.task_action.execute(node) + def schedule(nodes): + for node in nodes: + future = schedule_node(node) + if future is not None: + not_done.append(future) else: - was_suspended.set() - return - except Exception: - # Make sure others don't continue working (although they may - # be already actively working, but u can't stop that anyway). - has_failed.set() - raise - if has_failed.is_set(): - # Someone else failed, don't even bother submitting any - # followup jobs. 
- return - # NOTE(harlowja): the future itself will not return until after it - # submits followup tasks, this keeps the parent thread waiting for - # more results since the all_futures deque will not be empty until - # everyone stops submitting followups. - submitted = submit_followups(node) - LOG.debug("After running %s, %s followup actions were submitted", - node, submitted) + schedule(self._resolve_dependencies( + node, deps_counter, revert)) - # Nothing to execute in the first place - if not deps_counter: - return st.SUCCESS - - # Ensure that we obtain the lock just in-case the functions submitted - # immediately themselves start submitting there own jobs (which could - # happen if they are very quick). - with deps_lock: - to_execute = self._browse_nodes_to_execute(deps_counter) - for n in to_execute: - try: - all_futures.append(executor.submit(run_node, n)) - except RuntimeError: - # Someone shutdown the executor while we are still using - # it, get out as quickly as we can.... - break - - # Keep on continuing to consume the futures until there are no more - # futures to consume so that we can get there failures. Notice that - # results are not captured, as results of tasks go into storage and - # do not get returned here. + schedule(self._browse_nodes_to_execute(deps_counter)) failures = [] - while len(all_futures): - # Take in FIFO order, not in LIFO order. - f = all_futures.popleft() - try: - f.result() - except futures.CancelledError: - # TODO(harlowja): can we use the cancellation feature to - # actually achieve cancellation in taskflow?? 
- pass - except Exception: - failures.append(misc.Failure()) + + was_suspended = False + while not_done: + # NOTE(imelnikov): if timeout occurs before any of futures + # completes, done list will be empty and we'll just go + # for next iteration + done, not_done = engine.task_action.wait_for_any( + not_done, _WAITING_TIMEOUT) + + not_done = list(not_done) + next_nodes = [] + for future in done: + node, _event, result = future.result() + complete_node(node, result) + if isinstance(result, misc.Failure): + failures.append(result) + else: + next_nodes.extend(self._resolve_dependencies( + node, deps_counter, revert)) + + if next_nodes: + if running() and not failures: + schedule(next_nodes) + else: + # NOTE(imelnikov): engine stopped while there were + # still some tasks to do, so we either failed + # or were suspended + was_suspended = True + misc.Failure.reraise_if_any(failures) - if was_suspended.is_set(): - return st.SUSPENDED - else: - return st.SUCCESS + return was_suspended diff --git a/taskflow/engines/action_engine/task_action.py b/taskflow/engines/action_engine/task_action.py index 030bff81..6742a2d1 100644 --- a/taskflow/engines/action_engine/task_action.py +++ b/taskflow/engines/action_engine/task_action.py @@ -62,21 +62,21 @@ class TaskAction(object): LOG.exception("Failed setting task progress for %s to %0.3f", task, progress) - def execute(self, task): + def schedule_execution(self, task): if not self._change_state(task, states.RUNNING, progress=0.0): return - kwargs = self._storage.fetch_mapped_args(task.rebind) - future = self._task_executor.execute_task(task, kwargs, - self._on_update_progress) - self._task_executor.wait_for_any(future) - _task, _event, result = future.result() + return self._task_executor.execute_task(task, kwargs, + self._on_update_progress) + + def complete_execution(self, task, result): if isinstance(result, misc.Failure): self._change_state(task, states.FAILURE, result=result) - result.reraise() - self._change_state(task, 
states.SUCCESS, result=result, progress=1.0) + else: + self._change_state(task, states.SUCCESS, + result=result, progress=1.0) - def revert(self, task): + def schedule_reversion(self, task): if not self._change_state(task, states.REVERTING, progress=0.0): return kwargs = self._storage.fetch_mapped_args(task.rebind) @@ -85,9 +85,13 @@ class TaskAction(object): future = self._task_executor.revert_task(task, kwargs, task_result, failures, self._on_update_progress) - self._task_executor.wait_for_any(future) - _task, _event, result = future.result() - if isinstance(result, misc.Failure): + return future + + def complete_reversion(self, task, rev_result): + if isinstance(rev_result, misc.Failure): self._change_state(task, states.FAILURE) - result.reraise() - self._change_state(task, states.REVERTED, progress=1.0) + else: + self._change_state(task, states.REVERTED, progress=1.0) + + def wait_for_any(self, fs, timeout): + return self._task_executor.wait_for_any(fs, timeout) diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index ac88a1b9..3686b0a0 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -598,7 +598,7 @@ class MultiThreadedEngineTest(EngineTaskTest, def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) self.assertIsInstance(engine, eng.MultiThreadedActionEngine) - self.assertIs(engine.executor, None) + self.assertIs(engine._executor, None) def test_using_common_executor(self): flow = utils.TaskNoRequiresNoReturns(name='task1') @@ -606,7 +606,7 @@ class MultiThreadedEngineTest(EngineTaskTest, try: e1 = self._make_engine(flow, executor=executor) e2 = self._make_engine(flow, executor=executor) - self.assertIs(e1.executor, e2.executor) + self.assertIs(e1._executor, e2._executor) finally: executor.shutdown(wait=True)