Heartbeats for async actions added

Action heartbeats is a must-have mechanism to ensure
no mistral entity will be in RUNNING state forever.
Sync actions already have this feature, but async not.
Async actions completion is performed by external services,
so async action heartbeats should be sent by them as well.

Change-Id: Idd8b4070cbad0fc5aa9fc567b1d6903d107a736a
Signed-off-by: Oleg Ovcharuk <vgvoleg@gmail.com>
This commit is contained in:
Oleg Ovcharuk 2023-10-19 14:48:11 +03:00
parent cadfe9dca2
commit 8b23eedd58
18 changed files with 315 additions and 27 deletions

View File

@ -0,0 +1,55 @@
# Copyright 2023 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from pecan import rest
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import resource
from mistral.api.controllers.v2 import types
from mistral.rpc import clients as rpc
from mistral.utils import rest_utils
class AsyncActionsHeartbeats(resource.Resource):
"""AsyncActionsHeartbeats resource."""
action_ex_ids = [types.uuid]
@classmethod
def sample(cls):
return cls(
action_ex_ids=[
"0a68ce2a-e229-40a7-ab8b-256b212fe34d",
"0a68ce2a-e229-40a7-ab8b-256b212fe342"
]
)
class AsyncActionsHeartbeatsController(rest.RestController):
@rest_utils.wrap_wsme_controller_exception
@wsme_pecan.wsexpose(
AsyncActionsHeartbeats,
body=AsyncActionsHeartbeats,
status_code=200
)
def put(self, async_actions_heartbeats):
engine = rpc.get_engine_client()
engine.process_action_heartbeats(
async_actions_heartbeats.action_ex_ids
)
return async_actions_heartbeats

View File

@ -17,6 +17,7 @@ import pecan
from wsme import types as wtypes
import wsmeext.pecan as wsme_pecan
from mistral.api.controllers import async_actions_heartbeats
from mistral.api.controllers import info
from mistral.api.controllers import maintenance
from mistral.api.controllers import resource
@ -65,6 +66,8 @@ class APIVersions(resource.Resource):
class RootController(object):
v2 = v2_root.Controller()
info = info.InfoController()
async_actions_heartbeats = \
async_actions_heartbeats.AsyncActionsHeartbeatsController()
maintenance = maintenance.MaintenanceController()
@wsme_pecan.wsexpose(APIVersions)

View File

