From 1d8bbc3af812c012a2fa0c05b6fb4a5b3aa71b07 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Thu, 19 Oct 2023 14:48:11 +0300 Subject: [PATCH] Heartbeats for async actions added Action heartbeats is a must-have mechanism to ensure no mistral entity will be in RUNNING state forever. Sync actions already have this feature, but async not. Async actions completion is performed by external services, so async action heartbeats should be sent by them as well. Change-Id: Idd8b4070cbad0fc5aa9fc567b1d6903d107a736a Signed-off-by: Oleg Ovcharuk --- doc/source/user/main_features.rst | 67 +++++++++++++++ .../controllers/async_actions_heartbeats.py | 55 ++++++++++++ mistral/api/controllers/root.py | 3 + ...nc_heatbeats_flag_for_action_executions.py | 42 +++++++++ mistral/db/v2/api.py | 8 ++ mistral/db/v2/sqlalchemy/api.py | 25 +++++- mistral/db/v2/sqlalchemy/models.py | 3 + mistral/engine/actions.py | 23 +++-- mistral/engine/tasks.py | 21 ++++- mistral/executors/base.py | 6 +- mistral/executors/default_executor.py | 14 ++- mistral/executors/executor_server.py | 14 ++- mistral/lang/v2/task_defaults.py | 6 ++ mistral/lang/v2/tasks.py | 8 +- mistral/rpc/clients.py | 11 ++- mistral/services/action_heartbeat_checker.py | 5 ++ .../api/v2/test_async_actions_heartbeats.py | 86 +++++++++++++++++++ mistral/tests/unit/engine/test_environment.py | 9 +- mistral/tests/unit/engine/test_safe_rerun.py | 3 +- 19 files changed, 382 insertions(+), 27 deletions(-) create mode 100644 mistral/api/controllers/async_actions_heartbeats.py create mode 100644 mistral/db/sqlalchemy/migration/alembic_migrations/versions/045_async_heatbeats_flag_for_action_executions.py create mode 100644 mistral/tests/unit/api/v2/test_async_actions_heartbeats.py diff --git a/doc/source/user/main_features.rst b/doc/source/user/main_features.rst index 2812bd163..3a5b2891a 100644 --- a/doc/source/user/main_features.rst +++ b/doc/source/user/main_features.rst @@ -380,3 +380,70 @@ Task skip could be performed by following request:: "id": "", "state": "SKIPPED" } + + +Mechanism to Close Stuck Running Action Executions +-------------------------------------------------- + +It is possible that actions can be stuck in the ``RUNNING`` state, for example, +if the assigned executor dies, or the message that signals the completion of +the action is lost. This section describes a heartbeat-based solution to close +these forgotten action executions. The related configuration options are +``max_missed_heartbeats`` and ``evaluation_interval``. + +**Note**: If either of these options are 0 then the feature is **not enabled**. + +The default configuration is as follows: + +.. code-block:: cfg + + [action_heartbeat] + max_missed_heartbeats = 15 + evaluation_interval = 20 + first_heartbeat_timeout = 3600 + +- **max_missed_heartbeats** + + Defines the maximum amount of missed heartbeats to be allowed. + If the number of missed heartbeats exceeds this number, then the related + action execution state is changed to ``ERROR`` with the cause + ``Heartbeat wasn't received``. + +- **evaluation_interval** + + The interval between evaluations in seconds. + +- **first_heartbeat_timeout** + + The first heartbeat is handled differently, to provide a grace period + in case there is no available executor to handle the action execution. + For example when first_heartbeat_timeout = 3600, wait 3600 seconds + before closing the action executions that never received a heartbeat. + +It is also possible to configure same mechanism for async actions. To include +async action to heartbeat check rotation, you must mark +it using ``async-heartbeats-enabled`` parameter. +Example: + +.. code-block:: yaml + + version: '2.0' + wf_with_async_heartbeats: + tasks: + t0: + action: std.async_noop + async-heartbeats-enabled: true + +In this case async action will be failed if no heartbeats were sent for a long +time. Since async actions updates are out of Mistral responsibility, +to update heartbeats of marked async actions send follow request:: + + PUT /async_actions_heartbeats + + { + "action_ex_ids": [ + "", + "", + "", + ] + } diff --git a/mistral/api/controllers/async_actions_heartbeats.py b/mistral/api/controllers/async_actions_heartbeats.py new file mode 100644 index 000000000..cab4e6115 --- /dev/null +++ b/mistral/api/controllers/async_actions_heartbeats.py @@ -0,0 +1,55 @@ +# Copyright 2023 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pecan import rest +import wsmeext.pecan as wsme_pecan + +from mistral.api.controllers import resource +from mistral.api.controllers.v2 import types + +from mistral.rpc import clients as rpc +from mistral.utils import rest_utils + + +class AsyncActionsHeartbeats(resource.Resource): + """AsyncActionsHeartbeats resource.""" + + action_ex_ids = [types.uuid] + + @classmethod + def sample(cls): + return cls( + action_ex_ids=[ + "0a68ce2a-e229-40a7-ab8b-256b212fe34d", + "0a68ce2a-e229-40a7-ab8b-256b212fe342" + ] + ) + + +class AsyncActionsHeartbeatsController(rest.RestController): + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + AsyncActionsHeartbeats, + body=AsyncActionsHeartbeats, + status_code=200 + ) + def put(self, async_actions_heartbeats): + engine = rpc.get_engine_client() + engine.process_action_heartbeats( + async_actions_heartbeats.action_ex_ids + ) + + return async_actions_heartbeats diff --git a/mistral/api/controllers/root.py b/mistral/api/controllers/root.py index 39d084fee..ce49b8e40 100644 --- a/mistral/api/controllers/root.py +++ b/mistral/api/controllers/root.py @@ -17,6 +17,7 @@ import pecan from wsme import types as wtypes import wsmeext.pecan as wsme_pecan +from mistral.api.controllers import async_actions_heartbeats from mistral.api.controllers import info from mistral.api.controllers import maintenance from mistral.api.controllers import resource @@ -65,6 +66,8 @@ class APIVersions(resource.Resource): class RootController(object): v2 = v2_root.Controller() info = info.InfoController() + async_actions_heartbeats = \ + async_actions_heartbeats.AsyncActionsHeartbeatsController() maintenance = maintenance.MaintenanceController() @wsme_pecan.wsexpose(APIVersions) diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/045_async_heatbeats_flag_for_action_executions.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/045_async_heatbeats_flag_for_action_executions.py new file mode 100644 index 000000000..b8e0c20f5 --- /dev/null +++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/045_async_heatbeats_flag_for_action_executions.py @@ -0,0 +1,42 @@ +# Copyright 2023 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Async heartbeats flag for action executions + +Revision ID: 045 +Revises: 044 +Create Date: 2023-05-29 12:45:14.041458 + +""" + +# revision identifiers, used by Alembic. +revision = '045' +down_revision = '044' + +from alembic import op +from mistral.db.utils import column_exists +import sqlalchemy as sa + + +def upgrade(): + if not column_exists('action_executions_v2', 'async_heartbeats_enabled'): + op.add_column( + 'action_executions_v2', + sa.Column( + 'async_heartbeats_enabled', + sa.Boolean(), + nullable=True + ) + ) diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 6fd806d19..ceda7349c 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -565,6 +565,14 @@ def get_running_expired_sync_action_executions(expiration_time, ) +def get_running_expired_async_action_executions(expiration_time, + limit, session=None): + return IMPL.get_running_expired_async_action_executions( + expiration_time, + limit + ) + + def get_superfluous_executions(max_finished_executions, limit=None, columns=()): return IMPL.get_superfluous_executions( diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 87d438491..3d2e27083 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -994,10 +994,15 @@ def update_action_execution_heartbeat(id, session=None): raise exc.DBEntityNotFoundError now = utils.utc_now_sec() - session.query(models.ActionExecution). \ + count = session.query(models.ActionExecution). \ filter(models.ActionExecution.id == id). \ update({'last_heartbeat': now}) + if count == 0: + raise exc.DBEntityNotFoundError( + "ActionExecution not found [id=%s]" % id + ) + @b.session_aware() def delete_action_execution(id, session=None): @@ -1576,6 +1581,24 @@ def get_running_expired_sync_action_executions(expiration_time, return query.all() +@b.session_aware() +def get_running_expired_async_action_executions(expiration_time, + limit, session=None): + query = b.model_query(models.ActionExecution) + + query = query.filter( + models.ActionExecution.last_heartbeat < expiration_time + ) + query = query.filter_by(is_sync=False) + query = query.filter_by(async_heartbeats_enabled=True) + query = query.filter(models.ActionExecution.state == states.RUNNING) + + if limit: + query.limit(limit) + + return query.all() + + @b.session_aware() def get_superfluous_executions(max_finished_executions, limit=None, columns=(), session=None): diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index a3d4d012d..6c4bd3493 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -312,6 +312,9 @@ class ActionExecution(Execution): ) is_sync = sa.Column(sa.Boolean(), default=None, nullable=True) + # Whether the heartbeats for async action should be processed. + async_heartbeats_enabled = sa.Column(sa.BOOLEAN, default=False) + class WorkflowExecution(Execution): """Contains workflow execution information.""" diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index a259faa3e..f986f33ad 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -96,7 +96,7 @@ class Action(object, metaclass=abc.ABCMeta): @abc.abstractmethod def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, - timeout=None): + timeout=None, async_heartbeats_enabled=False): """Schedule action run. This method is needed to schedule action run so its result can @@ -113,6 +113,8 @@ class Action(object, metaclass=abc.ABCMeta): :param desc: Action execution description. :param safe_rerun: If true, action would be re-run if executor dies during execution. + :param async_heartbeats_enabled: If true, heartbeats will be processed + for async action execution. """ raise NotImplementedError @@ -157,7 +159,8 @@ class Action(object, metaclass=abc.ABCMeta): return res def _create_action_execution(self, input_dict, runtime_ctx, - desc='', action_ex_id=None, is_sync=True): + desc='', action_ex_id=None, is_sync=True, + async_heartbeats_enabled=False): action_ex_id = action_ex_id or utils.generate_unicode_uuid() values = { @@ -168,7 +171,8 @@ class Action(object, metaclass=abc.ABCMeta): 'runtime_context': runtime_ctx, 'workflow_namespace': self.namespace, 'description': desc, - 'is_sync': is_sync + 'is_sync': is_sync, + 'async_heartbeats_enabled': async_heartbeats_enabled } if self.task_ex: @@ -244,7 +248,7 @@ class RegularAction(Action): @profiler.trace('action-schedule', hide_args=True) def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, - timeout=None): + timeout=None, async_heartbeats_enabled=False): assert not self.action_ex self.action_desc.check_parameters(input_dict) @@ -277,7 +281,8 @@ class RegularAction(Action): self._prepare_runtime_context(index, safe_rerun), desc=desc, action_ex_id=action_ex_id, - is_sync=action.is_sync() + is_sync=action.is_sync(), + async_heartbeats_enabled=async_heartbeats_enabled ) def _run_action(): @@ -289,7 +294,8 @@ class RegularAction(Action): safe_rerun, self._prepare_execution_context(), target=target, - timeout=timeout + timeout=timeout, + async_heartbeats_enabled=async_heartbeats_enabled ) # Register an asynchronous command to run the action @@ -324,7 +330,8 @@ class RegularAction(Action): self._prepare_runtime_context(index, safe_rerun), desc=desc, action_ex_id=action_ex_id, - is_sync=action.is_sync() + is_sync=action.is_sync(), + async_heartbeats_enabled=False ) executor = exe.get_executor(cfg.CONF.executor.type) @@ -364,7 +371,7 @@ class WorkflowAction(Action): @profiler.trace('workflow-action-schedule', hide_args=True) def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, - timeout=None): + timeout=None, async_heartbeats_enabled=False): assert not self.action_ex self.validate_input(input_dict) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index fcab63e56..7ca6a9d50 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -559,6 +559,24 @@ class Task(object, metaclass=abc.ABCMeta): return False + def _get_async_heartbeats_enabled(self): + async_heartbeats_enabled = \ + self.task_spec.get_async_heartbeats_enabled() + + if async_heartbeats_enabled is not None: + return async_heartbeats_enabled + + task_default = self.wf_spec.get_task_defaults() + + if task_default: + default_async_heartbeats_enabled = \ + task_default.get_async_heartbeats_enabled() + + if default_async_heartbeats_enabled is not None: + return default_async_heartbeats_enabled + + return False + def _get_action_defaults(self): action_name = self.task_spec.get_action_name() @@ -703,7 +721,8 @@ class RegularTask(Task): input_dict, target, safe_rerun=self._get_safe_rerun(), - timeout=self._get_timeout() + timeout=self._get_timeout(), + async_heartbeats_enabled=self._get_async_heartbeats_enabled() ) except exc.MistralException as e: self.complete(states.ERROR, e.message) diff --git a/mistral/executors/base.py b/mistral/executors/base.py index 9c1fcf560..fbe51c6b6 100644 --- a/mistral/executors/base.py +++ b/mistral/executors/base.py @@ -44,7 +44,8 @@ class Executor(object, metaclass=abc.ABCMeta): @abc.abstractmethod def run_action(self, action, action_ex_id, safe_rerun, exec_ctx, - redelivered=False, target=None, async_=True, timeout=None): + redelivered=False, target=None, async_=True, timeout=None, + async_heartbeats_enabled=False): """Runs the given action. :param action: Action to run. @@ -60,6 +61,9 @@ class Executor(object, metaclass=abc.ABCMeta): for completion). :param timeout: a period of time in seconds after which execution of action will be interrupted + :param async_heartbeats_enabled: If true, executor will not send + any heartbeats automatically, because they should be received + by api. :return: Action result. """ raise NotImplementedError() diff --git a/mistral/executors/default_executor.py b/mistral/executors/default_executor.py index 8f4e088d1..7b39cbbcb 100644 --- a/mistral/executors/default_executor.py +++ b/mistral/executors/default_executor.py @@ -33,7 +33,8 @@ class DefaultExecutor(base.Executor): @profiler.trace('default-executor-run-action', hide_args=True) def run_action(self, action, action_ex_id, safe_rerun, exec_ctx, - redelivered=False, target=None, async_=True, timeout=None): + redelivered=False, target=None, async_=True, timeout=None, + async_heartbeats_enabled=False): """Runs action. :param action: Action to run. @@ -47,12 +48,16 @@ class DefaultExecutor(base.Executor): :param async_: If True, run action in asynchronous mode (w/o waiting for completion). :param timeout: a period of time in seconds after which execution of - action will be interrupted + action will be interrupted. + :param async_heartbeats_enabled: If true, executor will not send + any heartbeats automatically, because they should be received + by api. :return: Action result. """ try: - action_heartbeat_sender.add_action(action_ex_id) + if not async_heartbeats_enabled: + action_heartbeat_sender.add_action(action_ex_id) return self._do_run_action( action, @@ -63,7 +68,8 @@ class DefaultExecutor(base.Executor): timeout ) finally: - action_heartbeat_sender.remove_action(action_ex_id) + if not async_heartbeats_enabled: + action_heartbeat_sender.remove_action(action_ex_id) def _do_run_action(self, action, action_ex_id, exec_ctx, redelivered, safe_rerun, diff --git a/mistral/executors/executor_server.py b/mistral/executors/executor_server.py index dabddd461..a3ce56e3b 100644 --- a/mistral/executors/executor_server.py +++ b/mistral/executors/executor_server.py @@ -71,7 +71,7 @@ class ExecutorServer(service_base.MistralService): self._rpc_server.stop(graceful) def run_action(self, rpc_ctx, action, action_ex_id, safe_rerun, exec_ctx, - timeout): + timeout, async_heartbeats_enabled): """Receives calls over RPC to run action on executor. @@ -83,14 +83,19 @@ class ExecutorServer(service_base.MistralService): the current execution. :param timeout: a period of time in seconds after which execution of action will be interrupted + :param async_heartbeats_enabled: If true, executor will not send + any heartbeats automatically, because they should be received + by api. :return: Action result. """ LOG.debug( "Received RPC request 'run_action'" - "[action=%s, action_ex_id=%s, timeout=%s]", + "[action=%s, action_ex_id=%s, timeout=%s, " + "async_heartbeats_enabled=%s]", action, action_ex_id, - timeout + timeout, + async_heartbeats_enabled ) redelivered = rpc_ctx.redelivered or False @@ -101,7 +106,8 @@ class ExecutorServer(service_base.MistralService): safe_rerun, exec_ctx, redelivered, - timeout=timeout + timeout=timeout, + async_heartbeats_enabled=async_heartbeats_enabled ) LOG.debug( diff --git a/mistral/lang/v2/task_defaults.py b/mistral/lang/v2/task_defaults.py index 9b0af333e..8b2f3505d 100644 --- a/mistral/lang/v2/task_defaults.py +++ b/mistral/lang/v2/task_defaults.py @@ -42,6 +42,7 @@ class TaskDefaultsSpec(base.BaseSpec): "on-error": on_clause.OnClauseSpec.get_schema(), "on-skip": on_clause.OnClauseSpec.get_schema(), "safe-rerun": types.EXPRESSION_OR_BOOLEAN, + "async-heartbeats-enabled": types.EXPRESSION_OR_BOOLEAN, "requires": { "oneOf": [types.NONEMPTY_STRING, types.UNIQUE_STRING_LIST] } @@ -75,6 +76,7 @@ class TaskDefaultsSpec(base.BaseSpec): self._on_skip = self._spec_property('on-skip', on_spec_cls) self._safe_rerun = data.get('safe-rerun') + self._async_heartbeats_enabled = data.get('async-heartbeats-enabled') # TODO(rakhmerov): 'requires' should reside in a different spec for # reverse workflows. @@ -84,6 +86,7 @@ class TaskDefaultsSpec(base.BaseSpec): super(TaskDefaultsSpec, self).validate_schema() self.validate_expr(self._data.get('safe-rerun', {})) + self.validate_expr(self._data.get('async-heartbeats-enabled', {})) def validate_semantics(self): # Validate YAQL expressions. @@ -119,6 +122,9 @@ class TaskDefaultsSpec(base.BaseSpec): def get_safe_rerun(self): return self._safe_rerun + def get_async_heartbeats_enabled(self): + return self._async_heartbeats_enabled + def get_requires(self): if isinstance(self._requires, str): return [self._requires] diff --git a/mistral/lang/v2/tasks.py b/mistral/lang/v2/tasks.py index 8bfaedda7..1123a830e 100644 --- a/mistral/lang/v2/tasks.py +++ b/mistral/lang/v2/tasks.py @@ -85,7 +85,8 @@ class TaskSpec(base.BaseSpec): "fail-on": types.EXPRESSION_OR_BOOLEAN, "target": types.NONEMPTY_STRING, "keep-result": types.EXPRESSION_OR_BOOLEAN, - "safe-rerun": types.EXPRESSION_OR_BOOLEAN + "safe-rerun": types.EXPRESSION_OR_BOOLEAN, + "async-heartbeats-enabled": types.EXPRESSION_OR_BOOLEAN }, "additionalProperties": False, "anyOf": [ @@ -136,6 +137,7 @@ class TaskSpec(base.BaseSpec): self._target = data.get('target') self._keep_result = data.get('keep-result', True) self._safe_rerun = data.get('safe-rerun') + self._async_heartbeats_enabled = data.get('async-heartbeats-enabled') self._process_action_and_workflow() @@ -158,6 +160,7 @@ class TaskSpec(base.BaseSpec): self.validate_expr(self._data.get('publish-on-skip', {})) self.validate_expr(self._data.get('keep-result', {})) self.validate_expr(self._data.get('safe-rerun', {})) + self.validate_expr(self._data.get('async-heartbeats-enabled', {})) def _validate_name(self): task_name = self._data.get('name') @@ -276,6 +279,9 @@ class TaskSpec(base.BaseSpec): def get_safe_rerun(self): return self._safe_rerun + def get_async_heartbeats_enabled(self): + return self._async_heartbeats_enabled + def get_type(self): return (WORKFLOW_TASK_TYPE if self._workflow else ACTION_TASK_TYPE) diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index f64110b1b..0e596d266 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -433,7 +433,8 @@ class ExecutorClient(exe.Executor): @profiler.trace('executor-client-run-action') def run_action(self, action, action_ex_id, safe_rerun, exec_ctx, - redelivered=False, target=None, async_=True, timeout=None): + redelivered=False, target=None, async_=True, timeout=None, + async_heartbeats_enabled=False): """Sends a request to run action to executor. :param action: Action to run. @@ -448,7 +449,10 @@ class ExecutorClient(exe.Executor): :param async_: If True, run action in asynchronous mode (w/o waiting for completion). :param timeout: a period of time in seconds after which execution of - action will be interrupted + action will be interrupted. + :param async_heartbeats_enabled: If true, executor will not send + any heartbeats automatically, because they should be received + by api. :return: Action result. """ rpc_kwargs = { @@ -456,7 +460,8 @@ class ExecutorClient(exe.Executor): 'action_ex_id': action_ex_id, 'safe_rerun': safe_rerun, 'exec_ctx': exec_ctx, - 'timeout': timeout + 'timeout': timeout, + 'async_heartbeats_enabled': async_heartbeats_enabled } rpc_client_method = ( diff --git a/mistral/services/action_heartbeat_checker.py b/mistral/services/action_heartbeat_checker.py index f1fed2288..06457c5b8 100644 --- a/mistral/services/action_heartbeat_checker.py +++ b/mistral/services/action_heartbeat_checker.py @@ -51,6 +51,11 @@ def handle_expired_actions(): CONF.action_heartbeat.batch_size ) + action_exs.extend(db_api.get_running_expired_async_action_executions( + exp_date, + CONF.action_heartbeat.batch_size + )) + LOG.debug("Found {} running and expired actions.", len(action_exs)) if action_exs: diff --git a/mistral/tests/unit/api/v2/test_async_actions_heartbeats.py b/mistral/tests/unit/api/v2/test_async_actions_heartbeats.py new file mode 100644 index 000000000..5654f5c2f --- /dev/null +++ b/mistral/tests/unit/api/v2/test_async_actions_heartbeats.py @@ -0,0 +1,86 @@ +# Copyright 2019 - Nokia Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.services import workflows as wf_service +from mistral.tests.unit.api import base +from mistral.tests.unit.engine import base as engine_base + + +class TestAsyncActionsHeartbeats(engine_base.EngineTestCase, base.APITest): + def setUp(self): + # We need to override configuration values before starting engine. + self.override_config('check_interval', 1, 'action_heartbeat') + self.override_config('max_missed_heartbeats', 1, 'action_heartbeat') + self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat') + + super(TestAsyncActionsHeartbeats, self).setUp() + + self.override_config('type', 'remote', 'executor') + + def test_put_empty(self): + resp = self.app.put_json( + '/async_actions_heartbeats', + headers={'Accept': 'application/json'}, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + self.assertIn('Missing argument', resp.json['faultstring']) + + def test_put_wrong_uuid_format(self): + resp = self.app.put_json( + '/async_actions_heartbeats', + headers={'Accept': 'application/json'}, + params={"action_ex_ids": ["qwe"]}, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + self.assertIn('Expected a uuid', resp.json['faultstring']) + + def test_put_no_action_ids(self): + resp = self.app.put_json( + '/async_actions_heartbeats', + headers={'Accept': 'application/json'}, + params={"action_ex_ids": []} + ) + + self.assertEqual(200, resp.status_int) + + def test_put_normal_action_ids(self): + resp = self.app.put_json( + '/async_actions_heartbeats', + headers={'Accept': 'application/json'}, + params={"action_ex_ids": ["0b4ce884-9154-47b0-aec3-e8aba1a6febc"]} + ) + + self.assertEqual(200, resp.status_int) + + def test_action_fail_without_heartbeats(self): + wf_text = """--- + version: '2.0' + wf: + tasks: + task1: + action: std.async_noop + async-heartbeats-enabled: true + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf') + + # The workflow should fail because the action of "task1" should be + # failed automatically by the action execution heartbeat checker. + self.await_workflow_error(wf_ex.id) diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index 6d034fadf..25a1fa1ad 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -81,7 +81,8 @@ workflows: def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx, target=None, - async_=True, timeout=None): + async_=True, timeout=None, + async_heartbeats_enabled=False): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor() @@ -92,7 +93,8 @@ def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx, target=None, exec_ctx=exec_ctx, target=target, async_=async_, - timeout=timeout + timeout=timeout, + async_heartbeats_enabled=async_heartbeats_enabled ) @@ -183,7 +185,8 @@ class EnvironmentTest(base.EngineTestCase): 'action_execution_id': a_ex.id, }, target=TARGET, - timeout=None + timeout=None, + async_heartbeats_enabled=False, ) def test_subworkflow_env_task_input(self): diff --git a/mistral/tests/unit/engine/test_safe_rerun.py b/mistral/tests/unit/engine/test_safe_rerun.py index 3edcce4f8..152c5ce78 100644 --- a/mistral/tests/unit/engine/test_safe_rerun.py +++ b/mistral/tests/unit/engine/test_safe_rerun.py @@ -25,7 +25,8 @@ from mistral.workflow import states def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx, - target=None, async_=True, timeout=None): + target=None, async_=True, timeout=None, + async_heartbeats_enabled=False): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor()