A mechanism to close stuck running action executions
If an executor dies while running an action execution, then the execution will remain in RUNNING state (because the dead executor can't signal the error). Implements blueprint: action-execution-reporting Change-Id: I51b4db6aa321d0e53bbb85a74f8ebaea0376d22e
This commit is contained in:
parent
13b2b0fbaf
commit
ce18c0bf67
@ -131,6 +131,43 @@ directory.
|
||||
For more details see `policy.json file
|
||||
<https://docs.openstack.org/oslo.policy/latest/admin/policy-json-file.html>`_.
|
||||
|
||||
#. Modify the action execution reporting configuration if needed.
|
||||
|
||||
It is possible that actions stuck in *"RUNNING"* state, for example if the
|
||||
assigned executor dies or the message that signals the completion of the
|
||||
action is lost. This section describes a heartbeat based solution to close
|
||||
these forgotten action executions. The related configuration options are
|
||||
``max_missed_heartbeats`` and ``check_interval``. Note that if either
|
||||
of these options are *"0"* then the feature won't be enabled.
|
||||
|
||||
The default configuration is the following::
|
||||
|
||||
[action_heartbeat]
|
||||
max_missed_heartbeats = 15
|
||||
check_interval = 20
|
||||
first_heartbeat_timeout = 3600
|
||||
|
||||
*"check_interval = 20"*, so check action executions every
|
||||
20 seconds. When the checker runs it will transit all running action
|
||||
executions to error if the last heartbeat received is older than *"20 \*
|
||||
15"* seconds. Note that *"first_heartbeat_timeout = 3600"*, so the action
|
||||
execution won't be closed for 3600 seconds if no heartbeat was received for
|
||||
it.
|
||||
|
||||
- **max_missed_heartbeats**
|
||||
|
||||
Defines the maximum amount of missed heartbeats to be allowed. If the number
|
||||
of missed heartbeats exceeds this number, then the related action execution
|
||||
will be transited to *"ERROR"* state with cause *"Heartbeat wasn't received."*.
|
||||
|
||||
- **check_interval**
|
||||
|
||||
The interval between checks (in seconds).
|
||||
|
||||
- **first_heartbeat_timeout**
|
||||
|
||||
The grace period for the first heartbeat (in seconds).
|
||||
|
||||
#. Finally, try to run mistral engine and verify that it is running without
|
||||
any error::
|
||||
|
||||
|
@ -337,7 +337,7 @@ execution_expiration_policy_opts = [
|
||||
'evaluation_interval',
|
||||
help=_('How often will the executions be evaluated '
|
||||
'(in minutes). For example for value 120 the interval '
|
||||
'will be 2 hours (every 2 hours).'
|
||||
'will be 2 hours (every 2 hours). '
|
||||
'Note that only final state executions will be removed: '
|
||||
'( SUCCESS / ERROR / CANCELLED ).')
|
||||
),
|
||||
@ -351,12 +351,12 @@ execution_expiration_policy_opts = [
|
||||
cfg.IntOpt(
|
||||
'max_finished_executions',
|
||||
default=0,
|
||||
help=_('The maximum number of finished workflow executions'
|
||||
'to be stored. For example when max_finished_executions = 100,'
|
||||
'only the 100 latest finished executions will be preserved.'
|
||||
'This means that even unexpired executions are eligible'
|
||||
'for deletion, to decrease the number of executions in the'
|
||||
'database. The default value is 0. If it is set to 0,'
|
||||
help=_('The maximum number of finished workflow executions '
|
||||
'to be stored. For example when max_finished_executions = 100, '
|
||||
'only the 100 latest finished executions will be preserved. '
|
||||
'This means that even unexpired executions are eligible '
|
||||
'for deletion, to decrease the number of executions in the '
|
||||
'database. The default value is 0. If it is set to 0, '
|
||||
'this constraint won\'t be applied.')
|
||||
),
|
||||
cfg.IntOpt(
|
||||
@ -364,11 +364,44 @@ execution_expiration_policy_opts = [
|
||||
default=0,
|
||||
help=_('Size of batch of expired executions to be deleted.'
|
||||
'The default value is 0. If it is set to 0, '
|
||||
'size of batch is total number of expired executions'
|
||||
'size of batch is total number of expired executions '
|
||||
'that is going to be deleted.')
|
||||
)
|
||||
]
|
||||
|
||||
action_heartbeat_opts = [
|
||||
cfg.IntOpt(
|
||||
'max_missed_heartbeats',
|
||||
min=0,
|
||||
default=15,
|
||||
help=_('The maximum amount of missed heartbeats to be allowed. '
|
||||
'If set to 0 then this feature won\'t be enabled. '
|
||||
'See check_interval for more details.')
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'check_interval',
|
||||
min=0,
|
||||
default=20,
|
||||
help=_('How often the action executions are checked (in seconds). '
|
||||
'For example when check_interval = 10, check action '
|
||||
'executions every 10 seconds. When the checker runs it will '
|
||||
'transit all running action executions to error if the last '
|
||||
'heartbeat received is older than 10 * max_missed_heartbeats '
|
||||
'seconds. If set to 0 then this feature won\'t be enabled.')
|
||||
),
|
||||
cfg.IntOpt(
|
||||
'first_heartbeat_timeout',
|
||||
min=0,
|
||||
default=3600,
|
||||
help=_('The first heartbeat is handled differently, to provide a '
|
||||
'grace period in case there is no available executor to handle '
|
||||
'the action execution. For example when '
|
||||
'first_heartbeat_timeout = 3600, wait 3600 seconds before '
|
||||
'closing the action executions that never received a heartbeat.'
|
||||
)
|
||||
)
|
||||
]
|
||||
|
||||
coordination_opts = [
|
||||
cfg.StrOpt(
|
||||
'backend_url',
|
||||
@ -514,6 +547,7 @@ NOTIFIER_GROUP = 'notifier'
|
||||
PECAN_GROUP = 'pecan'
|
||||
COORDINATION_GROUP = 'coordination'
|
||||
EXECUTION_EXPIRATION_POLICY_GROUP = 'execution_expiration_policy'
|
||||
ACTION_HEARTBEAT_GROUP = 'action_heartbeat'
|
||||
PROFILER_GROUP = profiler.list_opts()[0][0]
|
||||
KEYCLOAK_OIDC_GROUP = "keycloak_oidc"
|
||||
OPENSTACK_ACTIONS_GROUP = 'openstack_actions'
|
||||
@ -536,6 +570,10 @@ CONF.register_opts(
|
||||
execution_expiration_policy_opts,
|
||||
group=EXECUTION_EXPIRATION_POLICY_GROUP
|
||||
)
|
||||
CONF.register_opts(
|
||||
action_heartbeat_opts,
|
||||
group=ACTION_HEARTBEAT_GROUP
|
||||
)
|
||||
CONF.register_opts(event_engine_opts, group=EVENT_ENGINE_GROUP)
|
||||
CONF.register_opts(notifier_opts, group=NOTIFIER_GROUP)
|
||||
CONF.register_opts(pecan_opts, group=PECAN_GROUP)
|
||||
@ -591,6 +629,7 @@ def list_opts():
|
||||
(KEYCLOAK_OIDC_GROUP, keycloak_oidc_opts),
|
||||
(OPENSTACK_ACTIONS_GROUP, openstack_actions_opts),
|
||||
(YAQL_GROUP, yaql_opts),
|
||||
(ACTION_HEARTBEAT_GROUP, action_heartbeat_opts),
|
||||
(None, default_group_opts)
|
||||
]
|
||||
|
||||
|
@ -0,0 +1,51 @@
|
||||
# Copyright 2018 OpenStack Foundation.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Add last_heartbeat to action execution
|
||||
|
||||
Revision ID: 027
|
||||
Revises: 026
|
||||
Create Date: 2018-09-05 16:49:50.342349
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '027'
|
||||
down_revision = '026'
|
||||
|
||||
from alembic import op
|
||||
import datetime
|
||||
from mistral import utils
|
||||
from oslo_config import cfg
|
||||
from sqlalchemy import Column, DateTime, Boolean
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
def upgrade():
|
||||
op.add_column(
|
||||
'action_executions_v2',
|
||||
Column(
|
||||
'last_heartbeat',
|
||||
DateTime,
|
||||
default=lambda: utils.utc_now_sec() + datetime.timedelta(
|
||||
seconds=CONF.action_heartbeat.first_heartbeat_timeout
|
||||
)
|
||||
)
|
||||
)
|
||||
op.add_column(
|
||||
'action_executions_v2',
|
||||
Column('is_sync', Boolean, default=None, nullable=True)
|
||||
)
|
@ -211,8 +211,8 @@ def delete_action_definitions(**kwargs):
|
||||
|
||||
# Action executions.
|
||||
|
||||
def get_action_execution(id, fields=()):
|
||||
return IMPL.get_action_execution(id, fields=fields)
|
||||
def get_action_execution(id, fields=(), insecure=False):
|
||||
return IMPL.get_action_execution(id, fields=fields, insecure=insecure)
|
||||
|
||||
|
||||
def load_action_execution(name, fields=()):
|
||||
@ -228,8 +228,8 @@ def create_action_execution(values):
|
||||
return IMPL.create_action_execution(values)
|
||||
|
||||
|
||||
def update_action_execution(id, values):
|
||||
return IMPL.update_action_execution(id, values)
|
||||
def update_action_execution(id, values, insecure=False):
|
||||
return IMPL.update_action_execution(id, values, insecure)
|
||||
|
||||
|
||||
def create_or_update_action_execution(id, values):
|
||||
@ -413,6 +413,10 @@ def get_expired_executions(expiration_time, limit=None, columns=()):
|
||||
)
|
||||
|
||||
|
||||
def get_running_expired_sync_actions(expiration_time, session=None):
|
||||
return IMPL.get_running_expired_sync_actions(expiration_time)
|
||||
|
||||
|
||||
def get_superfluous_executions(max_finished_executions, limit=None,
|
||||
columns=()):
|
||||
return IMPL.get_superfluous_executions(
|
||||
|
@ -669,8 +669,9 @@ def delete_action_definitions(session=None, **kwargs):
|
||||
# Action executions.
|
||||
|
||||
@b.session_aware()
|
||||
def get_action_execution(id, fields=(), session=None):
|
||||
a_ex = _get_db_object_by_id(models.ActionExecution, id, columns=fields)
|
||||
def get_action_execution(id, insecure=False, fields=(), session=None):
|
||||
a_ex = _get_db_object_by_id(models.ActionExecution, id, insecure=insecure,
|
||||
columns=fields)
|
||||
|
||||
if not a_ex:
|
||||
raise exc.DBEntityNotFoundError(
|
||||
@ -707,8 +708,8 @@ def create_action_execution(values, session=None):
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def update_action_execution(id, values, session=None):
|
||||
a_ex = get_action_execution(id)
|
||||
def update_action_execution(id, values, insecure=False, session=None):
|
||||
a_ex = get_action_execution(id, insecure)
|
||||
|
||||
a_ex.update(values.copy())
|
||||
|
||||
@ -1098,6 +1099,18 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
|
||||
return query.all()
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def get_running_expired_sync_actions(expiration_time, session=None):
|
||||
query = b.model_query(models.ActionExecution)
|
||||
query = query.filter(
|
||||
models.ActionExecution.last_heartbeat < expiration_time
|
||||
)
|
||||
query = query.filter_by(is_sync=True)
|
||||
query = query.filter(models.ActionExecution.state == states.RUNNING)
|
||||
|
||||
return query.all()
|
||||
|
||||
|
||||
@b.session_aware()
|
||||
def get_superfluous_executions(max_finished_executions, limit=None, columns=(),
|
||||
session=None):
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
import hashlib
|
||||
import json
|
||||
import sys
|
||||
@ -33,6 +34,7 @@ from mistral import utils
|
||||
|
||||
# Definition objects.
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -195,6 +197,13 @@ class ActionExecution(Execution):
|
||||
accepted = sa.Column(sa.Boolean(), default=False)
|
||||
input = sa.Column(st.JsonLongDictType(), nullable=True)
|
||||
output = sa.orm.deferred(sa.Column(st.JsonLongDictType(), nullable=True))
|
||||
last_heartbeat = sa.Column(
|
||||
sa.DateTime,
|
||||
default=lambda: utils.utc_now_sec() + datetime.timedelta(
|
||||
seconds=CONF.action_heartbeat.first_heartbeat_timeout
|
||||
)
|
||||
)
|
||||
is_sync = sa.Column(sa.Boolean(), default=None, nullable=True)
|
||||
|
||||
|
||||
class WorkflowExecution(Execution):
|
||||
|
@ -150,7 +150,7 @@ class Action(object):
|
||||
"""
|
||||
return True
|
||||
|
||||
def _create_action_execution(self, input_dict, runtime_ctx,
|
||||
def _create_action_execution(self, input_dict, runtime_ctx, is_sync,
|
||||
desc='', action_ex_id=None):
|
||||
action_ex_id = action_ex_id or utils.generate_unicode_uuid()
|
||||
|
||||
@ -161,7 +161,8 @@ class Action(object):
|
||||
'state': states.RUNNING,
|
||||
'input': input_dict,
|
||||
'runtime_context': runtime_ctx,
|
||||
'description': desc
|
||||
'description': desc,
|
||||
'is_sync': is_sync
|
||||
}
|
||||
|
||||
if self.task_ex:
|
||||
@ -246,6 +247,7 @@ class PythonAction(Action):
|
||||
self._create_action_execution(
|
||||
self._prepare_input(input_dict),
|
||||
self._prepare_runtime_context(index, safe_rerun),
|
||||
self.is_sync(input_dict),
|
||||
desc=desc,
|
||||
action_ex_id=action_ex_id
|
||||
)
|
||||
@ -278,6 +280,7 @@ class PythonAction(Action):
|
||||
self._create_action_execution(
|
||||
input_dict,
|
||||
runtime_ctx,
|
||||
self.is_sync(input_dict),
|
||||
desc=desc,
|
||||
action_ex_id=action_ex_id
|
||||
)
|
||||
|
@ -131,6 +131,14 @@ class Engine(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def report_running_actions(self, action_ex_ids):
|
||||
"""Receives the heartbeat about the running actions.
|
||||
|
||||
:param action_ex_ids: The action execution ids.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class TaskPolicy(object):
|
||||
|
@ -16,6 +16,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from osprofiler import profiler
|
||||
|
||||
from mistral.db import utils as db_utils
|
||||
@ -34,6 +35,8 @@ from mistral.workflow import states
|
||||
# options required at top level of this __init__.py are not imported before
|
||||
# the submodules are referenced.
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DefaultEngine(base.Engine):
|
||||
@db_utils.retry_on_db_error
|
||||
@ -122,6 +125,7 @@ class DefaultEngine(base.Engine):
|
||||
'input': action_input,
|
||||
'output': output.to_dict(),
|
||||
'state': state,
|
||||
'is_sync': is_action_sync
|
||||
}
|
||||
|
||||
return db_api.create_action_execution(values)
|
||||
@ -201,3 +205,22 @@ class DefaultEngine(base.Engine):
|
||||
def rollback_workflow(self, wf_ex_id):
|
||||
# TODO(rakhmerov): Implement.
|
||||
raise NotImplementedError
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
def report_running_actions(self, action_ex_ids):
|
||||
with db_api.transaction():
|
||||
now = u.utc_now_sec()
|
||||
for exec_id in action_ex_ids:
|
||||
try:
|
||||
db_api.update_action_execution(
|
||||
exec_id,
|
||||
{"last_heartbeat": now},
|
||||
insecure=True
|
||||
)
|
||||
except exceptions.DBEntityNotFoundError:
|
||||
LOG.debug("Action execution heartbeat update failed. {}"
|
||||
.format(exec_id), exc_info=True)
|
||||
# Ignore this error and continue with the
|
||||
# remaining ids.
|
||||
pass
|
||||
|
@ -19,6 +19,7 @@ from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import default_engine
|
||||
from mistral.rpc import base as rpc
|
||||
from mistral.service import base as service_base
|
||||
from mistral.services import action_execution_checker
|
||||
from mistral.services import expiration_policy
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
@ -50,6 +51,7 @@ class EngineServer(service_base.MistralService):
|
||||
|
||||
self._scheduler = scheduler.start()
|
||||
self._expiration_policy_tg = expiration_policy.setup()
|
||||
action_execution_checker.setup()
|
||||
|
||||
if self._setup_profiler:
|
||||
profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)
|
||||
@ -258,6 +260,19 @@ class EngineServer(service_base.MistralService):
|
||||
|
||||
return self.engine.rollback_workflow(wf_ex_id)
|
||||
|
||||
def report_running_actions(self, rpc_ctx, action_ex_ids):
|
||||
"""Receives calls over RPC to receive action execution heartbeats.
|
||||
|
||||
:param rpc_ctx: RPC request context.
|
||||
:param action_ex_ids: Action execution ids.
|
||||
"""
|
||||
LOG.info(
|
||||
"Received RPC request 'report_running_actions'[action_ex_ids=%s]",
|
||||
action_ex_ids
|
||||
)
|
||||
|
||||
return self.engine.report_running_actions(action_ex_ids)
|
||||
|
||||
|
||||
def get_oslo_service(setup_profiler=True):
|
||||
return EngineServer(
|
||||
|
@ -18,9 +18,11 @@ from mistral import config as cfg
|
||||
from mistral.executors import default_executor as exe
|
||||
from mistral.rpc import base as rpc
|
||||
from mistral.service import base as service_base
|
||||
from mistral.services import action_execution_reporter
|
||||
from mistral import utils
|
||||
from mistral.utils import profiler as profiler_utils
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@ -37,10 +39,15 @@ class ExecutorServer(service_base.MistralService):
|
||||
|
||||
self.executor = executor
|
||||
self._rpc_server = None
|
||||
self._reporter = None
|
||||
self._aer = None
|
||||
|
||||
def start(self):
|
||||
super(ExecutorServer, self).start()
|
||||
|
||||
self._aer = action_execution_reporter.ActionExecutionReporter(CONF)
|
||||
self._reporter = action_execution_reporter.setup(self._aer)
|
||||
|
||||
if self._setup_profiler:
|
||||
profiler_utils.setup('mistral-executor', cfg.CONF.executor.host)
|
||||
|
||||
@ -56,6 +63,9 @@ class ExecutorServer(service_base.MistralService):
|
||||
def stop(self, graceful=False):
|
||||
super(ExecutorServer, self).stop(graceful)
|
||||
|
||||
if self._reporter:
|
||||
self._reporter.stop(graceful)
|
||||
|
||||
if self._rpc_server:
|
||||
self._rpc_server.stop(graceful)
|
||||
|
||||
@ -90,16 +100,21 @@ class ExecutorServer(service_base.MistralService):
|
||||
|
||||
redelivered = rpc_ctx.redelivered or False
|
||||
|
||||
return self.executor.run_action(
|
||||
action_ex_id,
|
||||
action_cls_str,
|
||||
action_cls_attrs,
|
||||
params,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
redelivered,
|
||||
timeout=timeout
|
||||
)
|
||||
try:
|
||||
self._aer.add_action_ex_id(action_ex_id)
|
||||
|
||||
return self.executor.run_action(
|
||||
action_ex_id,
|
||||
action_cls_str,
|
||||
action_cls_attrs,
|
||||
params,
|
||||
safe_rerun,
|
||||
execution_context,
|
||||
redelivered,
|
||||
timeout=timeout
|
||||
)
|
||||
finally:
|
||||
self._aer.remove_action_ex_id(action_ex_id)
|
||||
|
||||
|
||||
def get_oslo_service(setup_profiler=True):
|
||||
|
@ -320,6 +320,18 @@ class EngineClient(eng.Engine):
|
||||
wf_ex_id=wf_ex_id
|
||||
)
|
||||
|
||||
@base.wrap_messaging_exception
|
||||
def report_running_actions(self, action_ex_ids):
|
||||
"""Receives action execution heartbeats.
|
||||
|
||||
:param action_ex_ids: Action execution ids.
|
||||
"""
|
||||
return self._client.async_call(
|
||||
auth_ctx.ctx(),
|
||||
'report_running_actions',
|
||||
action_ex_ids=action_ex_ids
|
||||
)
|
||||
|
||||
|
||||
class ExecutorClient(exe.Executor):
|
||||
"""RPC Executor client."""
|
||||
|
83
mistral/services/action_execution_checker.py
Normal file
83
mistral/services/action_execution_checker.py
Normal file
@ -0,0 +1,83 @@
|
||||
# Copyright 2018 Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
|
||||
from mistral.db import utils as db_utils
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.engine import action_handler
|
||||
from mistral.engine import action_queue
|
||||
from mistral.services import scheduler
|
||||
from mistral import utils
|
||||
from mistral_lib import actions as mistral_lib
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
SCHEDULER_KEY = 'handle_expired_actions_key'
|
||||
|
||||
|
||||
@db_utils.retry_on_db_error
|
||||
@action_queue.process
|
||||
def handle_expired_actions():
|
||||
LOG.debug("Running heartbeat checker...")
|
||||
|
||||
try:
|
||||
interval = CONF.action_heartbeat.check_interval
|
||||
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
||||
exp_date = utils.utc_now_sec() - datetime.timedelta(
|
||||
seconds=max_missed * interval
|
||||
)
|
||||
|
||||
with db_api.transaction():
|
||||
action_exs = db_api.get_running_expired_sync_actions(exp_date)
|
||||
LOG.debug("Found {} running and expired actions.".format(
|
||||
len(action_exs))
|
||||
)
|
||||
if action_exs:
|
||||
LOG.info("Actions executions to transit to error, because "
|
||||
"heartbeat wasn't received: {}".format(action_exs))
|
||||
for action_ex in action_exs:
|
||||
result = mistral_lib.Result(
|
||||
error="Heartbeat wasn't received."
|
||||
)
|
||||
action_handler.on_action_complete(action_ex, result)
|
||||
finally:
|
||||
schedule(interval)
|
||||
|
||||
|
||||
def setup():
|
||||
interval = CONF.action_heartbeat.check_interval
|
||||
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
||||
enabled = interval and max_missed
|
||||
if not enabled:
|
||||
LOG.info("Action heartbeat reporting disabled.")
|
||||
return
|
||||
|
||||
wait_time = interval * max_missed
|
||||
LOG.debug("First run of action execution checker, wait before "
|
||||
"checking to make sure executors have time to send "
|
||||
"heartbeats. ({} seconds)".format(wait_time))
|
||||
|
||||
schedule(wait_time)
|
||||
|
||||
|
||||
def schedule(run_after):
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
'mistral.services.action_execution_checker.handle_expired_actions',
|
||||
run_after=run_after,
|
||||
key=SCHEDULER_KEY
|
||||
)
|
93
mistral/services/action_execution_reporter.py
Normal file
93
mistral/services/action_execution_reporter.py
Normal file
@ -0,0 +1,93 @@
|
||||
# Copyright 2018 Nokia Networks.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import periodic_task
|
||||
from oslo_service import threadgroup
|
||||
|
||||
from mistral import context as auth_ctx
|
||||
from mistral.rpc import clients as rpc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class ActionExecutionReporter(periodic_task.PeriodicTasks):
|
||||
"""The reporter that reports the running action executions."""
|
||||
|
||||
def __init__(self, conf):
|
||||
super(ActionExecutionReporter, self).__init__(conf)
|
||||
self._engine_client = rpc.get_engine_client()
|
||||
self._running_actions = set()
|
||||
|
||||
self.interval = CONF.action_heartbeat.check_interval
|
||||
self.max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
||||
self.enabled = self.interval and self.max_missed
|
||||
|
||||
_periodic_task = periodic_task.periodic_task(
|
||||
spacing=self.interval,
|
||||
run_immediately=True
|
||||
)
|
||||
self.add_periodic_task(
|
||||
_periodic_task(report)
|
||||
)
|
||||
|
||||
def add_action_ex_id(self, action_ex_id):
|
||||
# With run-action there is no actions_ex_id assigned
|
||||
if action_ex_id and self.enabled:
|
||||
self._engine_client.report_running_actions([action_ex_id])
|
||||
self._running_actions.add(action_ex_id)
|
||||
|
||||
def remove_action_ex_id(self, action_ex_id):
|
||||
if action_ex_id and self.enabled:
|
||||
self._running_actions.discard(action_ex_id)
|
||||
|
||||
|
||||
def report(reporter, ctx):
|
||||
LOG.debug("Running heartbeat reporter...")
|
||||
|
||||
if not reporter._running_actions:
|
||||
return
|
||||
|
||||
auth_ctx.set_ctx(ctx)
|
||||
reporter._engine_client.report_running_actions(reporter._running_actions)
|
||||
|
||||
|
||||
def setup(action_execution_reporter):
|
||||
interval = CONF.action_heartbeat.check_interval
|
||||
max_missed = CONF.action_heartbeat.max_missed_heartbeats
|
||||
enabled = interval and max_missed
|
||||
if not enabled:
|
||||
LOG.info("Action heartbeat reporting disabled.")
|
||||
return None
|
||||
|
||||
tg = threadgroup.ThreadGroup()
|
||||
|
||||
ctx = auth_ctx.MistralContext(
|
||||
user=None,
|
||||
tenant=None,
|
||||
auth_token=None,
|
||||
is_admin=True
|
||||
)
|
||||
|
||||
tg.add_dynamic_timer(
|
||||
action_execution_reporter.run_periodic_tasks,
|
||||
initial_delay=None,
|
||||
periodic_interval_max=1,
|
||||
context=ctx
|
||||
)
|
||||
|
||||
return tg
|
@ -625,6 +625,41 @@ class DefaultEngineTest(base.DbTestCase):
|
||||
# TODO(akhmerov): Implement.
|
||||
pass
|
||||
|
||||
def test_report_running_actions(self):
|
||||
wf_input = {'param1': 'Hey', 'param2': 'Hi'}
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow(
|
||||
'wb.wf',
|
||||
'',
|
||||
wf_input=wf_input,
|
||||
description='my execution',
|
||||
task_name='task2'
|
||||
)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertEqual(1, len(task_execs))
|
||||
|
||||
task_ex = task_execs[0]
|
||||
|
||||
action_execs = db_api.get_action_executions(
|
||||
task_execution_id=task_ex.id
|
||||
)
|
||||
|
||||
task_action_ex = action_execs[0]
|
||||
|
||||
self.engine.report_running_actions([])
|
||||
self.engine.report_running_actions([None, None])
|
||||
self.engine.report_running_actions([None, task_action_ex.id])
|
||||
|
||||
task_action_ex = db_api.get_action_execution(task_action_ex.id)
|
||||
|
||||
self.assertIsNotNone(task_action_ex.last_heartbeat)
|
||||
|
||||
|
||||
class DefaultEngineWithTransportTest(eng_test_base.EngineTestCase):
|
||||
def test_engine_client_remote_error(self):
|
||||
|
@ -17,6 +17,7 @@ import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from mistral.actions import std_actions
|
||||
from mistral import config
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import workbooks as wb_service
|
||||
@ -32,7 +33,9 @@ from mistral_lib import actions as actions_base
|
||||
|
||||
# Use the set_default method to set value otherwise in certain test cases
|
||||
# the change in value is not permanent.
|
||||
cfg.CONF.set_default('auth_enable', False, group='pecan')
|
||||
cfg.CONF.set_default('auth_enable', False, group=config.PECAN_GROUP)
|
||||
cfg.CONF.set_default('max_missed_heartbeats', 0,
|
||||
group=config.ACTION_HEARTBEAT_GROUP)
|
||||
|
||||
WB = """
|
||||
---
|
||||
|
@ -0,0 +1,5 @@
|
||||
---
|
||||
features:
|
||||
- >
|
||||
[`blueprint action-execution-reporting <https://blueprints.launchpad.net/mistral/+spec/action-execution-reporting>`_]
|
||||
Introduced a mechanism to close action executions that stuck in RUNNING state.
|
Loading…
Reference in New Issue
Block a user