Make action heartbeats work for all executor types

* Previously action hearbeats didn't work in case of using local
  executors because the component responsible for sending heartbeats
  was started by the executor RPC server which doesn't make sense to
  initialize for a local executor. This patch refactors the code
  so that now heartbeats get sent for any type of executors. For
  local executors it is also useful because a cluster node that
  runs an engine and a local executor may also crash. With this
  change, remaining cluster nodes will be able to understand that
  the action will never complete and one of them will time it out.
  If all is fine with the node where the local executor is running
  then heartbeats will be sent normally and the action won't time
  out. Before this change, in case of local executors a long running
  action would always time out after a configured amount of time
  (by default, 60 mins) just because local executors never sent
  heartbeats.
* Made a lot of renamings to clearly see what component is
  responsible for.
* Wrote the tests that check the heartbeat sender, both positive
  and negative scenarios for local and remote executor types.

Closes-Bug: #1852722

Change-Id: I4d0fdff54de9bee70aeaf10a4ef483ad7000840b
This commit is contained in:
Renat Akhmerov 2019-11-13 16:34:44 +07:00
parent 0e758e16e1
commit 7ec4f26744
11 changed files with 227 additions and 48 deletions

View File

@ -134,7 +134,7 @@ class Engine(object):
raise NotImplementedError
@abc.abstractmethod
def report_running_actions(self, action_ex_ids):
def process_action_heartbeats(self, action_ex_ids):
"""Receives the heartbeat about the running actions.
:param action_ex_ids: The action execution ids.

View File

@ -223,14 +223,17 @@ class DefaultEngine(base.Engine):
@db_utils.retry_on_db_error
@post_tx_queue.run
def report_running_actions(self, action_ex_ids):
def process_action_heartbeats(self, action_ex_ids):
with db_api.transaction():
for exec_id in action_ex_ids:
try:
db_api.update_action_execution_heartbeat(exec_id)
except exceptions.DBEntityNotFoundError:
LOG.debug("Action execution heartbeat update failed. {}"
.format(exec_id), exc_info=True)
LOG.debug(
"Action execution heartbeat update failed. {}"
.format(exec_id),
exc_info=True
)
# Ignore this error and continue with the
# remaining ids.
pass

View File

@ -20,7 +20,8 @@ from mistral.engine import default_engine
from mistral.rpc import base as rpc
from mistral.scheduler import base as sched_base
from mistral.service import base as service_base
from mistral.services import action_execution_checker
from mistral.services import action_heartbeat_checker
from mistral.services import action_heartbeat_sender
from mistral.services import expiration_policy
from mistral.utils import profiler as profiler_utils
from mistral_lib import utils
@ -54,7 +55,17 @@ class EngineServer(service_base.MistralService):
self._expiration_policy_tg = expiration_policy.setup()
action_execution_checker.start()
action_heartbeat_checker.start()
# If the current engine instance uses a local action executor
# then we also need to initialize a heartbeat reporter for it.
# Heartbeats will be sent to the engine tier in the same way as
# with a remote executor. So if the current cluster node crashes
# in the middle of executing an action then one of the remaining
# engine instances will expire the action in a configured period
# of time.
if cfg.CONF.executor.type == 'local':
action_heartbeat_sender.start()
if self._setup_profiler:
profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)
@ -71,7 +82,10 @@ class EngineServer(service_base.MistralService):
def stop(self, graceful=False):
super(EngineServer, self).stop(graceful)
action_execution_checker.stop(graceful)
action_heartbeat_checker.stop(graceful)
if cfg.CONF.executor.type == 'local':
action_heartbeat_sender.stop(graceful)
if self._scheduler:
self._scheduler.stop(graceful)
@ -275,7 +289,7 @@ class EngineServer(service_base.MistralService):
action_ex_ids
)
return self.engine.report_running_actions(action_ex_ids)
return self.engine.process_action_heartbeats(action_ex_ids)
def get_oslo_service(setup_profiler=True):

View File

@ -23,9 +23,9 @@ from mistral import context
from mistral import exceptions as exc
from mistral.executors import base
from mistral.rpc import clients as rpc
from mistral.services import action_heartbeat_sender
from mistral.utils import inspect_utils as i_u
LOG = logging.getLogger(__name__)
@ -57,6 +57,25 @@ class DefaultExecutor(base.Executor):
:return: Action result.
"""
try:
action_heartbeat_sender.add_action(action_ex_id)
return self._do_run_action(
action_cls_attrs,
action_cls_str,
action_ex_id,
execution_context,
params,
redelivered,
safe_rerun,
timeout
)
finally:
action_heartbeat_sender.remove_action(action_ex_id)
def _do_run_action(self, action_cls_attrs, action_cls_str, action_ex_id,
execution_context, params, redelivered, safe_rerun,
timeout):
def send_error_back(error_msg):
error_result = mistral_lib.Result(error=error_msg)