@ -0,0 +1,42 @@
# Copyright 2023 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Async heartbeats flag for action executions
Revision ID: 045
Revises: 044
Create Date: 2023-05-29 12:45:14.041458
"""
# revision identifiers, used by Alembic.
revision = '045'
down_revision = '044'
from alembic import op
from mistral.db.utils import column_exists
import sqlalchemy as sa
def upgrade():
if not column_exists('action_executions_v2', 'async_heartbeats_enabled'):
op.add_column(
'action_executions_v2',
sa.Column(
'async_heartbeats_enabled',
sa.Boolean(),
nullable=True
)
)

View File

@ -565,6 +565,14 @@ def get_running_expired_sync_action_executions(expiration_time,
)
def get_running_expired_async_action_executions(expiration_time,
limit, session=None):
return IMPL.get_running_expired_async_action_executions(
expiration_time,
limit
)
def get_superfluous_executions(max_finished_executions, limit=None,
columns=()):
return IMPL.get_superfluous_executions(

View File

@ -994,10 +994,15 @@ def update_action_execution_heartbeat(id, session=None):
raise exc.DBEntityNotFoundError
now = utils.utc_now_sec()
session.query(models.ActionExecution). \
count = session.query(models.ActionExecution). \
filter(models.ActionExecution.id == id). \
update({'last_heartbeat': now})
if count == 0:
raise exc.DBEntityNotFoundError(
"ActionExecution not found [id=%s]" % id
)
@b.session_aware()
def delete_action_execution(id, session=None):
@ -1576,6 +1581,24 @@ def get_running_expired_sync_action_executions(expiration_time,
return query.all()
@b.session_aware()
def get_running_expired_async_action_executions(expiration_time,
limit, session=None):
query = b.model_query(models.ActionExecution)
query = query.filter(
models.ActionExecution.last_heartbeat < expiration_time
)
query = query.filter_by(is_sync=False)
query = query.filter_by(async_heartbeats_enabled=True)
query = query.filter(models.ActionExecution.state == states.RUNNING)
if limit:
query.limit(limit)
return query.all()
@b.session_aware()
def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
session=None):

View File

@ -312,6 +312,9 @@ class ActionExecution(Execution):
)
is_sync = sa.Column(sa.Boolean(), default=None, nullable=True)
# Whether the heartbeats for async action should be processed.
async_heartbeats_enabled = sa.Column(sa.BOOLEAN, default=False)
class WorkflowExecution(Execution):
"""Contains workflow execution information."""

View File

@ -96,7 +96,7 @@ class Action(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
timeout=None, async_heartbeats_enabled=False):
"""Schedule action run.
This method is needed to schedule action run so its result can
@ -113,6 +113,8 @@ class Action(object, metaclass=abc.ABCMeta):
:param desc: Action execution description.
:param safe_rerun: If true, action would be re-run if executor dies
during execution.
:param async_heartbeats_enabled: If true, heartbeats will be processed
for async action execution.
"""
raise NotImplementedError
@ -157,7 +159,8 @@ class Action(object, metaclass=abc.ABCMeta):
return res
def _create_action_execution(self, input_dict, runtime_ctx,
desc='', action_ex_id=None, is_sync=True):
desc='', action_ex_id=None, is_sync=True,
async_heartbeats_enabled=False):
action_ex_id = action_ex_id or utils.generate_unicode_uuid()
values = {
@ -168,7 +171,8 @@ class Action(object, metaclass=abc.ABCMeta):
'runtime_context': runtime_ctx,
'workflow_namespace': self.namespace,
'description': desc,
'is_sync': is_sync
'is_sync': is_sync,
'async_heartbeats_enabled': async_heartbeats_enabled
}
if self.task_ex:
@ -244,7 +248,7 @@ class RegularAction(Action):
@profiler.trace('action-schedule', hide_args=True)
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
timeout=None, async_heartbeats_enabled=False):
assert not self.action_ex
self.action_desc.check_parameters(input_dict)
@ -277,7 +281,8 @@ class RegularAction(Action):
self._prepare_runtime_context(index, safe_rerun),
desc=desc,
action_ex_id=action_ex_id,
is_sync=action.is_sync()
is_sync=action.is_sync(),
async_heartbeats_enabled=async_heartbeats_enabled
)
def _run_action():
@ -289,7 +294,8 @@ class RegularAction(Action):
safe_rerun,
self._prepare_execution_context(),
target=target,
timeout=timeout
timeout=timeout,
async_heartbeats_enabled=async_heartbeats_enabled
)
# Register an asynchronous command to run the action
@ -324,7 +330,8 @@ class RegularAction(Action):
self._prepare_runtime_context(index, safe_rerun),
desc=desc,
action_ex_id=action_ex_id,
is_sync=action.is_sync()
is_sync=action.is_sync(),
async_heartbeats_enabled=False
)
executor = exe.get_executor(cfg.CONF.executor.type)
@ -364,7 +371,7 @@ class WorkflowAction(Action):
@profiler.trace('workflow-action-schedule', hide_args=True)
def schedule(self, input_dict, target, index=0, desc='', safe_rerun=False,
timeout=None):
timeout=None, async_heartbeats_enabled=False):
assert not self.action_ex
self.validate_input(input_dict)

View File

@ -559,6 +559,24 @@ class Task(object, metaclass=abc.ABCMeta):
return False
def _get_async_heartbeats_enabled(self):
async_heartbeats_enabled = \
self.task_spec.get_async_heartbeats_enabled()
if async_heartbeats_enabled is not None:
return async_heartbeats_enabled
task_default = self.wf_spec.get_task_defaults()
if task_default:
default_async_heartbeats_enabled = \
task_default.get_async_heartbeats_enabled()
if default_async_heartbeats_enabled is not None:
return default_async_heartbeats_enabled
return False
def _get_action_defaults(self):
action_name = self.task_spec.get_action_name()
@ -703,7 +721,8 @@ class RegularTask(Task):
input_dict,
target,
safe_rerun=self._get_safe_rerun(),
timeout=self._get_timeout()
timeout=self._get_timeout(),
async_heartbeats_enabled=self._get_async_heartbeats_enabled()
)
except exc.MistralException as e:
self.complete(states.ERROR, e.message)

View File

@ -44,7 +44,8 @@ class Executor(object, metaclass=abc.ABCMeta):
@abc.abstractmethod
def run_action(self, action, action_ex_id, safe_rerun, exec_ctx,
redelivered=False, target=None, async_=True, timeout=None):
redelivered=False, target=None, async_=True, timeout=None,
async_heartbeats_enabled=False):
"""Runs the given action.
:param action: Action to run.
@ -60,6 +61,9 @@ class Executor(object, metaclass=abc.ABCMeta):
for completion).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param async_heartbeats_enabled: If true, executor will not send
any heartbeats automatically, because they should be received
by api.
:return: Action result.
"""
raise NotImplementedError()

View File

@ -33,7 +33,8 @@ class DefaultExecutor(base.Executor):
@profiler.trace('default-executor-run-action', hide_args=True)
def run_action(self, action, action_ex_id, safe_rerun, exec_ctx,
redelivered=False, target=None, async_=True, timeout=None):
redelivered=False, target=None, async_=True, timeout=None,
async_heartbeats_enabled=False):
"""Runs action.
:param action: Action to run.
@ -47,12 +48,16 @@ class DefaultExecutor(base.Executor):
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
action will be interrupted.
:param async_heartbeats_enabled: If true, executor will not send
any heartbeats automatically, because they should be received
by api.
:return: Action result.
"""
try:
action_heartbeat_sender.add_action(action_ex_id)
if not async_heartbeats_enabled:
action_heartbeat_sender.add_action(action_ex_id)
return self._do_run_action(
action,
@ -63,7 +68,8 @@ class DefaultExecutor(base.Executor):
timeout
)
finally:
action_heartbeat_sender.remove_action(action_ex_id)
if not async_heartbeats_enabled:
action_heartbeat_sender.remove_action(action_ex_id)
def _do_run_action(self, action, action_ex_id, exec_ctx,
redelivered, safe_rerun,

View File

@ -71,7 +71,7 @@ class ExecutorServer(service_base.MistralService):
self._rpc_server.stop(graceful)
def run_action(self, rpc_ctx, action, action_ex_id, safe_rerun, exec_ctx,
timeout):
timeout, async_heartbeats_enabled):
"""Receives calls over RPC to run action on executor.
@ -83,14 +83,19 @@ class ExecutorServer(service_base.MistralService):
the current execution.
:param timeout: a period of time in seconds after which execution of
action will be interrupted
:param async_heartbeats_enabled: If true, executor will not send
any heartbeats automatically, because they should be received
by api.
:return: Action result.
"""
LOG.debug(
"Received RPC request 'run_action'"
"[action=%s, action_ex_id=%s, timeout=%s]",
"[action=%s, action_ex_id=%s, timeout=%s, "
"async_heartbeats_enabled=%s]",
action,
action_ex_id,
timeout
timeout,
async_heartbeats_enabled
)
redelivered = rpc_ctx.redelivered or False
@ -101,7 +106,8 @@ class ExecutorServer(service_base.MistralService):
safe_rerun,
exec_ctx,
redelivered,
timeout=timeout
timeout=timeout,
async_heartbeats_enabled=async_heartbeats_enabled
)
LOG.debug(

View File

@ -42,6 +42,7 @@ class TaskDefaultsSpec(base.BaseSpec):
"on-error": on_clause.OnClauseSpec.get_schema(),
"on-skip": on_clause.OnClauseSpec.get_schema(),
"safe-rerun": types.EXPRESSION_OR_BOOLEAN,
"async-heartbeats-enabled": types.EXPRESSION_OR_BOOLEAN,
"requires": {
"oneOf": [types.NONEMPTY_STRING, types.UNIQUE_STRING_LIST]
}
@ -75,6 +76,7 @@ class TaskDefaultsSpec(base.BaseSpec):
self._on_skip = self._spec_property('on-skip', on_spec_cls)
self._safe_rerun = data.get('safe-rerun')
self._async_heartbeats_enabled = data.get('async-heartbeats-enabled')
# TODO(rakhmerov): 'requires' should reside in a different spec for
# reverse workflows.
@ -84,6 +86,7 @@ class TaskDefaultsSpec(base.BaseSpec):
super(TaskDefaultsSpec, self).validate_schema()
self.validate_expr(self._data.get('safe-rerun', {}))
self.validate_expr(self._data.get('async-heartbeats-enabled', {}))
def validate_semantics(self):
# Validate YAQL expressions.
@ -119,6 +122,9 @@ class TaskDefaultsSpec(base.BaseSpec):
def get_safe_rerun(self):
return self._safe_rerun
def get_async_heartbeats_enabled(self):
return self._async_heartbeats_enabled
def get_requires(self):
if isinstance(self._requires, str):
return [self._requires]

View File

@ -85,7 +85,8 @@ class TaskSpec(base.BaseSpec):
"fail-on": types.EXPRESSION_OR_BOOLEAN,
"target": types.NONEMPTY_STRING,
"keep-result": types.EXPRESSION_OR_BOOLEAN,
"safe-rerun": types.EXPRESSION_OR_BOOLEAN
"safe-rerun": types.EXPRESSION_OR_BOOLEAN,
"async-heartbeats-enabled": types.EXPRESSION_OR_BOOLEAN
},
"additionalProperties": False,
"anyOf": [
@ -136,6 +137,7 @@ class TaskSpec(base.BaseSpec):
self._target = data.get('target')
self._keep_result = data.get('keep-result', True)
self._safe_rerun = data.get('safe-rerun')
self._async_heartbeats_enabled = data.get('async-heartbeats-enabled')
self._process_action_and_workflow()
@ -158,6 +160,7 @@ class TaskSpec(base.BaseSpec):
self.validate_expr(self._data.get('publish-on-skip', {}))
self.validate_expr(self._data.get('keep-result', {}))
self.validate_expr(self._data.get('safe-rerun', {}))
self.validate_expr(self._data.get('async-heartbeats-enabled', {}))
def _validate_name(self):
task_name = self._data.get('name')
@ -276,6 +279,9 @@ class TaskSpec(base.BaseSpec):
def get_safe_rerun(self):
return self._safe_rerun
def get_async_heartbeats_enabled(self):
return self._async_heartbeats_enabled
def get_type(self):
return (WORKFLOW_TASK_TYPE if self._workflow
else ACTION_TASK_TYPE)

View File

@ -433,7 +433,8 @@ class ExecutorClient(exe.Executor):
@profiler.trace('executor-client-run-action')
def run_action(self, action, action_ex_id, safe_rerun, exec_ctx,
redelivered=False, target=None, async_=True, timeout=None):
redelivered=False, target=None, async_=True, timeout=None,
async_heartbeats_enabled=False):
"""Sends a request to run action to executor.
:param action: Action to run.
@ -448,7 +449,10 @@ class ExecutorClient(exe.Executor):
:param async_: If True, run action in asynchronous mode (w/o waiting
for completion).
:param timeout: a period of time in seconds after which execution of
action will be interrupted
action will be interrupted.
:param async_heartbeats_enabled: If true, executor will not send
any heartbeats automatically, because they should be received
by api.
:return: Action result.
"""
rpc_kwargs = {
@ -456,7 +460,8 @@ class ExecutorClient(exe.Executor):
'action_ex_id': action_ex_id,
'safe_rerun': safe_rerun,
'exec_ctx': exec_ctx,
'timeout': timeout
'timeout': timeout,
'async_heartbeats_enabled': async_heartbeats_enabled
}
rpc_client_method = (

View File

@ -51,6 +51,11 @@ def handle_expired_actions():
CONF.action_heartbeat.batch_size
)
action_exs.extend(db_api.get_running_expired_async_action_executions(
exp_date,
CONF.action_heartbeat.batch_size
))
LOG.debug("Found {} running and expired actions.", len(action_exs))
if action_exs:

View File

@ -0,0 +1,86 @@
# Copyright 2019 - Nokia Networks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.services import workflows as wf_service
from mistral.tests.unit.api import base
from mistral.tests.unit.engine import base as engine_base
class TestAsyncActionsHeartbeats(engine_base.EngineTestCase, base.APITest):
def setUp(self):
# We need to override configuration values before starting engine.
self.override_config('check_interval', 1, 'action_heartbeat')
self.override_config('max_missed_heartbeats', 1, 'action_heartbeat')
self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat')
super(TestAsyncActionsHeartbeats, self).setUp()
self.override_config('type', 'remote', 'executor')
def test_put_empty(self):
resp = self.app.put_json(
'/async_actions_heartbeats',
headers={'Accept': 'application/json'},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
self.assertIn('Missing argument', resp.json['faultstring'])
def test_put_wrong_uuid_format(self):
resp = self.app.put_json(
'/async_actions_heartbeats',
headers={'Accept': 'application/json'},
params={"action_ex_ids": ["qwe"]},
expect_errors=True
)
self.assertEqual(400, resp.status_int)
self.assertIn('Expected a uuid', resp.json['faultstring'])
def test_put_no_action_ids(self):
resp = self.app.put_json(
'/async_actions_heartbeats',
headers={'Accept': 'application/json'},
params={"action_ex_ids": []}
)
self.assertEqual(200, resp.status_int)
def test_put_normal_action_ids(self):
resp = self.app.put_json(
'/async_actions_heartbeats',
headers={'Accept': 'application/json'},
params={"action_ex_ids": ["0b4ce884-9154-47b0-aec3-e8aba1a6febc"]}
)
self.assertEqual(200, resp.status_int)
def test_action_fail_without_heartbeats(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.async_noop
async-heartbeats-enabled: true
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
# The workflow should fail because the action of "task1" should be
# failed automatically by the action execution heartbeat checker.
self.await_workflow_error(wf_ex.id)

View File

@ -81,7 +81,8 @@ workflows:
def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx, target=None,
async_=True, timeout=None):
async_=True, timeout=None,
async_heartbeats_enabled=False):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()
@ -92,7 +93,8 @@ def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx, target=None,
exec_ctx=exec_ctx,
target=target,
async_=async_,
timeout=timeout
timeout=timeout,
async_heartbeats_enabled=async_heartbeats_enabled
)
@ -183,7 +185,8 @@ class EnvironmentTest(base.EngineTestCase):
'action_execution_id': a_ex.id,
},
target=TARGET,
timeout=None
timeout=None,
async_heartbeats_enabled=False,
)
def test_subworkflow_env_task_input(self):

View File

@ -25,7 +25,8 @@ from mistral.workflow import states
def _run_at_target(action, action_ex_id, safe_rerun, exec_ctx,
target=None, async_=True, timeout=None):
target=None, async_=True, timeout=None,
async_heartbeats_enabled=False):
# We'll just call executor directly for testing purposes.
executor = d_exe.DefaultExecutor()