diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py index 48846746..3a014e12 100644 --- a/taskflow/engines/action_engine/actions/base.py +++ b/taskflow/engines/action_engine/actions/base.py @@ -37,3 +37,19 @@ class Action(object): def __init__(self, storage, notifier): self._storage = storage self._notifier = notifier + + @abc.abstractmethod + def schedule_execution(self, atom): + """Schedules atom execution.""" + + @abc.abstractmethod + def schedule_reversion(self, atom): + """Schedules atom reversion.""" + + @abc.abstractmethod + def complete_reversion(self, atom, result): + """Completes atom reversion.""" + + @abc.abstractmethod + def complete_execution(self, atom, result): + """Completes atom execution.""" diff --git a/taskflow/engines/action_engine/actions/retry.py b/taskflow/engines/action_engine/actions/retry.py index 0be19af9..126b9038 100644 --- a/taskflow/engines/action_engine/actions/retry.py +++ b/taskflow/engines/action_engine/actions/retry.py @@ -14,10 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. -import futurist - from taskflow.engines.action_engine.actions import base -from taskflow.engines.action_engine import executor as ex from taskflow import logging from taskflow import retry as retry_atom from taskflow import states @@ -26,28 +23,12 @@ from taskflow.types import failure LOG = logging.getLogger(__name__) -def _execute_retry(retry, arguments): - try: - result = retry.execute(**arguments) - except Exception: - result = failure.Failure() - return (ex.EXECUTED, result) - - -def _revert_retry(retry, arguments): - try: - result = retry.revert(**arguments) - except Exception: - result = failure.Failure() - return (ex.REVERTED, result) - - class RetryAction(base.Action): """An action that handles executing, state changes, ... of retry atoms.""" - def __init__(self, storage, notifier): + def __init__(self, storage, notifier, retry_executor): super(RetryAction, self).__init__(storage, notifier) - self._executor = futurist.SynchronousExecutor() + self._retry_executor = retry_executor def _get_retry_args(self, retry, addons=None): arguments = self._storage.fetch_mapped_args( @@ -88,41 +69,30 @@ class RetryAction(base.Action): details['result'] = result self._notifier.notify(state, details) - def execute(self, retry): - - def _on_done_callback(fut): - result = fut.result()[-1] - if isinstance(result, failure.Failure): - self.change_state(retry, states.FAILURE, result=result) - else: - self.change_state(retry, states.SUCCESS, result=result) - + def schedule_execution(self, retry): self.change_state(retry, states.RUNNING) - fut = self._executor.submit(_execute_retry, retry, - self._get_retry_args(retry)) - fut.add_done_callback(_on_done_callback) - fut.atom = retry - return fut + return self._retry_executor.execute_retry( + retry, self._get_retry_args(retry)) - def revert(self, retry): + def complete_reversion(self, retry, result): + if isinstance(result, failure.Failure): + self.change_state(retry, states.REVERT_FAILURE, result=result) + else: + self.change_state(retry, states.REVERTED, result=result) - def _on_done_callback(fut): - result = fut.result()[-1] - if isinstance(result, failure.Failure): - self.change_state(retry, states.REVERT_FAILURE, result=result) - else: - self.change_state(retry, states.REVERTED, result=result) + def complete_execution(self, retry, result): + if isinstance(result, failure.Failure): + self.change_state(retry, states.FAILURE, result=result) + else: + self.change_state(retry, states.SUCCESS, result=result) + def schedule_reversion(self, retry): self.change_state(retry, states.REVERTING) arg_addons = { retry_atom.REVERT_FLOW_FAILURES: self._storage.get_failures(), } - fut = self._executor.submit(_revert_retry, retry, - self._get_retry_args(retry, - addons=arg_addons)) - fut.add_done_callback(_on_done_callback) - fut.atom = retry - return fut + return self._retry_executor.revert_retry( + retry, self._get_retry_args(retry, addons=arg_addons)) def on_failure(self, retry, atom, last_failure): self._storage.save_retry_failure(retry.name, atom.name, last_failure) diff --git a/taskflow/engines/action_engine/actions/task.py b/taskflow/engines/action_engine/actions/task.py index 7ae6b55f..ac117e1c 100644 --- a/taskflow/engines/action_engine/actions/task.py +++ b/taskflow/engines/action_engine/actions/task.py @@ -134,10 +134,9 @@ class TaskAction(base.Action): task) else: progress_callback = None - future = self._task_executor.revert_task( + return self._task_executor.revert_task( task, task_uuid, arguments, task_result, failures, progress_callback=progress_callback) - return future def complete_reversion(self, task, result): if isinstance(result, failure.Failure): @@ -145,6 +144,3 @@ class TaskAction(base.Action): else: self.change_state(task, states.REVERTED, progress=1.0, result=result) - - def wait_for_any(self, fs, timeout): - return self._task_executor.wait_for_any(fs, timeout) diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 47300a46..0ab727a4 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -106,9 +106,9 @@ class Completer(object): def __init__(self, runtime): self._runtime = weakref.proxy(runtime) self._analyzer = runtime.analyzer - self._retry_action = runtime.retry_action self._storage = runtime.storage self._task_action = runtime.task_action + self._retry_action = runtime.retry_action self._undefined_resolver = RevertAll(self._runtime) def _complete_task(self, task, event, result): @@ -118,6 +118,13 @@ class Completer(object): else: self._task_action.complete_reversion(task, result) + def _complete_retry(self, retry, event, result): + """Completes the given retry, processes retry failure.""" + if event == ex.EXECUTED: + self._retry_action.complete_execution(retry, result) + else: + self._retry_action.complete_reversion(retry, result) + def resume(self): """Resumes nodes in the contained graph. @@ -148,6 +155,8 @@ class Completer(object): """ if isinstance(node, task_atom.BaseTask): self._complete_task(node, event, result) + else: + self._complete_retry(node, event, result) if isinstance(result, failure.Failure): if event == ex.EXECUTED: self._process_atom_failure(node, result) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 9dae8d46..ef557f93 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -41,13 +41,17 @@ LOG = logging.getLogger(__name__) @contextlib.contextmanager -def _start_stop(executor): - # A teenie helper context manager to safely start/stop a executor... - executor.start() +def _start_stop(task_executor, retry_executor): + # A teenie helper context manager to safely start/stop engine executors... + task_executor.start() try: - yield executor + retry_executor.start() + try: + yield (task_executor, retry_executor) + finally: + retry_executor.stop() finally: - executor.stop() + task_executor.stop() class ActionEngine(base.Engine): @@ -82,6 +86,9 @@ class ActionEngine(base.Engine): self._lock = threading.RLock() self._state_lock = threading.RLock() self._storage_ensured = False + # Retries are not *currently* executed out of the engines process + # or thread (this could change in the future if we desire it to). + self._retry_executor = executor.SerialRetryExecutor() def _check(self, name, check_compiled, check_storage_ensured): """Check (and raise) if the engine has not reached a certain stage.""" @@ -167,7 +174,7 @@ class ActionEngine(base.Engine): self.validate() runner = self._runtime.runner last_state = None - with _start_stop(self._task_executor): + with _start_stop(self._task_executor, self._retry_executor): self._change_state(states.RUNNING) try: closed = False @@ -305,7 +312,8 @@ class ActionEngine(base.Engine): self._runtime = runtime.Runtime(self._compilation, self.storage, self.atom_notifier, - self._task_executor) + self._task_executor, + self._retry_executor) self._runtime.compile() self._compiled = True diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index f03aa8e2..b47322d7 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -33,7 +33,6 @@ from taskflow import logging from taskflow import task as task_atom from taskflow.types import failure from taskflow.types import notifier -from taskflow.utils import async_utils from taskflow.utils import threading_utils # Execution and reversion events. @@ -58,6 +57,22 @@ _KIND_EVENT = 'event' LOG = logging.getLogger(__name__) +def _execute_retry(retry, arguments): + try: + result = retry.execute(**arguments) + except Exception: + result = failure.Failure() + return (EXECUTED, result) + + +def _revert_retry(retry, arguments): + try: + result = retry.revert(**arguments) + except Exception: + result = failure.Failure() + return (REVERTED, result) + + def _execute_task(task, arguments, progress_callback=None): with notifier.register_deregister(task.notifier, _UPDATE_PROGRESS, @@ -322,6 +337,33 @@ class _Dispatcher(object): self._dead.wait(leftover) +class SerialRetryExecutor(object): + """Executes and reverts retries.""" + + def __init__(self): + self._executor = futurist.SynchronousExecutor() + + def start(self): + """Prepare to execute retries.""" + self._executor.restart() + + def stop(self): + """Finalize retry executor.""" + self._executor.shutdown() + + def execute_retry(self, retry, arguments): + """Schedules retry execution.""" + fut = self._executor.submit(_execute_retry, retry, arguments) + fut.atom = retry + return fut + + def revert_retry(self, retry, arguments): + """Schedules retry reversion.""" + fut = self._executor.submit(_revert_retry, retry, arguments) + fut.atom = retry + return fut + + @six.add_metaclass(abc.ABCMeta) class TaskExecutor(object): """Executes and reverts tasks. @@ -341,10 +383,6 @@ class TaskExecutor(object): progress_callback=None): """Schedules task reversion.""" - def wait_for_any(self, fs, timeout=None): - """Wait for futures returned by this executor to complete.""" - return async_utils.wait_for_any(fs, timeout=timeout) - def start(self): """Prepare to execute tasks.""" diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py index e8cd1734..f02f3f09 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/runner.py @@ -117,7 +117,6 @@ class Runner(object): # Cache some local functions/methods... do_schedule = self._scheduler.schedule - wait_for_any = self._waiter.wait_for_any do_complete = self._completer.complete def iter_next_nodes(target_node=None): @@ -171,7 +170,7 @@ class Runner(object): # call sometime in the future, or equivalent that will work in # py2 and py3. if memory.not_done: - done, not_done = wait_for_any(memory.not_done, timeout) + done, not_done = self._waiter(memory.not_done, timeout=timeout) memory.done.update(done) memory.not_done = not_done return _ANALYZE diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 2616b868..38998b8c 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -26,6 +26,7 @@ from taskflow.engines.action_engine import scopes as sc from taskflow import flow as flow_type from taskflow import states as st from taskflow import task +from taskflow.utils import async_utils from taskflow.utils import misc @@ -37,9 +38,11 @@ class Runtime(object): action engine to run to completion. """ - def __init__(self, compilation, storage, atom_notifier, task_executor): + def __init__(self, compilation, storage, atom_notifier, + task_executor, retry_executor): self._atom_notifier = atom_notifier self._task_executor = task_executor + self._retry_executor = retry_executor self._storage = storage self._compilation = compilation self._atom_cache = {} @@ -111,7 +114,7 @@ class Runtime(object): @misc.cachedproperty def runner(self): - return ru.Runner(self, self._task_executor) + return ru.Runner(self, async_utils.wait_for_any) @misc.cachedproperty def completer(self): @@ -132,7 +135,8 @@ class Runtime(object): @misc.cachedproperty def retry_action(self): return ra.RetryAction(self._storage, - self._atom_notifier) + self._atom_notifier, + self._retry_executor) @misc.cachedproperty def task_action(self): diff --git a/taskflow/engines/action_engine/scheduler.py b/taskflow/engines/action_engine/scheduler.py index 4ab0b0e1..404781e6 100644 --- a/taskflow/engines/action_engine/scheduler.py +++ b/taskflow/engines/action_engine/scheduler.py @@ -37,13 +37,13 @@ class RetryScheduler(object): """ intention = self._storage.get_atom_intention(retry.name) if intention == st.EXECUTE: - return self._retry_action.execute(retry) + return self._retry_action.schedule_execution(retry) elif intention == st.REVERT: - return self._retry_action.revert(retry) + return self._retry_action.schedule_reversion(retry) elif intention == st.RETRY: self._retry_action.change_state(retry, st.RETRYING) self._runtime.retry_subflow(retry) - return self._retry_action.execute(retry) + return self._retry_action.schedule_execution(retry) else: raise excp.ExecutionFailure("Unknown how to schedule retry with" " intention: %s" % intention) diff --git a/taskflow/tests/unit/action_engine/test_runner.py b/taskflow/tests/unit/action_engine/test_runner.py index 7af2bd78..bff74cb9 100644 --- a/taskflow/tests/unit/action_engine/test_runner.py +++ b/taskflow/tests/unit/action_engine/test_runner.py @@ -42,10 +42,12 @@ class _RunnerTestMixin(object): store.set_flow_state(initial_state) task_notifier = notifier.Notifier() task_executor = executor.SerialTaskExecutor() + retry_executor = executor.SerialRetryExecutor() task_executor.start() self.addCleanup(task_executor.stop) r = runtime.Runtime(compilation, store, - task_notifier, task_executor) + task_notifier, task_executor, + retry_executor) r.compile() return r diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index 504433de..0fad2bd3 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -17,7 +17,6 @@ import threading import time -import futurist from oslo_utils import fixture from oslo_utils import timeutils @@ -58,8 +57,6 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.request_inst_mock.uuid = self.task_uuid self.request_inst_mock.expired = False self.request_inst_mock.task_cls = self.task.name - self.wait_for_any_mock = self.patch( - 'taskflow.engines.action_engine.executor.async_utils.wait_for_any') self.message_mock = mock.MagicMock(name='message') self.message_mock.properties = {'correlation_id': self.task_uuid, 'type': pr.RESPONSE} @@ -281,27 +278,6 @@ class TestWorkerTaskExecutor(test.MockTestCase): ] self.assertEqual(expected_calls, self.master_mock.mock_calls) - def test_wait_for_any(self): - fs = [futurist.Future(), futurist.Future()] - ex = self.executor() - ex.wait_for_any(fs) - - expected_calls = [ - mock.call(fs, timeout=None) - ] - self.assertEqual(self.wait_for_any_mock.mock_calls, expected_calls) - - def test_wait_for_any_with_timeout(self): - timeout = 30 - fs = [futurist.Future(), futurist.Future()] - ex = self.executor() - ex.wait_for_any(fs, timeout) - - master_mock_calls = [ - mock.call(fs, timeout=timeout) - ] - self.assertEqual(self.wait_for_any_mock.mock_calls, master_mock_calls) - def test_start_stop(self): ex = self.executor() ex.start() diff --git a/taskflow/tests/unit/worker_based/test_pipeline.py b/taskflow/tests/unit/worker_based/test_pipeline.py index 3030b831..a2075763 100644 --- a/taskflow/tests/unit/worker_based/test_pipeline.py +++ b/taskflow/tests/unit/worker_based/test_pipeline.py @@ -24,6 +24,7 @@ from taskflow.engines.worker_based import server as worker_server from taskflow import test from taskflow.tests import utils as test_utils from taskflow.types import failure +from taskflow.utils import async_utils from taskflow.utils import threading_utils @@ -77,7 +78,7 @@ class TestPipeline(test.TestCase): progress_callback = lambda *args, **kwargs: None f = executor.execute_task(t, uuidutils.generate_uuid(), {}, progress_callback=progress_callback) - executor.wait_for_any([f]) + async_utils.wait_for_any([f]) event, result = f.result() self.assertEqual(1, result) @@ -93,7 +94,7 @@ class TestPipeline(test.TestCase): progress_callback = lambda *args, **kwargs: None f = executor.execute_task(t, uuidutils.generate_uuid(), {}, progress_callback=progress_callback) - executor.wait_for_any([f]) + async_utils.wait_for_any([f]) action, result = f.result() self.assertIsInstance(result, failure.Failure)