View File

@ -18,7 +18,7 @@ from mistral import config as cfg
from mistral.executors import default_executor as exe
from mistral.rpc import base as rpc
from mistral.service import base as service_base
from mistral.services import action_execution_reporter
from mistral.services import action_heartbeat_sender
from mistral.utils import profiler as profiler_utils
from mistral_lib import utils
@ -43,7 +43,7 @@ class ExecutorServer(service_base.MistralService):
def start(self):
super(ExecutorServer, self).start()
action_execution_reporter.start()
action_heartbeat_sender.start()
if self._setup_profiler:
profiler_utils.setup('mistral-executor', cfg.CONF.executor.host)
@ -60,7 +60,7 @@ class ExecutorServer(service_base.MistralService):
def stop(self, graceful=False):
super(ExecutorServer, self).stop(graceful)
action_execution_reporter.stop()
action_heartbeat_sender.stop()
if self._rpc_server:
self._rpc_server.stop(graceful)
@ -96,30 +96,25 @@ class ExecutorServer(service_base.MistralService):
redelivered = rpc_ctx.redelivered or False
try:
action_execution_reporter.add_action_ex_id(action_ex_id)
res = self.executor.run_action(
action_ex_id,
action_cls_str,
action_cls_attrs,
params,
safe_rerun,
execution_context,
redelivered,
timeout=timeout
)
res = self.executor.run_action(
action_ex_id,
action_cls_str,
action_cls_attrs,
params,
safe_rerun,
execution_context,
redelivered,
timeout=timeout
)
LOG.debug(
"Sending action result to engine"
" [action_ex_id=%s, action_cls=%s]",
action_ex_id,
action_cls_str
)
LOG.debug(
"Sending action result to engine"
" [action_ex_id=%s, action_cls=%s]",
action_ex_id,
action_cls_str
)
return res
finally:
action_execution_reporter.remove_action_ex_id(action_ex_id)
return res
def get_oslo_service(setup_profiler=True):

View File

@ -328,7 +328,7 @@ class EngineClient(eng.Engine):
)
@base.wrap_messaging_exception
def report_running_actions(self, action_ex_ids):
def process_action_heartbeats(self, action_ex_ids):
"""Receives action execution heartbeats.
:param action_ex_ids: Action execution ids.

View File

@ -32,32 +32,32 @@ _stopped = True
_running_actions = set()
def add_action_ex_id(action_ex_id):
def add_action(action_ex_id):
global _enabled
# With run-action there is no actions_ex_id assigned.
if action_ex_id and _enabled:
rpc.get_engine_client().report_running_actions([action_ex_id])
rpc.get_engine_client().process_action_heartbeats([action_ex_id])
_running_actions.add(action_ex_id)
def remove_action_ex_id(action_ex_id):
def remove_action(action_ex_id):
global _enabled
if action_ex_id and _enabled:
_running_actions.discard(action_ex_id)
def report_running_actions():
LOG.debug("Running heartbeat reporter...")
def send_action_heartbeats():
LOG.debug('Running heartbeat reporter...')
global _running_actions
if not _running_actions:
return
rpc.get_engine_client().report_running_actions(_running_actions)
rpc.get_engine_client().process_action_heartbeats(_running_actions)
def _loop():
@ -76,10 +76,10 @@ def _loop():
while not _stopped:
try:
report_running_actions()
send_action_heartbeats()
except Exception:
LOG.exception(
'Action execution reporter iteration failed'
'Action heartbeat sender iteration failed'
' due to an unexpected exception.'
)

View File

@ -27,14 +27,14 @@ from mistral.workflow import states
cfg.CONF.set_default('auth_enable', False, group='pecan')
class ActionHeartbeatTest(base.EngineTestCase):
class ActionHeartbeatCheckerTest(base.EngineTestCase):
def setUp(self):
# We need to override configuration values before starting engine.
self.override_config('check_interval', 1, 'action_heartbeat')
self.override_config('max_missed_heartbeats', 1, 'action_heartbeat')
self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat')
super(ActionHeartbeatTest, self).setUp()
super(ActionHeartbeatCheckerTest, self).setUp()
# Make sure actions are not sent to an executor.
@mock.patch.object(

View File

@ -0,0 +1,148 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.rpc import clients as rpc_clients
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base
from mistral.workflow import states
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
class ActionHeartbeatSenderBaseTest(base.EngineTestCase):
def setUp(self):
# We need to set all required configuration values before starting
# an engine and an executor.
self.get_configuration()
super(ActionHeartbeatSenderBaseTest, self).setUp()
def get_configuration(self):
# We need to override configuration values before starting engine.
# Subclasses can override this method and add/change their own
# config options.
self.override_config('check_interval', 1, 'action_heartbeat')
self.override_config('max_missed_heartbeats', 1, 'action_heartbeat')
self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat')
def _do_long_action_success_test(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.sleep seconds=4
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
t_execs = wf_ex.task_executions
t_ex = self._assert_single_item(
t_execs,
name='task1',
state=states.SUCCESS
)
a_execs = db_api.get_action_executions(task_execution_id=t_ex.id)
self._assert_single_item(
a_execs,
name='std.sleep',
state=states.SUCCESS
)
# Disable the ability to send action heartbeats.
@mock.patch.object(
rpc_clients.EngineClient,
'process_action_heartbeats',
mock.MagicMock()
)
def _do_long_action_failure_test_with_disabled_sender(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.sleep seconds=4
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
t_execs = wf_ex.task_executions
t_ex = self._assert_single_item(
t_execs,
name='task1',
state=states.ERROR
)
a_execs = db_api.get_action_executions(task_execution_id=t_ex.id)
self._assert_single_item(
a_execs,
name='std.sleep',
state=states.ERROR
)
class ActionHeartbeatSenderLocalExecutorTest(ActionHeartbeatSenderBaseTest):
def get_configuration(self):
super(ActionHeartbeatSenderLocalExecutorTest, self).get_configuration()
self.override_config('type', 'local', 'executor')
def test_long_action_success(self):
self._do_long_action_success_test()
def test_long_action_failure_with_disabled_sender(self):
self._do_long_action_failure_test_with_disabled_sender()
class ActionHeartbeatSenderRemoteExecutorTest(ActionHeartbeatSenderBaseTest):
def get_configuration(self):
super(
ActionHeartbeatSenderRemoteExecutorTest,
self
).get_configuration()
self.override_config('type', 'remote', 'executor')
def test_long_action_success(self):
self._do_long_action_success_test()
def test_long_action_failure_with_disabled_sender(self):
self._do_long_action_failure_test_with_disabled_sender()

View File

@ -656,9 +656,9 @@ class DefaultEngineTest(base.DbTestCase):
task_action_ex = action_execs[0]
self.engine.report_running_actions([])
self.engine.report_running_actions([None, None])
self.engine.report_running_actions([None, task_action_ex.id])
self.engine.process_action_heartbeats([])
self.engine.process_action_heartbeats([None, None])
self.engine.process_action_heartbeats([None, task_action_ex.id])
task_action_ex = db_api.get_action_execution(task_action_ex.id)