From 359cc490bd2426c412e166b5c455f5bccd87bd9b Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 25 Jun 2015 16:02:21 -0700 Subject: [PATCH] Create and use a serial retry executor To make it easily possible to change the retry atom execution from being in the engine thread, this creates a retry executor (which is similar to the task executor) and provides it with a serial executor (which it will use to execute with). This makes the retry and task actions closer to being the same and makes the surrounding code that much more similar (which makes understanding it easier). Change-Id: I993e938280df3bd97f8076293183ef21989e2dba --- .../engines/action_engine/actions/base.py | 16 +++++ .../engines/action_engine/actions/retry.py | 66 +++++-------------- .../engines/action_engine/actions/task.py | 6 +- taskflow/engines/action_engine/completer.py | 11 +++- taskflow/engines/action_engine/engine.py | 22 +++++-- taskflow/engines/action_engine/executor.py | 48 ++++++++++++-- taskflow/engines/action_engine/runner.py | 3 +- taskflow/engines/action_engine/runtime.py | 10 ++- taskflow/engines/action_engine/scheduler.py | 6 +- .../tests/unit/action_engine/test_runner.py | 4 +- .../tests/unit/worker_based/test_executor.py | 24 ------- .../tests/unit/worker_based/test_pipeline.py | 5 +- 12 files changed, 120 insertions(+), 101 deletions(-) diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py index 48846746..3a014e12 100644 --- a/taskflow/engines/action_engine/actions/base.py +++ b/taskflow/engines/action_engine/actions/base.py @@ -37,3 +37,19 @@ class Action(object): def __init__(self, storage, notifier): self._storage = storage self._notifier = notifier + + @abc.abstractmethod + def schedule_execution(self, atom): + """Schedules atom execution.""" + + @abc.abstractmethod + def schedule_reversion(self, atom): + """Schedules atom reversion.""" + + @abc.abstractmethod + def complete_reversion(self, atom, result): + """Completes atom 
reversion.""" + + @abc.abstractmethod + def complete_execution(self, atom, result): + """Completes atom execution.""" diff --git a/taskflow/engines/action_engine/actions/retry.py b/taskflow/engines/action_engine/actions/retry.py index 0be19af9..126b9038 100644 --- a/taskflow/engines/action_engine/actions/retry.py +++ b/taskflow/engines/action_engine/actions/retry.py @@ -14,10 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -import futurist - from taskflow.engines.action_engine.actions import base -from taskflow.engines.action_engine import executor as ex from taskflow import logging from taskflow import retry as retry_atom from taskflow import states @@ -26,28 +23,12 @@ from taskflow.types import failure LOG = logging.getLogger(__name__) -def _execute_retry(retry, arguments): - try: - result = retry.execute(**arguments) - except Exception: - result = failure.Failure() - return (ex.EXECUTED, result) - - -def _revert_retry(retry, arguments): - try: - result = retry.revert(**arguments) - except Exception: - result = failure.Failure() - return (ex.REVERTED, result) - - class RetryAction(base.Action): """An action that handles executing, state changes, ... 
of retry atoms.""" - def __init__(self, storage, notifier): + def __init__(self, storage, notifier, retry_executor): super(RetryAction, self).__init__(storage, notifier) - self._executor = futurist.SynchronousExecutor() + self._retry_executor = retry_executor def _get_retry_args(self, retry, addons=None): arguments = self._storage.fetch_mapped_args( @@ -88,41 +69,30 @@ class RetryAction(base.Action): details['result'] = result self._notifier.notify(state, details) - def execute(self, retry): - - def _on_done_callback(fut): - result = fut.result()[-1] - if isinstance(result, failure.Failure): - self.change_state(retry, states.FAILURE, result=result) - else: - self.change_state(retry, states.SUCCESS, result=result) - + def schedule_execution(self, retry): self.change_state(retry, states.RUNNING) - fut = self._executor.submit(_execute_retry, retry, - self._get_retry_args(retry)) - fut.add_done_callback(_on_done_callback) - fut.atom = retry - return fut + return self._retry_executor.execute_retry( + retry, self._get_retry_args(retry)) - def revert(self, retry): + def complete_reversion(self, retry, result): + if isinstance(result, failure.Failure): + self.change_state(retry, states.REVERT_FAILURE, result=result) + else: + self.change_state(retry, states.REVERTED, result=result) - def _on_done_callback(fut): - result = fut.result()[-1] - if isinstance(result, failure.Failure): - self.change_state(retry, states.REVERT_FAILURE, result=result) - else: - self.change_state(retry, states.REVERTED, result=result) + def complete_execution(self, retry, result): + if isinstance(result, failure.Failure): + self.change_state(retry, states.FAILURE, result=result) + else: + self.change_state(retry, states.SUCCESS, result=result) + def schedule_reversion(self, retry): self.change_state(retry, states.REVERTING) arg_addons = { retry_atom.REVERT_FLOW_FAILURES: self._storage.get_failures(), } - fut = self._executor.submit(_revert_retry, retry, - self._get_retry_args(retry, - 
addons=arg_addons)) - fut.add_done_callback(_on_done_callback) - fut.atom = retry - return fut + return self._retry_executor.revert_retry( + retry, self._get_retry_args(retry, addons=arg_addons)) def on_failure(self, retry, atom, last_failure): self._storage.save_retry_failure(retry.name, atom.name, last_failure) diff --git a/taskflow/engines/action_engine/actions/task.py b/taskflow/engines/action_engine/actions/task.py index 7ae6b55f..ac117e1c 100644 --- a/taskflow/engines/action_engine/actions/task.py +++ b/taskflow/engines/action_engine/actions/task.py @@ -134,10 +134,9 @@ class TaskAction(base.Action): task) else: progress_callback = None - future = self._task_executor.revert_task( + return self._task_executor.revert_task( task, task_uuid, arguments, task_result, failures, progress_callback=progress_callback) - return future def complete_reversion(self, task, result): if isinstance(result, failure.Failure): @@ -145,6 +144,3 @@ class TaskAction(base.Action): else: self.change_state(task, states.REVERTED, progress=1.0, result=result) - - def wait_for_any(self, fs, timeout): - return self._task_executor.wait_for_any(fs, timeout) diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 47300a46..0ab727a4 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -106,9 +106,9 @@ class Completer(object): def __init__(self, runtime): self._runtime = weakref.proxy(runtime) self._analyzer = runtime.analyzer - self._retry_action = runtime.retry_action self._storage = runtime.storage self._task_action = runtime.task_action + self._retry_action = runtime.retry_action self._undefined_resolver = RevertAll(self._runtime) def _complete_task(self, task, event, result): @@ -118,6 +118,13 @@ class Completer(object): else: self._task_action.complete_reversion(task, result) + def _complete_retry(self, retry, event, result): + """Completes the given retry, processes retry 
failure.""" + if event == ex.EXECUTED: + self._retry_action.complete_execution(retry, result) + else: + self._retry_action.complete_reversion(retry, result) + def resume(self): """Resumes nodes in the contained graph. @@ -148,6 +155,8 @@ class Completer(object): """ if isinstance(node, task_atom.BaseTask): self._complete_task(node, event, result) + else: + self._complete_retry(node, event, result) if isinstance(result, failure.Failure): if event == ex.EXECUTED: self._process_atom_failure(node, result) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 6d9ee264..fa6247d8 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -41,13 +41,17 @@ LOG = logging.getLogger(__name__) @contextlib.contextmanager -def _start_stop(executor): - # A teenie helper context manager to safely start/stop a executor... - executor.start() +def _start_stop(task_executor, retry_executor): + # A teenie helper context manager to safely start/stop engine executors... + task_executor.start() try: - yield executor + retry_executor.start() + try: + yield (task_executor, retry_executor) + finally: + retry_executor.stop() finally: - executor.stop() + task_executor.stop() class ActionEngine(base.Engine): @@ -82,6 +86,9 @@ class ActionEngine(base.Engine): self._lock = threading.RLock() self._state_lock = threading.RLock() self._storage_ensured = False + # Retries are not *currently* executed out of the engines process + # or thread (this could change in the future if we desire it to). 
+ self._retry_executor = executor.SerialRetryExecutor() def _check(self, name, check_compiled, check_storage_ensured): """Check (and raise) if the engine has not reached a certain stage.""" @@ -167,7 +174,7 @@ class ActionEngine(base.Engine): self.validate() runner = self._runtime.runner last_state = None - with _start_stop(self._task_executor): + with _start_stop(self._task_executor, self._retry_executor): self._change_state(states.RUNNING) try: closed = False @@ -294,7 +301,8 @@ class ActionEngine(base.Engine): self._runtime = runtime.Runtime(self._compilation, self.storage, self.atom_notifier, - self._task_executor) + self._task_executor, + self._retry_executor) self._runtime.compile() self._compiled = True diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index f03aa8e2..b47322d7 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -33,7 +33,6 @@ from taskflow import logging from taskflow import task as task_atom from taskflow.types import failure from taskflow.types import notifier -from taskflow.utils import async_utils from taskflow.utils import threading_utils # Execution and reversion events. 
@@ -58,6 +57,22 @@ _KIND_EVENT = 'event' LOG = logging.getLogger(__name__) +def _execute_retry(retry, arguments): + try: + result = retry.execute(**arguments) + except Exception: + result = failure.Failure() + return (EXECUTED, result) + + +def _revert_retry(retry, arguments): + try: + result = retry.revert(**arguments) + except Exception: + result = failure.Failure() + return (REVERTED, result) + + def _execute_task(task, arguments, progress_callback=None): with notifier.register_deregister(task.notifier, _UPDATE_PROGRESS, @@ -322,6 +337,33 @@ class _Dispatcher(object): self._dead.wait(leftover) +class SerialRetryExecutor(object): + """Executes and reverts retries.""" + + def __init__(self): + self._executor = futurist.SynchronousExecutor() + + def start(self): + """Prepare to execute retries.""" + self._executor.restart() + + def stop(self): + """Finalize retry executor.""" + self._executor.shutdown() + + def execute_retry(self, retry, arguments): + """Schedules retry execution.""" + fut = self._executor.submit(_execute_retry, retry, arguments) + fut.atom = retry + return fut + + def revert_retry(self, retry, arguments): + """Schedules retry reversion.""" + fut = self._executor.submit(_revert_retry, retry, arguments) + fut.atom = retry + return fut + + @six.add_metaclass(abc.ABCMeta) class TaskExecutor(object): """Executes and reverts tasks. @@ -341,10 +383,6 @@ class TaskExecutor(object): progress_callback=None): """Schedules task reversion.""" - def wait_for_any(self, fs, timeout=None): - """Wait for futures returned by this executor to complete.""" - return async_utils.wait_for_any(fs, timeout=timeout) - def start(self): """Prepare to execute tasks.""" diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py index e8cd1734..f02f3f09 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/runner.py @@ -117,7 +117,6 @@ class Runner(object): # Cache some local functions/methods... 
do_schedule = self._scheduler.schedule - wait_for_any = self._waiter.wait_for_any do_complete = self._completer.complete def iter_next_nodes(target_node=None): @@ -171,7 +170,7 @@ class Runner(object): # call sometime in the future, or equivalent that will work in # py2 and py3. if memory.not_done: - done, not_done = wait_for_any(memory.not_done, timeout) + done, not_done = self._waiter(memory.not_done, timeout=timeout) memory.done.update(done) memory.not_done = not_done return _ANALYZE diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 2616b868..38998b8c 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -26,6 +26,7 @@ from taskflow.engines.action_engine import scopes as sc from taskflow import flow as flow_type from taskflow import states as st from taskflow import task +from taskflow.utils import async_utils from taskflow.utils import misc @@ -37,9 +38,11 @@ class Runtime(object): action engine to run to completion. 
""" - def __init__(self, compilation, storage, atom_notifier, task_executor): + def __init__(self, compilation, storage, atom_notifier, + task_executor, retry_executor): self._atom_notifier = atom_notifier self._task_executor = task_executor + self._retry_executor = retry_executor self._storage = storage self._compilation = compilation self._atom_cache = {} @@ -111,7 +114,7 @@ class Runtime(object): @misc.cachedproperty def runner(self): - return ru.Runner(self, self._task_executor) + return ru.Runner(self, async_utils.wait_for_any) @misc.cachedproperty def completer(self): @@ -132,7 +135,8 @@ class Runtime(object): @misc.cachedproperty def retry_action(self): return ra.RetryAction(self._storage, - self._atom_notifier) + self._atom_notifier, + self._retry_executor) @misc.cachedproperty def task_action(self): diff --git a/taskflow/engines/action_engine/scheduler.py b/taskflow/engines/action_engine/scheduler.py index 4ab0b0e1..404781e6 100644 --- a/taskflow/engines/action_engine/scheduler.py +++ b/taskflow/engines/action_engine/scheduler.py @@ -37,13 +37,13 @@ class RetryScheduler(object): """ intention = self._storage.get_atom_intention(retry.name) if intention == st.EXECUTE: - return self._retry_action.execute(retry) + return self._retry_action.schedule_execution(retry) elif intention == st.REVERT: - return self._retry_action.revert(retry) + return self._retry_action.schedule_reversion(retry) elif intention == st.RETRY: self._retry_action.change_state(retry, st.RETRYING) self._runtime.retry_subflow(retry) - return self._retry_action.execute(retry) + return self._retry_action.schedule_execution(retry) else: raise excp.ExecutionFailure("Unknown how to schedule retry with" " intention: %s" % intention) diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index 7af2bd78..bff74cb9 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -42,10 
+42,12 @@ class _RunnerTestMixin(object): store.set_flow_state(initial_state) task_notifier = notifier.Notifier() task_executor = executor.SerialTaskExecutor() + retry_executor = executor.SerialRetryExecutor() task_executor.start() self.addCleanup(task_executor.stop) r = runtime.Runtime(compilation, store, - task_notifier, task_executor) + task_notifier, task_executor, + retry_executor) r.compile() return r diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 504433de..0fad2bd3 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -17,7 +17,6 @@ import threading import time -import futurist from oslo_utils import fixture from oslo_utils import timeutils @@ -58,8 +57,6 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.request_inst_mock.uuid = self.task_uuid self.request_inst_mock.expired = False self.request_inst_mock.task_cls = self.task.name - self.wait_for_any_mock = self.patch( - 'taskflow.engines.action_engine.executor.async_utils.wait_for_any') self.message_mock = mock.MagicMock(name='message') self.message_mock.properties = {'correlation_id': self.task_uuid, 'type': pr.RESPONSE} @@ -281,27 +278,6 @@ class TestWorkerTaskExecutor(test.MockTestCase): ] self.assertEqual(expected_calls, self.master_mock.mock_calls) - def test_wait_for_any(self): - fs = [futurist.Future(), futurist.Future()] - ex = self.executor() - ex.wait_for_any(fs) - - expected_calls = [ - mock.call(fs, timeout=None) - ] - self.assertEqual(self.wait_for_any_mock.mock_calls, expected_calls) - - def test_wait_for_any_with_timeout(self): - timeout = 30 - fs = [futurist.Future(), futurist.Future()] - ex = self.executor() - ex.wait_for_any(fs, timeout) - - master_mock_calls = [ - mock.call(fs, timeout=timeout) - ] - self.assertEqual(self.wait_for_any_mock.mock_calls, master_mock_calls) - def test_start_stop(self): ex = self.executor() ex.start() diff --git 
a/taskflow/tests/unit/worker_based/test_pipeline.py b/taskflow/tests/unit/worker_based/test_pipeline.py index 3030b831..a2075763 100644 --- a/taskflow/tests/unit/worker_based/test_pipeline.py +++ b/taskflow/tests/unit/worker_based/test_pipeline.py @@ -24,6 +24,7 @@ from taskflow.engines.worker_based import server as worker_server from taskflow import test from taskflow.tests import utils as test_utils from taskflow.types import failure +from taskflow.utils import async_utils from taskflow.utils import threading_utils @@ -77,7 +78,7 @@ class TestPipeline(test.TestCase): progress_callback = lambda *args, **kwargs: None f = executor.execute_task(t, uuidutils.generate_uuid(), {}, progress_callback=progress_callback) - executor.wait_for_any([f]) + async_utils.wait_for_any([f]) event, result = f.result() self.assertEqual(1, result) @@ -93,7 +94,7 @@ class TestPipeline(test.TestCase): progress_callback = lambda *args, **kwargs: None f = executor.execute_task(t, uuidutils.generate_uuid(), {}, progress_callback=progress_callback) - executor.wait_for_any([f]) + async_utils.wait_for_any([f]) action, result = f.result() self.assertIsInstance(result, failure.Failure)