Propagated a task timeout to a action execution
It shall be possible to specify timeout for Mistral actions in order to cancel some long-performed action so that to provide predictable execution time for client service. Currently Mistral allows configure timeout on task and automatically changes task status to error. However mistral don't interrupt action execution. We need Mistral to terminate timed out action execution, because there might be the following issues: * several the same action executions can run at the same time breaking data consistency * stale action executions may lead to the massive resources consumption (memory, cpu..) Change-Id: I2a960110663627a54b8150917fd01eec68e8933d Signed-off-by: Vitalii Solodilov <mcdkr@yandex.ru>
This commit is contained in:
parent
540f8d67e7
commit
b79f91e9ec
|
@ -57,7 +57,8 @@ def _process_queue(queue):
|
|||
|
||||
for operation, args in queue:
|
||||
if operation == _RUN_ACTION:
|
||||
action_ex, action_def, target, execution_context = args
|
||||
action_ex, action_def, target, execution_context, \
|
||||
timeout = args
|
||||
|
||||
executor.run_action(
|
||||
action_ex.id,
|
||||
|
@ -67,6 +68,7 @@ def _process_queue(queue):
|
|||
action_ex.runtime_context.get('safe_rerun', False),
|
||||
execution_context,
|
||||
target=target,
|
||||
timeout=timeout
|
||||
)
|
||||
elif operation == _ON_ACTION_COMPLETE:
|
||||
action_ex_id, result, wf_action = args
|
||||
|
@ -120,8 +122,9 @@ def process(func):
|
|||
return decorate
|
||||
|
||||
|
||||
def schedule_run_action(action_ex, action_def, target, execution_context):
|
||||
args = (action_ex, action_def, target, execution_context)
|
||||
def schedule_run_action(action_ex, action_def, target, execution_context,
|
||||
timeout):
|
||||
args = (action_ex, action_def, target, execution_context, timeout)
|
||||
_get_queue().append((_RUN_ACTION, args))
|
||||
|
||||
|
||||
|
|
|
@ -91,7 +91,8 @@ class Action(object):
|
|||
self.action_ex.state = state
|
||||
|
||||
@abc.abstractmethod
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
|
||||
timeout=None):
|
||||
"""Schedule action run.
|
||||
|
||||
This method is needed to schedule action run so its result can
|
||||
|
@ -100,6 +101,8 @@ class Action(object):
|
|||
executor asynchrony when executor doesn't immediately send a
|
||||
result).
|
||||
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:param input_dict: Action input.
|
||||
:param target: Target (group of action executors).
|
||||
:param index: Action execution index. Makes sense for some types.
|
||||
|
@ -111,13 +114,15 @@ class Action(object):
|
|||
|
||||
@abc.abstractmethod
|
||||
def run(self, input_dict, target, index=0, desc='', save=True,
|
||||
safe_rerun=False):
|
||||
safe_rerun=False, timeout=None):
|
||||
"""Immediately run action.
|
||||
|
||||
This method runs method w/o scheduling its run for a later time.
|
||||
From engine perspective action will be processed in synchronous
|
||||
mode.
|
||||
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:param input_dict: Action input.
|
||||
:param target: Target (group of action executors).
|
||||
:param index: Action execution index. Makes sense for some types.
|
||||
|
@ -225,7 +230,8 @@ class PythonAction(Action):
|
|||
self._log_result(prev_state, result)
|
||||
|
||||
@profiler.trace('action-schedule', hide_args=True)
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
|
||||
timeout=None):
|
||||
assert not self.action_ex
|
||||
|
||||
# Assign the action execution ID here to minimize database calls.
|
||||
|
@ -248,11 +254,12 @@ class PythonAction(Action):
|
|||
self.action_def,
|
||||
target,
|
||||
execution_context,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
@profiler.trace('action-run', hide_args=True)
|
||||
def run(self, input_dict, target, index=0, desc='', save=True,
|
||||
safe_rerun=False):
|
||||
safe_rerun=False, timeout=None):
|
||||
assert not self.action_ex
|
||||
|
||||
input_dict = self._prepare_input(input_dict)
|
||||
|
@ -284,7 +291,8 @@ class PythonAction(Action):
|
|||
safe_rerun,
|
||||
execution_context,
|
||||
target=target,
|
||||
async_=False
|
||||
async_=False,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
return self._prepare_output(result)
|
||||
|
@ -512,7 +520,8 @@ class WorkflowAction(Action):
|
|||
pass
|
||||
|
||||
@profiler.trace('workflkow-action-schedule', hide_args=True)
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False):
|
||||
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
|
||||
timeout=None):
|
||||
assert not self.action_ex
|
||||
|
||||
parent_wf_ex = self.task_ex.workflow_execution
|
||||
|
@ -564,7 +573,7 @@ class WorkflowAction(Action):
|
|||
|
||||
@profiler.trace('workflow-action-run', hide_args=True)
|
||||
def run(self, input_dict, target, index=0, desc='', save=True,
|
||||
safe_rerun=True):
|
||||
safe_rerun=True, timeout=None):
|
||||
raise NotImplementedError('Does not apply to this WorkflowAction.')
|
||||
|
||||
def is_sync(self, input_dict):
|
||||
|
|
|
@ -76,6 +76,7 @@ class DefaultEngine(base.Engine):
|
|||
sync = params.get('run_sync')
|
||||
save = params.get('save_result')
|
||||
target = params.get('target')
|
||||
timeout = params.get('timeout')
|
||||
|
||||
is_action_sync = action.is_sync(action_input)
|
||||
|
||||
|
@ -84,11 +85,12 @@ class DefaultEngine(base.Engine):
|
|||
"Action does not support synchronous execution.")
|
||||
|
||||
if not sync and (save or not is_action_sync):
|
||||
action.schedule(action_input, target)
|
||||
action.schedule(action_input, target, timeout=timeout)
|
||||
|
||||
return action.action_ex.get_clone()
|
||||
|
||||
output = action.run(action_input, target, save=False)
|
||||
output = action.run(action_input, target, save=False,
|
||||
timeout=timeout)
|
||||
|
||||
state = states.SUCCESS if output.is_success() else states.ERROR
|
||||
|
||||
|
|
|
@ -28,6 +28,8 @@ import six
|
|||
|
||||
_CONTINUE_TASK_PATH = 'mistral.engine.policies._continue_task'
|
||||
_COMPLETE_TASK_PATH = 'mistral.engine.policies._complete_task'
|
||||
_FAIL_IF_INCOMPLETE_TASK_PATH = \
|
||||
'mistral.engine.policies._fail_task_if_incomplete'
|
||||
|
||||
|
||||
def _log_task_delay(task_ex, delay_sec):
|
||||
|
@ -423,7 +425,7 @@ class TimeoutPolicy(base.TaskPolicy):
|
|||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
'mistral.engine.policies._fail_task_if_incomplete',
|
||||
_FAIL_IF_INCOMPLETE_TASK_PATH,
|
||||
self.delay,
|
||||
task_ex_id=task_ex.id,
|
||||
timeout=self.delay
|
||||
|
|
|
@ -424,7 +424,8 @@ class RegularTask(Task):
|
|||
action.schedule(
|
||||
input_dict,
|
||||
target,
|
||||
safe_rerun=self.task_spec.get_safe_rerun()
|
||||
safe_rerun=self.task_spec.get_safe_rerun(),
|
||||
timeout=self._get_timeout()
|
||||
)
|
||||
|
||||
@profiler.trace('regular-task-get-target', hide_args=True)
|
||||
|
@ -502,6 +503,22 @@ class RegularTask(Task):
|
|||
|
||||
return actions.PythonAction(action_def, task_ex=self.task_ex)
|
||||
|
||||
def _get_timeout(self):
|
||||
timeout = self.task_spec.get_policies().get_timeout()
|
||||
|
||||
if not isinstance(timeout, (int, float)):
|
||||
wf_ex = self.task_ex.workflow_execution
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
self.task_ex.in_context,
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
|
||||
timeout = expr.evaluate_recursively(data=timeout, context=ctx_view)
|
||||
|
||||
return timeout if timeout > 0 else None
|
||||
|
||||
|
||||
class WithItemsTask(RegularTask):
|
||||
"""With-items task.
|
||||
|
@ -588,7 +605,8 @@ class WithItemsTask(RegularTask):
|
|||
input_dict,
|
||||
target,
|
||||
index=i,
|
||||
safe_rerun=self.task_spec.get_safe_rerun()
|
||||
safe_rerun=self.task_spec.get_safe_rerun(),
|
||||
timeout=self._get_timeout()
|
||||
)
|
||||
|
||||
self._decrease_capacity(1)
|
||||
|
|
|
@ -50,9 +50,11 @@ class Executor(object):
|
|||
@abc.abstractmethod
|
||||
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, execution_context, redelivered=False,
|
||||
target=None, async_=True):
|
||||
target=None, async_=True, timeout=None):
|
||||
"""Runs action.
|
||||
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:param action_ex_id: Corresponding action execution id.
|
||||
:param action_cls_str: Path to action class in dot notation.
|
||||
:param action_cls_attrs: Attributes of action class which
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from eventlet import timeout as ev_timeout
|
||||
from mistral_lib import actions as mistral_lib
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
@ -35,7 +36,7 @@ class DefaultExecutor(base.Executor):
|
|||
@profiler.trace('default-executor-run-action', hide_args=True)
|
||||
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, execution_context, redelivered=False,
|
||||
target=None, async_=True):
|
||||
target=None, async_=True, timeout=None):
|
||||
"""Runs action.
|
||||
|
||||
:param action_ex_id: Action execution id.
|
||||
|
@ -51,6 +52,8 @@ class DefaultExecutor(base.Executor):
|
|||
:param target: Target (group of action executors).
|
||||
:param async_: If True, run action in asynchronous mode (w/o waiting
|
||||
for completion).
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:return: Action result.
|
||||
"""
|
||||
|
||||
|
@ -102,11 +105,12 @@ class DefaultExecutor(base.Executor):
|
|||
|
||||
# Run action.
|
||||
try:
|
||||
|
||||
# NOTE(d0ugal): If the action is a subclass of mistral-lib we know
|
||||
# that it expects to be passed the context.
|
||||
with ev_timeout.Timeout(seconds=timeout):
|
||||
# NOTE(d0ugal): If the action is a subclass of mistral-lib we
|
||||
# know that it expects to be passed the context.
|
||||
if isinstance(action, mistral_lib.Action):
|
||||
action_ctx = context.create_action_context(execution_context)
|
||||
action_ctx = context.create_action_context(
|
||||
execution_context)
|
||||
result = action.run(action_ctx)
|
||||
else:
|
||||
result = action.run()
|
||||
|
@ -117,7 +121,7 @@ class DefaultExecutor(base.Executor):
|
|||
if not isinstance(result, mistral_lib.Result):
|
||||
result = mistral_lib.Result(data=result)
|
||||
|
||||
except Exception as e:
|
||||
except BaseException as e:
|
||||
msg = (
|
||||
"Failed to run action [action_ex_id=%s, action_cls='%s', "
|
||||
"attributes='%s', params='%s']\n %s" % (
|
||||
|
|
|
@ -60,9 +60,14 @@ class ExecutorServer(service_base.MistralService):
|
|||
self._rpc_server.stop(graceful)
|
||||
|
||||
def run_action(self, rpc_ctx, action_ex_id, action_cls_str,
|
||||
action_cls_attrs, params, safe_rerun, execution_context):
|
||||
action_cls_attrs, params, safe_rerun, execution_context,
|
||||
timeout):
|
||||
"""Receives calls over RPC to run action on executor.
|
||||
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:param execution_context: A dict of values providing information about
|
||||
the current execution.
|
||||
:param rpc_ctx: RPC request context dictionary.
|
||||
:param action_ex_id: Action execution id.
|
||||
:param action_cls_str: Action class name.
|
||||
|
@ -74,11 +79,13 @@ class ExecutorServer(service_base.MistralService):
|
|||
|
||||
LOG.info(
|
||||
"Received RPC request 'run_action'[action_ex_id=%s, "
|
||||
"action_cls_str=%s, action_cls_attrs=%s, params=%s]",
|
||||
"action_cls_str=%s, action_cls_attrs=%s, params=%s, "
|
||||
"timeout=%s]",
|
||||
action_ex_id,
|
||||
action_cls_str,
|
||||
action_cls_attrs,
|
||||
utils.cut(params)
|
||||
utils.cut(params),
|
||||
timeout
|
||||
)
|
||||
|
||||
redelivered = rpc_ctx.redelivered or False
|
||||
|
@ -90,7 +97,8 @@ class ExecutorServer(service_base.MistralService):
|
|||
params,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
redelivered
|
||||
redelivered,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
|
||||
|
|
|
@ -313,7 +313,7 @@ class ExecutorClient(exe.Executor):
|
|||
@profiler.trace('executor-client-run-action')
|
||||
def run_action(self, action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, execution_context, redelivered=False,
|
||||
target=None, async_=True):
|
||||
target=None, async_=True, timeout=None):
|
||||
"""Sends a request to run action to executor.
|
||||
|
||||
:param action_ex_id: Action execution id.
|
||||
|
@ -322,11 +322,15 @@ class ExecutorClient(exe.Executor):
|
|||
:param params: Action input parameters.
|
||||
:param safe_rerun: If true, action would be re-run if executor dies
|
||||
during execution.
|
||||
:param execution_context: A dict of values providing information about
|
||||
the current execution.
|
||||
:param redelivered: Tells if given action was run before on another
|
||||
executor.
|
||||
:param target: Target (group of action executors).
|
||||
:param async_: If True, run action in asynchronous mode (w/o waiting
|
||||
for completion).
|
||||
:param timeout: a period of time in seconds after which execution of
|
||||
action will be interrupted
|
||||
:return: Action result.
|
||||
"""
|
||||
|
||||
|
@ -337,6 +341,7 @@ class ExecutorClient(exe.Executor):
|
|||
'params': params,
|
||||
'safe_rerun': safe_rerun,
|
||||
'execution_context': execution_context,
|
||||
'timeout': timeout
|
||||
}
|
||||
|
||||
rpc_client_method = (self._client.async_call
|
||||
|
|
|
@ -288,6 +288,33 @@ class TestActionExecutionsController(base.APITest):
|
|||
run_sync=True
|
||||
)
|
||||
|
||||
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
|
||||
def test_post_with_timeout(self, f):
|
||||
f.return_value = ACTION_EX_DB.to_dict()
|
||||
|
||||
resp = self.app.post_json(
|
||||
'/v2/action_executions',
|
||||
{
|
||||
'name': 'std.echo',
|
||||
'input': "{}",
|
||||
'params': '{"timeout": 2}'
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(201, resp.status_int)
|
||||
|
||||
action_exec = copy.deepcopy(ACTION_EX)
|
||||
del action_exec['task_name']
|
||||
|
||||
self.assertDictEqual(action_exec, resp.json)
|
||||
|
||||
f.assert_called_once_with(
|
||||
action_exec['name'],
|
||||
json.loads(action_exec['input']),
|
||||
description=None,
|
||||
timeout=2
|
||||
)
|
||||
|
||||
@mock.patch.object(rpc_clients.EngineClient, 'start_action')
|
||||
def test_post_json(self, f):
|
||||
f.return_value = ACTION_EX_DB.to_dict()
|
||||
|
|
|
@ -80,8 +80,7 @@ workflows:
|
|||
|
||||
def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
|
||||
params, safe_rerun, execution_context, target=None,
|
||||
async_=True):
|
||||
|
||||
async_=True, timeout=None):
|
||||
# We'll just call executor directly for testing purposes.
|
||||
executor = d_exe.DefaultExecutor()
|
||||
|
||||
|
@ -91,7 +90,10 @@ def _run_at_target(action_ex_id, action_cls_str, action_cls_attrs,
|
|||
action_cls_attrs,
|
||||
params,
|
||||
safe_rerun,
|
||||
execution_context=execution_context
|
||||
execution_context=execution_context,
|
||||
target=target,
|
||||
async_=async_,
|
||||
timeout=timeout
|
||||
)
|
||||
|
||||
|
||||
|
@ -188,7 +190,8 @@ class EnvironmentTest(base.EngineTestCase):
|
|||
'workflow_name': wf1_ex.name,
|
||||
'action_execution_id': a_ex.id,
|
||||
},
|
||||
target=TARGET
|
||||
target=TARGET,
|
||||
timeout=None
|
||||
)
|
||||
|
||||
def test_subworkflow_env_task_input(self):
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from eventlet import timeout
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
import requests
|
||||
|
@ -1435,6 +1436,29 @@ class PoliciesTest(base.EngineTestCase):
|
|||
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
|
||||
def test_action_timeout(self):
|
||||
wb = """---
|
||||
version: '2.0'
|
||||
wf1:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.sleep seconds=10
|
||||
timeout: 2
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wb)
|
||||
wf_ex = self.engine.start_workflow('wf1')
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
action_ex = task_ex.action_executions[0]
|
||||
|
||||
with timeout.Timeout(8):
|
||||
self.await_workflow_error(wf_ex.id)
|
||||
self.await_task_error(task_ex.id)
|
||||
self.await_action_error(action_ex.id)
|
||||
|
||||
def test_pause_before_policy(self):
|
||||
wb_service.create_workbook_v2(PAUSE_BEFORE_WB)
|
||||
|
||||
|
|
|
@ -334,5 +334,6 @@ class RunActionEngineTest(base.EngineTestCase):
|
|||
run_mock.assert_called_once_with(
|
||||
{'input': 'Hello'},
|
||||
None,
|
||||
save=False
|
||||
save=False,
|
||||
timeout=None
|
||||
)
|
||||
|
|
|
@ -26,7 +26,7 @@ from mistral.workflow import states
|
|||
|
||||
def _run_at_target(action_ex_id, action_class_str, attributes,
|
||||
action_params, safe_rerun, execution_context, target=None,
|
||||
async_=True):
|
||||
async_=True, timeout=None):
|
||||
# We'll just call executor directly for testing purposes.
|
||||
executor = d_exe.DefaultExecutor()
|
||||
|
||||
|
|
Loading…
Reference in New Issue