diff --git a/doc/source/user/main_features.rst b/doc/source/user/main_features.rst index 2812bd163..3a5b2891a 100644 --- a/doc/source/user/main_features.rst +++ b/doc/source/user/main_features.rst @@ -380,3 +380,70 @@ Task skip could be performed by following request:: "id": "", "state": "SKIPPED" } + + +Mechanism to Close Stuck Running Action Executions +-------------------------------------------------- + +It is possible that actions can be stuck in the ``RUNNING`` state, for example, +if the assigned executor dies, or the message that signals the completion of +the action is lost. This section describes a heartbeat-based solution to close +these forgotten action executions. The related configuration options are +``max_missed_heartbeats`` and ``evaluation_interval``. + +**Note**: If either of these options are 0 then the feature is **not enabled**. + +The default configuration is as follows: + +.. code-block:: cfg + + [action_heartbeat] + max_missed_heartbeats = 15 + evaluation_interval = 20 + first_heartbeat_timeout = 3600 + +- **max_missed_heartbeats** + + Defines the maximum amount of missed heartbeats to be allowed. + If the number of missed heartbeats exceeds this number, then the related + action execution state is changed to ``ERROR`` with the cause + ``Heartbeat wasn't received``. + +- **evaluation_interval** + + The interval between evaluations in seconds. + +- **first_heartbeat_timeout** + + The first heartbeat is handled differently, to provide a grace period + in case there is no available executor to handle the action execution. + For example when first_heartbeat_timeout = 3600, wait 3600 seconds + before closing the action executions that never received a heartbeat. + +It is also possible to configure same mechanism for async actions. To include +async action to heartbeat check rotation, you must mark +it using ``async-heartbeats-enabled`` parameter. +Example: + +.. code-block:: yaml + + version: '2.0' + wf_with_async_heartbeats: + tasks: + t0: + action: std.async_noop + async-heartbeats-enabled: true + +In this case async action will be failed if no heartbeats were sent for a long +time. Since async actions updates are out of Mistral responsibility, +to update heartbeats of marked async actions send follow request:: + + PUT /async_actions_heartbeats + + { + "action_ex_ids": [ + "", + "", + "", + ] + } diff --git a/mistral/api/controllers/async_actions_heartbeats.py b/mistral/api/controllers/async_actions_heartbeats.py new file mode 100644 index 000000000..cab4e6115 --- /dev/null +++ b/mistral/api/controllers/async_actions_heartbeats.py @@ -0,0 +1,55 @@ +# Copyright 2023 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from pecan import rest +import wsmeext.pecan as wsme_pecan + +from mistral.api.controllers import resource +from mistral.api.controllers.v2 import types + +from mistral.rpc import clients as rpc +from mistral.utils import rest_utils + + +class AsyncActionsHeartbeats(resource.Resource): + """AsyncActionsHeartbeats resource.""" + + action_ex_ids = [types.uuid] + + @classmethod + def sample(cls): + return cls( + action_ex_ids=[ + "0a68ce2a-e229-40a7-ab8b-256b212fe34d", + "0a68ce2a-e229-40a7-ab8b-256b212fe342" + ] + ) + + +class AsyncActionsHeartbeatsController(rest.RestController): + + @rest_utils.wrap_wsme_controller_exception + @wsme_pecan.wsexpose( + AsyncActionsHeartbeats, + body=AsyncActionsHeartbeats, + status_code=200 + ) + def put(self, async_actions_heartbeats): + engine = rpc.get_engine_client() + engine.process_action_heartbeats( + async_actions_heartbeats.action_ex_ids + ) + + return async_actions_heartbeats diff --git a/mistral/api/controllers/root.py b/mistral/api/controllers/root.py index 39d084fee..ce49b8e40 100644 --- a/mistral/api/controllers/root.py +++ b/mistral/api/controllers/root.py @@ -17,6 +17,7 @@ import pecan from wsme import types as wtypes import wsmeext.pecan as wsme_pecan +from mistral.api.controllers import async_actions_heartbeats from mistral.api.controllers import info from mistral.api.controllers import maintenance from mistral.api.controllers import resource @@ -65,6 +66,8 @@ class APIVersions(resource.Resource): class RootController(object): v2 = v2_root.Controller() info = info.InfoController() + async_actions_heartbeats = \ + async_actions_heartbeats.AsyncActionsHeartbeatsController() maintenance = maintenance.MaintenanceController() @wsme_pecan.wsexpose(APIVersions) diff --git a/mistral/db/sqlalchemy/migration/alembic_migrations/versions/045_async_heatbeats_flag_for_action_executions.py b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/045_async_heatbeats_flag_for_action_executions.py new file mode 100644 index 000000000..b8e0c20f5 --- /dev/null +++ b/mistral/db/sqlalchemy/migration/alembic_migrations/versions/045_async_heatbeats_flag_for_action_executions.py @@ -0,0 +1,42 @@ +# Copyright 2023 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Async heartbeats flag for action executions + +Revision ID: 045 +Revises: 044 +Create Date: 2023-05-29 12:45:14.041458 + +""" + +# revision identifiers, used by Alembic. +revision = '045' +down_revision = '044' + +from alembic import op +from mistral.db.utils import column_exists +import sqlalchemy as sa + + +def upgrade(): + if not column_exists('action_executions_v2', 'async_heartbeats_enabled'): + op.add_column( + 'action_executions_v2', + sa.Column( + 'async_heartbeats_enabled', + sa.Boolean(), + nullable=True + ) + ) diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py index 6fd806d19..ceda7349c 100644 --- a/mistral/db/v2/api.py +++ b/mistral/db/v2/api.py @@ -565,6 +565,14 @@ def get_running_expired_sync_action_executions(expiration_time, ) +def get_running_expired_async_action_executions(expiration_time, + limit, session=None): + return IMPL.get_running_expired_async_action_executions( + expiration_time, + limit + ) + + def get_superfluous_executions(max_finished_executions, limit=None, columns=()): return IMPL.get_superfluous_executions( diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py index 87d438491..3d2e27083 100644 --- a/mistral/db/v2/sqlalchemy/api.py +++ b/mistral/db/v2/sqlalchemy/api.py @@ -994,10 +994,15 @@ def update_action_execution_heartbeat(id, session=None): raise exc.DBEntityNotFoundError now = utils.utc_now_sec() - session.query(models.ActionExecution). \ + count = session.query(models.ActionExecution). \ filter(models.ActionExecution.id == id). \ update({'last_heartbeat': now}) + if count == 0: + raise exc.DBEntityNotFoundError( + "ActionExecution not found [id=%s]" % id + ) + @b.session_aware() def delete_action_execution(id, session=None): @@ -1576,6 +1581,24 @@ def get_running_expired_sync_action_executions(expiration_time, return query.all() +@b.session_aware() +def get_running_expired_async_action_executions(expiration_time, + limit, session=None): + query = b.model_query(models.ActionExecution) + + query = query.filter( + models.ActionExecution.last_heartbeat < expiration_time + ) + query = query.filter_by(is_sync=False) + query = query.filter_by(async_heartbeats_enabled=True) + query = query.filter(models.ActionExecution.state == states.RUNNING) + + if limit: + query.limit(limit) + + return query.all() + + @b.session_aware() def get_superfluous_executions(max_finished_executions, limit=None, columns=(), session=None): diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py index a3d4d012d..6c4bd3493 100644 --- a/mistral/db/v2/sqlalchemy/models.py +++ b/mistral/db/v2/sqlalchemy/models.py @@ -312,6 +312,9 @@ class ActionExecution(Execution): ) is_sync = sa.Column(sa.Boolean(), default=None, nullable=True) + # Whether the heartbeats for async action should be processed. + async_heartbeats_enabled = sa.Column(sa.BOOLEAN, default=False) + class WorkflowExecution(Execution): """Contains workflow execution information.""" diff --git a/mistral/engine/actions.py b/mistral/engine/actions.py index a259faa3e..f986f33ad 100644 --- a/mistral/engine/actions.py +++ b/mistral/engine/actions.py @@ -96,7 +96,7 @@ class Action(object, metaclass=abc.ABCMeta): @abc.abstractmethod def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, - timeout=None): + timeout=None, async_heartbeats_enabled=False): """Schedule action run. This method is needed to schedule action run so its result can @@ -113,6 +113,8 @@ class Action(object, metaclass=abc.ABCMeta): :param desc: Action execution description. :param safe_rerun: If true, action would be re-run if executor dies during execution. + :param async_heartbeats_enabled: If true, heartbeats will be processed + for async action execution. """ raise NotImplementedError @@ -157,7 +159,8 @@ class Action(object, metaclass=abc.ABCMeta): return res def _create_action_execution(self, input_dict, runtime_ctx, - desc='', action_ex_id=None, is_sync=True): + desc='', action_ex_id=None, is_sync=True, + async_heartbeats_enabled=False): action_ex_id = action_ex_id or utils.generate_unicode_uuid() values = { @@ -168,7 +171,8 @@ class Action(object, metaclass=abc.ABCMeta): 'runtime_context': runtime_ctx, 'workflow_namespace': self.namespace, 'description': desc, - 'is_sync': is_sync + 'is_sync': is_sync, + 'async_heartbeats_enabled': async_heartbeats_enabled } if self.task_ex: @@ -244,7 +248,7 @@ class RegularAction(Action): @profiler.trace('action-schedule', hide_args=True) def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, - timeout=None): + timeout=None, async_heartbeats_enabled=False): assert not self.action_ex self.action_desc.check_parameters(input_dict) @@ -277,7 +281,8 @@ class RegularAction(Action): self._prepare_runtime_context(index, safe_rerun), desc=desc, action_ex_id=action_ex_id, - is_sync=action.is_sync() + is_sync=action.is_sync(), + async_heartbeats_enabled=async_heartbeats_enabled ) def _run_action(): @@ -289,7 +294,8 @@ class RegularAction(Action): safe_rerun, self._prepare_execution_context(), target=target, - timeout=timeout + timeout=timeout, + async_heartbeats_enabled=async_heartbeats_enabled ) # Register an asynchronous command to run the action @@ -324,7 +330,8 @@ class RegularAction(Action): self._prepare_runtime_context(index, safe_rerun), desc=desc, action_ex_id=action_ex_id, - is_sync=action.is_sync() + is_sync=action.is_sync(), + async_heartbeats_enabled=False ) executor = exe.get_executor(cfg.CONF.executor.type) @@ -364,7 +371,7 @@ class WorkflowAction(Action): @profiler.trace('workflow-action-schedule', hide_args=True) def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False, - timeout=None): + timeout=None, async_heartbeats_enabled=False): assert not self.action_ex self.validate_input(input_dict) diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index fcab63e56..7ca6a9d50 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -559,6 +559,24 @@ class Task(object, metaclass=abc.ABCMeta): return False + def _get_async_heartbeats_enabled(self): + async_heartbeats_enabled = \ + self.task_spec.get_async_heartbeats_enabled() + + if async_heartbeats_enabled is not None: + return async_heartbeats_enabled + + task_default = self.wf_spec.get_task_defaults() + + if task_default: + default_async_heartbeats_enabled = \ + task_default.get_async_heartbeats_enabled() + + if default_async_heartbeats_enabled is not None: + return default_async_heartbeats_enabled + + return False + def _get_action_defaults(self): action_name = self.task_spec.get_action_name() @@ -703,7 +721,8 @@ class RegularTask(Task): input_dict, target, safe_rerun=self._get_safe_rerun(), - timeout=self._get_timeout() + timeout=self._get_timeout(), + async_heartbeats_enabled=self._get_async_heartbeats_enabled() ) except exc.MistralException as e: self.complete(states.ERROR, e.message) diff --git a/mistral/executors/base.py b/mistral/executors/base.py index 9c1fcf560..fbe51c6b6 100644 --- a/mistral/executors/base.py +++ b/mistral/executors/base.py @@ -44,7 +44,8 @@ class Executor(object, metaclass=abc.ABCMeta): @abc.abstractmethod def run_action(self, action, action_ex_id, safe_rerun, exec_ctx, - redelivered=False, target=None, async_=True, timeout=None): + redelivered=False, target=None, async_=True, timeout=None, + async_heartbeats_enabled=False): """Runs the given action. :param action: Action to run. @@ -60,6 +61,9 @@ class Executor(object, metaclass=abc.ABCMeta): for completion). :param timeout: a period of time in seconds after which execution of action will be interrupted + :param async_heartbeats_enabled: If true, executor will not send + any heartbeats automatically, because they should be received + by api. :return: Action result. """ raise NotImplementedError() diff --git a/mistral/executors/default_executor.py b/mistral/executors/default_executor.py index 8f4e088d1..7b39cbbcb 100644 --- a/mistral/executors/default_executor.py +++ b/mistral/executors/default_executor.py @@ -33,7 +33,8 @@ class DefaultExecutor(base.Executor): @profiler.trace('default-executor-run-action', hide_args=True) def run_action(self, action, action_ex_id, safe_rerun, exec_ctx, - redelivered=False, target=None, async_=True, timeout=None): + redelivered=False, target=None, async_=True, timeout=None, + async_heartbeats_enabled=False): """Runs action. :param action: Action to run. @@ -47,12 +48,16 @@ class DefaultExecutor(base.Executor): :param async_: If True, run action in asynchronous mode (w/o waiting for completion). :param timeout: a period of time in seconds after which execution of - action will be interrupted + action will be interrupted. + :param async_heartbeats_enabled: If true, executor will not send + any heartbeats automatically, because they should be received + by api. :return: Action result. """ try: - action_heartbeat_sender.add_action(action_ex_id) + if not async_heartbeats_enabled: + action_heartbeat_sender.add_action(action_ex_id) return self._do_run_action( action, @@ -63,7 +68,8 @@ class DefaultExecutor(base.Executor): timeout ) finally: - action_heartbeat_sender.remove_action(action_ex_id) + if not async_heartbeats_enabled: + action_heartbeat_sender.remove_action(action_ex_id) def _do_run_action(self, action, action_ex_id, exec_ctx, redelivered, safe_rerun, diff --git a/mistral/executors/executor_server.py b/mistral/executors/executor_server.py index dabddd461..a3ce56e3b 100644 --- a/mistral/executors/executor_server.py +++ b/mistral/executors/executor_server.py @@ -71,7 +71,7 @@ class ExecutorServer(service_base.MistralService): self._rpc_server.stop(graceful) def run_action(self, rpc_ctx, action, action_ex_id, safe_rerun, exec_ctx, - timeout): + timeout, async_heartbeats_enabled): """Receives calls over RPC to run action on executor. @@ -83,14 +83,19 @@ class ExecutorServer(service_base.MistralService): the current execution. :param timeout: a period of time in seconds after which execution of action will be interrupted + :param async_heartbeats_enabled: If true, executor will not send + any heartbeats automatically, because they should be received + by api. :return: Action result. """ LOG.debug( "Received RPC request 'run_action'" - "[action=%s, action_ex_id=%s, timeout=%s]", + "[action=%s, action_ex_id=%s, timeout=%s, " + "async_heartbeats_enabled=%s]", action, action_ex_id, - timeout + timeout, + async_heartbeats_enabled ) redelivered = rpc_ctx.redelivered or False @@ -101,7 +106,8 @@ class ExecutorServer(service_base.MistralService): safe_rerun, exec_ctx, redelivered, - timeout=timeout + timeout=timeout, + async_heartbeats_enabled=async_heartbeats_enabled ) LOG.debug( diff --git a/mistral/lang/v2/task_defaults.py b/mistral/lang/v2/task_defaults.py index 9b0af333e..8b2f3505d 100644 --- a/mistral/lang/v2/task_defaults.py +++ b/mistral/lang/v2/task_defaults.py @@ -42,6 +42,7 @@ class TaskDefaultsSpec(base.BaseSpec): "on-error": on_clause.OnClauseSpec.get_schema(), "on-skip": on_clause.OnClauseSpec.get_schema(), "safe-rerun": types.EXPRESSION_OR_BOOLEAN, + "async-heartbeats-enabled": types.EXPRESSION_OR_BOOLEAN, "requires": { "oneOf": [types.NONEMPTY_STRING, types.UNIQUE_STRING_LIST] } @@ -75,6 +76,7 @@ class TaskDefaultsSpec(base.BaseSpec): self._on_skip = self._spec_property('on-skip', on_spec_cls) self._safe_rerun = data.get('safe-rerun') + self._async_heartbeats_enabled = data.get('async-heartbeats-enabled') # TODO(rakhmerov): 'requires' should reside in a different spec for # reverse workflows. @@ -84,6 +86,7 @@ class TaskDefaultsSpec(base.BaseSpec): super(TaskDefaultsSpec, self).validate_schema() self.validate_expr(self._data.get('safe-rerun', {})) + self.validate_expr(self._data.get('async-heartbeats-enabled', {})) def validate_semantics(self): # Validate YAQL expressions. @@ -119,6 +122,9 @@ class TaskDefaultsSpec(base.BaseSpec): def get_safe_rerun(self): return self._safe_rerun + def get_async_heartbeats_enabled(self): + return self._async_heartbeats_enabled + def get_requires(self): if isinstance(self._requires, str): return [self._requires] diff --git a/mistral/lang/v2/tasks.py b/mistral/lang/v2/tasks.py index 8bfaedda7..1123a830e 100644 --- a/mistral/lang/v2/tasks.py +++ b/mistral/lang/v2/tasks.py @@ -85,7 +85,8 @@ class TaskSpec(base.BaseSpec): "fail-on": types.EXPRESSION_OR_BOOLEAN, "target": types.NONEMPTY_STRING, "keep-result": types.EXPRESSION_OR_BOOLEAN, - "safe-rerun": types.EXPRESSION_OR_BOOLEAN + "safe-rerun": types.EXPRESSION_OR_BOOLEAN, + "async-heartbeats-enabled": types.EXPRESSION_OR_BOOLEAN }, "additionalProperties": False, "anyOf": [ @@ -136,6 +137,7 @@ class TaskSpec(base.BaseSpec): self._target = data.get('target') self._keep_result = data.get('keep-result', True) self._safe_rerun = data.get('safe-rerun') + self._async_heartbeats_enabled = data.get('async-heartbeats-enabled') self._process_action_and_workflow() @@ -158,6 +160,7 @@ class TaskSpec(base.BaseSpec): self.validate_expr(self._data.get('publish-on-skip', {})) self.validate_expr(self._data.get('keep-result', {})) self.validate_expr(self._data.get('safe-rerun', {})) + self.validate_expr(self._data.get('async-heartbeats-enabled', {})) def _validate_name(self): task_name = self._data.get('name') @@ -276,6 +279,9 @@ class TaskSpec(base.BaseSpec): def get_safe_rerun(self): return self._safe_rerun + def get_async_heartbeats_enabled(self): + return self._async_heartbeats_enabled + def get_type(self): return (WORKFLOW_TASK_TYPE if self._workflow else ACTION_TASK_TYPE) diff --git a/mistral/rpc/clients.py b/mistral/rpc/clients.py index f64110b1b..0e596d266 100644 --- a/mistral/rpc/clients.py +++ b/mistral/rpc/clients.py @@ -433,7 +433,8 @@ class ExecutorClient(exe.Executor): @profiler.trace('executor-client-run-action') def run_action(self, action, action_ex_id, safe_rerun, exec_ctx, - redelivered=False, target=None, async_=True, timeout=None): + redelivered=False, target=None, async_=True, timeout=None, + async_heartbeats_enabled=False): """Sends a request to run action to executor. :param action: Action to run. @@ -448,7 +449,10 @@ class ExecutorClient(exe.Executor): :param async_: If True, run action in asynchronous mode (w/o waiting for completion). :param timeout: a period of time in seconds after which execution of - action will be interrupted + action will be interrupted. + :param async_heartbeats_enabled: If true, executor will not send + any heartbeats automatically, because they should be received + by api. :return: Action result. """ rpc_kwargs = { @@ -456,7 +460,8 @@ class ExecutorClient(exe.Executor): 'action_ex_id': action_ex_id, 'safe_rerun': safe_rerun, 'exec_ctx': exec_ctx, - 'timeout': timeout + 'timeout': timeout, + 'async_heartbeats_enabled': async_heartbeats_enabled } rpc_client_method = ( diff --git a/mistral/services/action_heartbeat_checker.py b/mistral/services/action_heartbeat_checker.py index f1fed2288..06457c5b8 100644 --- a/mistral/services/action_heartbeat_checker.py +++ b/mistral/services/action_heartbeat_checker.py @@ -51,6 +51,11 @@ def handle_expired_actions(): CONF.action_heartbeat.batch_size ) + action_exs.extend(db_api.get_running_expired_async_action_executions( + exp_date, + CONF.action_heartbeat.batch_size + )) + LOG.debug("Found {} running and expired actions.", len(action_exs)) if action_exs: diff --git a/mistral/tests/unit/api/v2/test_async_actions_heartbeats.py b/mistral/tests/unit/api/v2/test_async_actions_heartbeats.py new file mode 100644 index 000000000..5654f5c2f --- /dev/null +++ b/mistral/tests/unit/api/v2/test_async_actions_heartbeats.py @@ -0,0 +1,86 @@ +# Copyright 2019 - Nokia Networks, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mistral.services import workflows as wf_service +from mistral.tests.unit.api import base +from mistral.tests.unit.engine import base as engine_base + + +class TestAsyncActionsHeartbeats(engine_base.EngineTestCase, base.APITest): + def setUp(self): + # We need to override configuration values before starting engine. + self.override_config('check_interval', 1, 'action_heartbeat') + self.override_config('max_missed_heartbeats', 1, 'action_heartbeat') + self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat') + + super(TestAsyncActionsHeartbeats, self).setUp() + + self.override_config('type', 'remote', 'executor') + + def test_put_empty(self): + resp = self.app.put_json( + '/async_actions_heartbeats', + headers={'Accept': 'application/json'}, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + self.assertIn('Missing argument', resp.json['faultstring']) + + def test_put_wrong_uuid_format(self): + resp = self.app.put_json( + '/async_actions_heartbeats', + headers={'Accept': 'application/json'}, + params={"action_ex_ids": ["qwe"]}, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + self.assertIn('Expected a uuid', resp.json['faultstring']) + + def test_put_no_action_ids(self): + resp = self.app.put_json( + '/async_actions_heartbeats', + headers={'Accept': 'application/json'}, + params={"action_ex_ids": []} + ) + + self.assertEqual(200, resp.status_int) + + def test_put_normal_action_ids(self): + resp = self.app.put_json( + '/async_actions_heartbeats', + headers={'Accept': 'application/json'}, + params={"action_ex_ids": ["0b4ce884-9154-47b0-aec3-e8aba1a6febc"]} + ) + + self.assertEqual(200, resp.status_int) + + def test_action_fail_without_heartbeats(self): + wf_text = """--- + version: '2.0' + wf: + tasks: + task1: + action: std.async_noop + async-heartbeats-enabled: true + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf') + + # The workflow should fail because the action of "task1" should be + # failed automatically by the action execution heartbeat checker. + self.await_workflow_error(wf_ex.id) diff --git a/mistral/tests/unit/engine/test_environment.py b/mistral/tests/unit/engine/test_environment.py index 6d034fadf..25a1fa1ad 100644 --- a/mistral/tests/unit/engine/test_environment.py +++ b/mistral/tests/unit/engine/test_environment.py @@ -81,7 +81,8 @@ workflows: def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx, target=None, - async_=True, timeout=None): + async_=True, timeout=None, + async_heartbeats_enabled=False): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor() @@ -92,7 +93,8 @@ def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx, target=None, exec_ctx=exec_ctx, target=target, async_=async_, - timeout=timeout + timeout=timeout, + async_heartbeats_enabled=async_heartbeats_enabled ) @@ -183,7 +185,8 @@ class EnvironmentTest(base.EngineTestCase): 'action_execution_id': a_ex.id, }, target=TARGET, - timeout=None + timeout=None, + async_heartbeats_enabled=False, ) def test_subworkflow_env_task_input(self): diff --git a/mistral/tests/unit/engine/test_safe_rerun.py b/mistral/tests/unit/engine/test_safe_rerun.py index 3edcce4f8..152c5ce78 100644 --- a/mistral/tests/unit/engine/test_safe_rerun.py +++ b/mistral/tests/unit/engine/test_safe_rerun.py @@ -25,7 +25,8 @@ from mistral.workflow import states def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx, - target=None, async_=True, timeout=None): + target=None, async_=True, timeout=None, + async_heartbeats_enabled=False): # We'll just call executor directly for testing purposes. executor = d_exe.DefaultExecutor()