From b79f91e9ec5be543f857ad97c991739750caee6d Mon Sep 17 00:00:00 2001 From: Vitalii Solodilov Date: Tue, 30 Jan 2018 12:50:57 +0400 Subject: [PATCH] Propagated a task timeout to a action execution It shall be possible to specify timeout for Mistral actions in order to cancel some long-performed action so that to provide predictable execution time for client service. Currently Mistral allows configure timeout on task and automatically changes task status to error. However mistral don't interrupt action execution. We need Mistral to terminate timed out action execution, because there might be the following issues: * several the same action executions can run at the same time breaking data consistency * stale action executions may lead to the massive resources consumption (memory, cpu..) Change-Id: I2a960110663627a54b8150917fd01eec68e8933d Signed-off-by: Vitalii Solodilov --- mistral/engine/action_queue.py | 9 ++++--- mistral/engine/actions.py | 23 +++++++++++----- mistral/engine/default_engine.py | 6 +++-- mistral/engine/policies.py | 4 ++- mistral/engine/tasks.py | 22 +++++++++++++-- mistral/executors/base.py | 4 ++- mistral/executors/default_executor.py | 24 ++++++++++------- mistral/executors/executor_server.py | 16 ++++++++--- mistral/rpc/clients.py | 7 ++++- .../unit/api/v2/test_action_executions.py | 27 +++++++++++++++++++ mistral/tests/unit/engine/test_environment.py | 11 +++++--- mistral/tests/unit/engine/test_policies.py | 24 +++++++++++++++++ mistral/tests/unit/engine/test_run_action.py | 3 ++- mistral/tests/unit/engine/test_safe_rerun.py | 2 +- 14 files changed, 145 insertions(+), 37 deletions(-) diff --git a/mistral/engine/action_queue.py b/mistral/engine/action_queue.py index 3806ec567..d527d6ccc 100644 --- a/mistral/engine/action_queue.py +++ b/mistral/engine/action_queue.py @@ -57,7 +57,8 @@ def _process_queue(queue): for operation, args in queue: if operation == _RUN_ACTION: - action_ex, action_def, target, execution_context = args + action_ex, action_def, target, execution_context, \ + timeout = args executor.run_action( action_ex.id, @@ -67,6 +68,7 @@ def _process_queue(queue): action_ex.runtime_context.get('safe_rerun', False), execution_context, target=target, + timeout=timeout ) elif operation == _ON_ACTION_COMPLETE: action_ex_id, result, wf_action = args @@ -120,8 +122,9 @@ def process(func): return decorate -def schedule_run_action(action_ex, action_def, target, execution_context): - args = (action_ex, action_def, target, execution_context) +def schedule_run_action(action_ex, action_def, target, execution_context, + timeout): + args = (action_ex, action_def, target, execution_context, timeout) _get_queue().append((_RUN_ACTION, args)) diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index baa34bd3b..a0b0a4a46 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -91,7 +91,8 @@ class Action(object): self.action_ex.state = state @abc.abstractmethod - def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False): + def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, + timeout=None): """Schedule action run. This method is needed to schedule action run so its result can @@ -100,6 +101,8 @@ class Action(object): executor asynchrony when executor doesn't immediately send a result). + :param timeout: a period of time in seconds after which execution of + action will be interrupted :param input_dict: Action input. :param target: Target (group of action executors). :param index: Action execution index. Makes sense for some types. @@ -111,13 +114,15 @@ class Action(object): @abc.abstractmethod def run(self, input_dict, target, index=0, desc='', save=True, - safe_rerun=False): + safe_rerun=False, timeout=None): """Immediately run action. This method runs method w/o scheduling its run for a later time. From engine perspective action will be processed in synchronous mode. + :param timeout: a period of time in seconds after which execution of + action will be interrupted :param input_dict: Action input. :param target: Target (group of action executors). :param index: Action execution index. Makes sense for some types. @@ -225,7 +230,8 @@ class PythonAction(Action): self._log_result(prev_state, result) @profiler.trace('action-schedule', hide_args=True) - def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False): + def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, + timeout=None): assert not self.action_ex # Assign the action execution ID here to minimize database calls. @@ -248,11 +254,12 @@ class PythonAction(Action): self.action_def, target, execution_context, + timeout=timeout ) @profiler.trace('action-run', hide_args=True) def run(self, input_dict, target, index=0, desc='', save=True, - safe_rerun=False): + safe_rerun=False, timeout=None): assert not self.action_ex input_dict = self._prepare_input(input_dict) @@ -284,7 +291,8 @@ class PythonAction(Action): safe_rerun, execution_context, target=target, - async_=False + async_=False, + timeout=timeout ) return self._prepare_output(result) @@ -512,7 +520,8 @@ class WorkflowAction(Action): pass @profiler.trace('workflkow-action-schedule', hide_args=True) - def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False): + def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, + timeout=None): assert not self.action_ex parent_wf_ex = self.task_ex.workflow_execution @@ -564,7 +573,7 @@ class WorkflowAction(Action): @profiler.trace('workflow-action-run', hide_args=True) def run(self, input_dict, target, index=0, desc='', save=True, - safe_rerun=True): + safe_rerun=True, timeout=None): raise NotImplementedError('Does not apply to this WorkflowAction.') def is_sync(self, input_dict): diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index 47223b763..85583e01d 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -76,6 +76,7 @@ class DefaultEngine(base.Engine): sync = params.get('run_sync') save = params.get('save_result') target = params.get('target') + timeout = params.get('timeout') is_action_sync = action.is_sync(action_input) @@ -84,11 +85,12 @@ class DefaultEngine(base.Engine): "Action does not support synchronous execution.") if not sync and (save or not is_action_sync): - action.schedule(action_input, target) + action.schedule(action_input, target, timeout=timeout) return action.action_ex.get_clone() - output = action.run(action_input, target, save=False) + output = action.run(action_input, target, save=False, + timeout=timeout) state = states.SUCCESS if output.is_success() else states.ERROR diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 036f90823..42890cdaf 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -28,6 +28,8 @@ import six _CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task' _COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task' +_FAIL_IF_INCOMPLETE_TASK_PATH = \ + 'mistral.engine.policies._fail_task_if_incomplete' def _log_task_delay(task_ex, delay_sec): @@ -423,7 +425,7 @@ class TimeoutPolicy(base.TaskPolicy): scheduler.schedule_call( None, - 'mistral.engine.policies._fail_task_if_incomplete', + _FAIL_IF_INCOMPLETE_TASK_PATH, self.delay, task_ex_id=task_ex.id, timeout=self.delay diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index 146c7bb67..7b60c94a2 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -424,7 +424,8 @@ class RegularTask(Task): action.schedule( input_dict, target, - safe_rerun=self.task_spec.get_safe_rerun() + safe_rerun=self.task_spec.get_safe_rerun(), + timeout=self._get_timeout() ) @profiler.trace('regular-task-get-target', hide_args=True) @@ -502,6 +503,22 @@ class RegularTask(Task): return actions.PythonAction(action_def, task_ex=self.task_ex) + def _get_timeout(self): + timeout = self.task_spec.get_policies().get_timeout() + + if not isinstance(timeout, (int, float)): + wf_ex = self.task_ex.workflow_execution + + ctx_view = data_flow.ContextView( + self.task_ex.in_context, + wf_ex.context, + wf_ex.input + ) + + timeout = expr.evaluate_recursively(data=timeout, context=ctx_view) + + return timeout if timeout > 0 else None + class WithItemsTask(RegularTask): """With-items task. @@ -588,7 +605,8 @@ class WithItemsTask(RegularTask): input_dict, target, index=i, - safe_rerun=self.task_spec.get_safe_rerun() + safe_rerun=self.task_spec.get_safe_rerun(), + timeout=self._get_timeout() ) self._decrease_capacity(1) diff --git a/mistral/executors/base.py b/mistral/executors/base.py index 5941824c0..474d4e40b 100644 --- a/mistral/executors/base.py +++ b/mistral/executors/base.py @@ -50,9 +50,11 @@ class Executor(object): @abc.abstractmethod def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, params, safe_rerun, execution_context, redelivered=False, - target=None, async_=True): + target=None, async_=True, timeout=None): """Runs action. + :param timeout: a period of time in seconds after which execution of + action will be interrupted :param action_ex_id: Corresponding action execution id. :param action_cls_str: Path to action class in dot notation. :param action_cls_attrs: Attributes of action class which diff --git a/mistral/executors/default_executor.py b/mistral/executors/default_executor.py index c1fdebe04..a2674fbad 100644 --- a/mistral/executors/default_executor.py +++ b/mistral/executors/default_executor.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from eventlet import timeout as ev_timeout from mistral_lib import actions as mistral_lib from oslo_log import log as logging from osprofiler import profiler @@ -35,7 +36,7 @@ class DefaultExecutor(base.Executor): @profiler.trace('default-executor-run-action', hide_args=True) def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, params, safe_rerun, execution_context, redelivered=False, - target=None, async_=True): + target=None, async_=True, timeout=None): """Runs action. :param action_ex_id: Action execution id. @@ -51,6 +52,8 @@ class DefaultExecutor(base.Executor): :param target: Target (group of action executors). :param async_: If True, run action in asynchronous mode (w/o waiting for completion). + :param timeout: a period of time in seconds after which execution of + action will be interrupted :return: Action result. """ @@ -102,14 +105,15 @@ class DefaultExecutor(base.Executor): # Run action. try: - - # NOTE(d0ugal): If the action is a subclass of mistral-lib we know - # that it expects to be passed the context. - if isinstance(action, mistral_lib.Action): - action_ctx = context.create_action_context(execution_context) - result = action.run(action_ctx) - else: - result = action.run() + with ev_timeout.Timeout(seconds=timeout): + # NOTE(d0ugal): If the action is a subclass of mistral-lib we + # know that it expects to be passed the context. + if isinstance(action, mistral_lib.Action): + action_ctx = context.create_action_context( + execution_context) + result = action.run(action_ctx) + else: + result = action.run() # Note: it's made for backwards compatibility with already # existing Mistral actions which don't return result as @@ -117,7 +121,7 @@ class DefaultExecutor(base.Executor): if not isinstance(result, mistral_lib.Result): result = mistral_lib.Result(data=result) - except Exception as e: + except BaseException as e: msg = ( "Failed to run action [action_ex_id=%s, action_cls='%s', " "attributes='%s', params='%s']\n %s" % ( diff --git a/mistral/executors/executor_server.py b/mistral/executors/executor_server.py index a0dfdf4fd..a07e3a770 100644 --- a/mistral/executors/executor_server.py +++ b/mistral/executors/executor_server.py @@ -60,9 +60,14 @@ class ExecutorServer(service_base.MistralService): self._rpc_server.stop(graceful) def run_action(self, rpc_ctx, action_ex_id, action_cls_str, - action_cls_attrs, params, safe_rerun, execution_context): + action_cls_attrs, params, safe_rerun, execution_context, + timeout): """Receives calls over RPC to run action on executor. + :param timeout: a period of time in seconds after which execution of + action will be interrupted + :param execution_context: A dict of values providing information about + the current execution. :param rpc_ctx: RPC request context dictionary. :param action_ex_id: Action execution id. :param action_cls_str: Action class name. @@ -74,11 +79,13 @@ class ExecutorServer(service_base.MistralService): LOG.info( "Received RPC request 'run_action'[action_ex_id=%s, " - "action_cls_str=%s, action_cls_attrs=%s, params=%s]", + "action_cls_str=%s, action_cls_attrs=%s, params=%s, " + "timeout=%s]", action_ex_id, action_cls_str, action_cls_attrs, - utils.cut(params) + utils.cut(params), + timeout ) redelivered = rpc_ctx.redelivered or False @@ -90,7 +97,8 @@ class ExecutorServer(service_base.MistralService): params, safe_rerun, execution_context, - redelivered + redelivered, + timeout=timeout ) diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index aafc9ccb2..865936c0a 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -313,7 +313,7 @@ class ExecutorClient(exe.Executor): @profiler.trace('executor-client-run-action') def run_action(self, action_ex_id, action_cls_str, action_cls_attrs, params, safe_rerun, execution_context, redelivered=False, - target=None, async_=True): + target=None, async_=True, timeout=None): """Sends a request to run action to executor. :param action_ex_id: Action execution id. @@ -322,11 +322,15 @@ class ExecutorClient(exe.Executor): :param params: Action input parameters. :param safe_rerun: If true, action would be re-run if executor dies during execution. + :param execution_context: A dict of values providing information about + the current execution. :param redelivered: Tells if given action was run before on another executor. :param target: Target (group of action executors). :param async_: If True, run action in asynchronous mode (w/o waiting for completion). + :param timeout: a period of time in seconds after which execution of + action will be interrupted :return: Action result. """ @@ -337,6 +341,7 @@ class ExecutorClient(exe.Executor): 'params': params, 'safe_rerun': safe_rerun, 'execution_context': execution_context, + 'timeout': timeout } rpc_client_method = (self._client.async_call diff --git a/mistral/tests/unit/api/v2/test_action_executions.py b/mistral/tests/unit/api/v2/test_action_executions.py index 9f2b6acfd..cfa82f370 100644 --- a/mistral/tests/unit/api/v2/test_action_executions.py +++ b/mistral/tests/unit/api/v2/test_action_executions.py @@ -288,6 +288,33 @@ class TestActionExecutionsController(base.APITest): run_sync=True ) + @mock.patch.object(rpc_clients.EngineClient, 'start_action') + def test_post_with_timeout(self, f): + f.return_value = ACTION_EX_DB.to_dict() + + resp = self.app.post_json( + '/v2/action_executions', + { + 'name': 'std.echo', + 'input': "{}", + 'params': '{"timeout": 2}' + } + ) + + self.assertEqual(201, resp.status_int) + + action_exec = copy.deepcopy(ACTION_EX) + del action_exec['task_name'] + + self.assertDictEqual(action_exec, resp.json) + + f.assert_called_once_with( + action_exec['name'], + json.loads(action_exec['input']), + description=None, + timeout=2 + ) + @mock.patch.object(rpc_clients.EngineClient, 'start_action') def test_post_json(self, f): f.return_value = ACTION_EX_DB.to_dict() diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index 90056db61..43197c1da 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -80,8 +80,7 @@ workflows: def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs, params, safe_rerun, execution_context, target=None, - async_=True): - + async_=True, timeout=None): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor() @@ -91,7 +90,10 @@ def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs, action_cls_attrs, params, safe_rerun, - execution_context=execution_context + execution_context=execution_context, + target=target, + async_=async_, + timeout=timeout ) @@ -188,7 +190,8 @@ class EnvironmentTest(base.EngineTestCase): 'workflow_name': wf1_ex.name, 'action_execution_id': a_ex.id, }, - target=TARGET + target=TARGET, + timeout=None ) def test_subworkflow_env_task_input(self): diff --git a/mistral/tests/unit/engine/test_policies.py b/mistral/tests/unit/engine/test_policies.py index 7fa7ccc77..c65a54f3d 100644 --- a/mistral/tests/unit/engine/test_policies.py +++ b/mistral/tests/unit/engine/test_policies.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from eventlet import timeout import mock from oslo_config import cfg import requests @@ -1435,6 +1436,29 @@ class PoliciesTest(base.EngineTestCase): self.await_workflow_error(wf_ex.id) + def test_action_timeout(self): + wb = """--- + version: '2.0' + wf1: + tasks: + task1: + action: std.sleep seconds=10 + timeout: 2 + """ + + wf_service.create_workflows(wb) + wf_ex = self.engine.start_workflow('wf1') + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + action_ex = task_ex.action_executions[0] + + with timeout.Timeout(8): + self.await_workflow_error(wf_ex.id) + self.await_task_error(task_ex.id) + self.await_action_error(action_ex.id) + def test_pause_before_policy(self): wb_service.create_workbook_v2(PAUSE_BEFORE_WB) diff --git a/mistral/tests/unit/engine/test_run_action.py b/mistral/tests/unit/engine/test_run_action.py index 040ab4412..2ee7d3209 100644 --- a/mistral/tests/unit/engine/test_run_action.py +++ b/mistral/tests/unit/engine/test_run_action.py @@ -334,5 +334,6 @@ class RunActionEngineTest(base.EngineTestCase): run_mock.assert_called_once_with( {'input': 'Hello'}, None, - save=False + save=False, + timeout=None ) diff --git a/mistral/tests/unit/engine/test_safe_rerun.py b/mistral/tests/unit/engine/test_safe_rerun.py index 2aae32539..6e0439e62 100644 --- a/mistral/tests/unit/engine/test_safe_rerun.py +++ b/mistral/tests/unit/engine/test_safe_rerun.py @@ -26,7 +26,7 @@ from mistral.workflow import states def _run_at_target(action_ex_id, action_class_str, attributes, action_params, safe_rerun, execution_context, target=None, - async_=True): + async_=True, timeout=None): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor()