Refactor action execution checker without using scheduler

* Removed using scheduler from action execution heartbeat checker
  in favor of regular threads.
* Added the new config option "batch_size" under the [action_heartbeat]
  group to limit the number of action executions being processed during
  one iteration of the checker.
* Added a test checking that an action execution is automatically
  failed by the heartbeat checker.

Closes-Bug: #1802065
Change-Id: I18c0c2c3159b9294c8af96c93c65a6edfc1de1a1
This commit is contained in:
Renat Akhmerov 2018-11-07 15:00:40 +07:00
parent 3b4136ff1e
commit 2d74e6ebac
7 changed files with 173 additions and 37 deletions

View File

@ -421,19 +421,27 @@ action_heartbeat_opts = [
min=0,
default=15,
help=_('The maximum amount of missed heartbeats to be allowed. '
'If set to 0 then this feature won\'t be enabled. '
'If set to 0 then this feature is disabled. '
'See check_interval for more details.')
),
cfg.IntOpt(
'check_interval',
min=0,
default=20,
help=_('How often the action executions are checked (in seconds). '
'For example when check_interval = 10, check action '
help=_('How often (in seconds) action executions are checked. '
'For example when check_interval is 10, check action '
'executions every 10 seconds. When the checker runs it will '
'transit all running action executions to error if the last '
'heartbeat received is older than 10 * max_missed_heartbeats '
'seconds. If set to 0 then this feature won\'t be enabled.')
'seconds. If set to 0 then this feature is disabled.')
),
cfg.IntOpt(
'batch_size',
min=0,
default=10,
help=_('The maximum number of action executions processed during '
'one iteration of action execution heartbeat checker. If set '
'to 0 then there is no limit.')
),
cfg.IntOpt(
'first_heartbeat_timeout',

View File

@ -451,8 +451,12 @@ def get_expired_executions(expiration_time, limit=None, columns=()):
)
def get_running_expired_sync_actions(expiration_time, session=None):
return IMPL.get_running_expired_sync_actions(expiration_time)
def get_running_expired_sync_action_executions(expiration_time,
limit, session=None):
return IMPL.get_running_expired_sync_action_executions(
expiration_time,
limit
)
def get_superfluous_executions(max_finished_executions, limit=None,

View File

@ -1282,7 +1282,8 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
@b.session_aware()
def get_running_expired_sync_actions(expiration_time, session=None):
def get_running_expired_sync_action_executions(expiration_time,
limit, session=None):
query = b.model_query(models.ActionExecution)
query = query.filter(
@ -1291,6 +1292,9 @@ def get_running_expired_sync_actions(expiration_time, session=None):
query = query.filter_by(is_sync=True)
query = query.filter(models.ActionExecution.state == states.RUNNING)
if limit:
query.limit(limit)
return query.all()

View File

@ -52,7 +52,7 @@ class EngineServer(service_base.MistralService):
self._scheduler = scheduler.start()
self._expiration_policy_tg = expiration_policy.setup()
action_execution_checker.setup()
action_execution_checker.start()
if self._setup_profiler:
profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)
@ -69,6 +69,8 @@ class EngineServer(service_base.MistralService):
def stop(self, graceful=False):
super(EngineServer, self).stop(graceful)
action_execution_checker.stop(graceful)
if self._scheduler:
scheduler.stop_scheduler(self._scheduler, graceful)

View File

@ -13,20 +13,23 @@
# limitations under the License.
import datetime
import eventlet
import sys
from mistral.db import utils as db_utils
from mistral.db.v2 import api as db_api
from mistral.engine import action_handler
from mistral.engine import post_tx_queue
from mistral.services import scheduler
from mistral import utils
from mistral_lib import actions as mistral_lib
from oslo_config import cfg
from oslo_log import log as logging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
SCHEDULER_KEY = 'handle_expired_actions_key'
_stopped = True
@db_utils.retry_on_db_error
@ -41,37 +44,60 @@ def handle_expired_actions():
seconds=max_missed * interval
)
try:
with db_api.transaction():
action_exs = db_api.get_running_expired_sync_actions(exp_date)
with db_api.transaction():
action_exs = db_api.get_running_expired_sync_action_executions(
exp_date,
CONF.action_heartbeat.batch_size
)
LOG.debug("Found {} running and expired actions.".format(
len(action_exs))
LOG.debug("Found {} running and expired actions.".format(
len(action_exs))
)
if action_exs:
LOG.info(
"Actions executions to transit to error, because "
"heartbeat wasn't received: {}".format(action_exs)
)
if action_exs:
LOG.info(
"Actions executions to transit to error, because "
"heartbeat wasn't received: {}".format(action_exs)
for action_ex in action_exs:
result = mistral_lib.Result(
error="Heartbeat wasn't received."
)
for action_ex in action_exs:
result = mistral_lib.Result(
error="Heartbeat wasn't received."
)
action_handler.on_action_complete(action_ex, result)
finally:
schedule(interval)
action_handler.on_action_complete(action_ex, result)
def setup():
def _loop():
global _stopped
while not _stopped:
try:
handle_expired_actions()
except Exception:
LOG.exception(
'Action execution checker iteration failed'
' due to unexpected exception.'
)
# For some mysterious reason (probably eventlet related)
# the exception is not cleared from the context automatically.
# This results in subsequent log.warning calls to show invalid
# info.
if sys.version_info < (3,):
sys.exc_clear()
eventlet.sleep(CONF.action_heartbeat.check_interval)
def start():
interval = CONF.action_heartbeat.check_interval
max_missed = CONF.action_heartbeat.max_missed_heartbeats
enabled = interval and max_missed
if not enabled:
LOG.info("Action heartbeat reporting disabled.")
LOG.info("Action heartbeat reporting is disabled.")
return
@ -83,13 +109,14 @@ def setup():
"heartbeats. ({} seconds)".format(wait_time)
)
schedule(wait_time)
global _stopped
_stopped = False
eventlet.spawn_after(wait_time, _loop)
def schedule(run_after):
scheduler.schedule_call(
None,
'mistral.services.action_execution_checker.handle_expired_actions',
run_after=run_after,
key=SCHEDULER_KEY
)
def stop(graceful=False):
global _stopped
_stopped = True

View File

@ -0,0 +1,79 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from mistral.db.v2 import api as db_api
from mistral.rpc import clients as rpc_clients
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base
from mistral.workflow import states
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
class ActionHeartbeatTest(base.EngineTestCase):
def setUp(self):
# We need to override configuration values before starting engine.
self.override_config('check_interval', 1, 'action_heartbeat')
self.override_config('max_missed_heartbeats', 1, 'action_heartbeat')
self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat')
super(ActionHeartbeatTest, self).setUp()
# Make sure actions are not sent to an executor.
@mock.patch.object(
rpc_clients.ExecutorClient,
'run_action',
mock.MagicMock()
)
def test_fail_action_with_missing_heartbeats(self):
wf_text = """---
version: '2.0'
wf:
tasks:
task1:
action: std.noop
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
# The workflow should fail because the action of "task1" should be
# failed automatically by the action execution heartbeat checker.
self.await_workflow_error(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
t_execs = wf_ex.task_executions
t_ex = self._assert_single_item(
t_execs,
name='task1',
state=states.ERROR
)
a_execs = db_api.get_action_executions(task_execution_id=t_ex.id)
self._assert_single_item(
a_execs,
name='std.noop',
state=states.ERROR
)

View File

@ -0,0 +1,12 @@
---
fixes:
- |
Action heartbeat checker was using the scheduler to process expired action
executions periodically. The side effect was that, upon a system reboot,
duplicate delayed calls could accumulate in the database. Over time, the
number of such calls could become significant, and those jobs could
even affect performance. This has now been fixed by using regular threads
instead of the scheduler. Additionally, the new configuration
property "batch_size" has been added under the group "action_heartbeat"
to control the maximum number of action executions processed during one
iteration of the action execution heartbeat checker.