diff --git a/mistral/config.py b/mistral/config.py
index 033f0f734..052fd44b4 100644
--- a/mistral/config.py
+++ b/mistral/config.py
@@ -421,19 +421,27 @@ action_heartbeat_opts = [
         min=0,
         default=15,
         help=_('The maximum amount of missed heartbeats to be allowed. '
-               'If set to 0 then this feature won\'t be enabled. '
+               'If set to 0 then this feature is disabled. '
                'See check_interval for more details.')
     ),
     cfg.IntOpt(
         'check_interval',
         min=0,
         default=20,
-        help=_('How often the action executions are checked (in seconds). '
-               'For example when check_interval = 10, check action '
+        help=_('How often (in seconds) action executions are checked. '
+               'For example when check_interval is 10, check action '
                'executions every 10 seconds. When the checker runs it will '
                'transit all running action executions to error if the last '
                'heartbeat received is older than 10 * max_missed_heartbeats '
-               'seconds. If set to 0 then this feature won\'t be enabled.')
+               'seconds. If set to 0 then this feature is disabled.')
+    ),
+    cfg.IntOpt(
+        'batch_size',
+        min=0,
+        default=10,
+        help=_('The maximum number of action executions processed during '
+               'one iteration of the action execution heartbeat checker. '
+               'If set to 0 then there is no limit.')
     ),
     cfg.IntOpt(
         'first_heartbeat_timeout',
diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py
index 44fd64513..75fdfe531 100644
--- a/mistral/db/v2/api.py
+++ b/mistral/db/v2/api.py
@@ -451,8 +451,12 @@ def get_expired_executions(expiration_time, limit=None, columns=()):
     )
 
 
-def get_running_expired_sync_actions(expiration_time, session=None):
-    return IMPL.get_running_expired_sync_actions(expiration_time)
+def get_running_expired_sync_action_executions(expiration_time,
+                                               limit, session=None):
+    return IMPL.get_running_expired_sync_action_executions(
+        expiration_time,
+        limit
+    )
 
 
 def get_superfluous_executions(max_finished_executions, limit=None,
diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py
index ee19d503d..fcedaee60 100644
--- a/mistral/db/v2/sqlalchemy/api.py
+++ b/mistral/db/v2/sqlalchemy/api.py
@@ -1282,7 +1282,8 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
 
 
 @b.session_aware()
-def get_running_expired_sync_actions(expiration_time, session=None):
+def get_running_expired_sync_action_executions(expiration_time,
+                                               limit, session=None):
     query = b.model_query(models.ActionExecution)
 
     query = query.filter(
@@ -1291,6 +1292,9 @@ def get_running_expired_sync_actions(expiration_time, session=None):
     query = query.filter_by(is_sync=True)
     query = query.filter(models.ActionExecution.state == states.RUNNING)
 
+    if limit:
+        query = query.limit(limit)
+
     return query.all()
 
 
diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py
index db8e20321..a76b2aadf 100644
--- a/mistral/engine/engine_server.py
+++ b/mistral/engine/engine_server.py
@@ -52,7 +52,7 @@ class EngineServer(service_base.MistralService):
         self._scheduler = scheduler.start()
         self._expiration_policy_tg = expiration_policy.setup()
 
-        action_execution_checker.setup()
+        action_execution_checker.start()
 
         if self._setup_profiler:
             profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)
@@ -69,6 +69,8 @@ class EngineServer(service_base.MistralService):
     def stop(self, graceful=False):
         super(EngineServer, self).stop(graceful)
 
+        action_execution_checker.stop(graceful)
+
         if self._scheduler:
             scheduler.stop_scheduler(self._scheduler, graceful)
 
diff --git a/mistral/services/action_execution_checker.py b/mistral/services/action_execution_checker.py
index c888214ab..af4bb0cff 100644
--- a/mistral/services/action_execution_checker.py
+++ b/mistral/services/action_execution_checker.py
@@ -13,20 +13,23 @@
 #    limitations under the License.
 
 import datetime
+import eventlet
+import sys
 
 from mistral.db import utils as db_utils
 from mistral.db.v2 import api as db_api
 from mistral.engine import action_handler
 from mistral.engine import post_tx_queue
-from mistral.services import scheduler
 from mistral import utils
 from mistral_lib import actions as mistral_lib
 from oslo_config import cfg
 from oslo_log import log as logging
 
 LOG = logging.getLogger(__name__)
+
 CONF = cfg.CONF
-SCHEDULER_KEY = 'handle_expired_actions_key'
+
+_stopped = True
 
 
 @db_utils.retry_on_db_error
@@ -41,37 +44,60 @@ def handle_expired_actions():
         seconds=max_missed * interval
     )
 
-    try:
-        with db_api.transaction():
-            action_exs = db_api.get_running_expired_sync_actions(exp_date)
+    with db_api.transaction():
+        action_exs = db_api.get_running_expired_sync_action_executions(
+            exp_date,
+            CONF.action_heartbeat.batch_size
+        )
 
-            LOG.debug("Found {} running and expired actions.".format(
-                len(action_exs))
+        LOG.debug("Found {} running and expired actions.".format(
+            len(action_exs))
+        )
+
+        if action_exs:
+            LOG.info(
+                "Action executions to transit to error, because "
+                "heartbeat wasn't received: {}".format(action_exs)
             )
 
-            if action_exs:
-                LOG.info(
-                    "Actions executions to transit to error, because "
-                    "heartbeat wasn't received: {}".format(action_exs)
+            for action_ex in action_exs:
+                result = mistral_lib.Result(
+                    error="Heartbeat wasn't received."
                 )
 
-                for action_ex in action_exs:
-                    result = mistral_lib.Result(
-                        error="Heartbeat wasn't received."
-                    )
-
-                    action_handler.on_action_complete(action_ex, result)
-    finally:
-        schedule(interval)
+                action_handler.on_action_complete(action_ex, result)
 
 
-def setup():
+def _loop():
+    global _stopped
+
+    while not _stopped:
+        try:
+            handle_expired_actions()
+        except Exception:
+            LOG.exception(
+                'Action execution checker iteration failed'
+                ' due to unexpected exception.'
+            )
+
+            # For some mysterious reason (probably eventlet related)
+            # the exception is not cleared from the context automatically.
+            # This results in subsequent log.warning calls to show invalid
+            # info.
+            if sys.version_info < (3,):
+                sys.exc_clear()
+
+        eventlet.sleep(CONF.action_heartbeat.check_interval)
+
+
+def start():
     interval = CONF.action_heartbeat.check_interval
     max_missed = CONF.action_heartbeat.max_missed_heartbeats
+
     enabled = interval and max_missed
 
     if not enabled:
-        LOG.info("Action heartbeat reporting disabled.")
+        LOG.info("Action heartbeat reporting is disabled.")
 
         return
 
@@ -83,13 +109,14 @@ def setup():
         "heartbeats. ({} seconds)".format(wait_time)
     )
 
-    schedule(wait_time)
+    global _stopped
+
+    _stopped = False
+
+    eventlet.spawn_after(wait_time, _loop)
 
 
-def schedule(run_after):
-    scheduler.schedule_call(
-        None,
-        'mistral.services.action_execution_checker.handle_expired_actions',
-        run_after=run_after,
-        key=SCHEDULER_KEY
-    )
+def stop(graceful=False):
+    global _stopped
+
+    _stopped = True
diff --git a/mistral/tests/unit/engine/test_action_heartbeat.py b/mistral/tests/unit/engine/test_action_heartbeat.py
new file mode 100644
index 000000000..87a45119f
--- /dev/null
+++ b/mistral/tests/unit/engine/test_action_heartbeat.py
@@ -0,0 +1,79 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+
+from oslo_config import cfg
+
+from mistral.db.v2 import api as db_api
+from mistral.rpc import clients as rpc_clients
+from mistral.services import workflows as wf_service
+from mistral.tests.unit.engine import base
+from mistral.workflow import states
+
+
+# Use the set_default method to set value otherwise in certain test cases
+# the change in value is not permanent.
+cfg.CONF.set_default('auth_enable', False, group='pecan')
+
+
+class ActionHeartbeatTest(base.EngineTestCase):
+    def setUp(self):
+        # We need to override configuration values before starting engine.
+        self.override_config('check_interval', 1, 'action_heartbeat')
+        self.override_config('max_missed_heartbeats', 1, 'action_heartbeat')
+        self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat')
+
+        super(ActionHeartbeatTest, self).setUp()
+
+    # Make sure actions are not sent to an executor.
+    @mock.patch.object(
+        rpc_clients.ExecutorClient,
+        'run_action',
+        mock.MagicMock()
+    )
+    def test_fail_action_with_missing_heartbeats(self):
+        wf_text = """---
+        version: '2.0'
+
+        wf:
+          tasks:
+            task1:
+              action: std.noop
+        """
+
+        wf_service.create_workflows(wf_text)
+
+        wf_ex = self.engine.start_workflow('wf')
+
+        # The workflow should fail because the action of "task1" should be
+        # failed automatically by the action execution heartbeat checker.
+        self.await_workflow_error(wf_ex.id)
+
+        with db_api.transaction():
+            wf_ex = db_api.get_workflow_execution(wf_ex.id)
+
+            t_execs = wf_ex.task_executions
+
+            t_ex = self._assert_single_item(
+                t_execs,
+                name='task1',
+                state=states.ERROR
+            )
+
+            a_execs = db_api.get_action_executions(task_execution_id=t_ex.id)
+
+            self._assert_single_item(
+                a_execs,
+                name='std.noop',
+                state=states.ERROR
+            )
diff --git a/releasenotes/notes/refactor_action_heartbeats_without_scheduler-9c3500d6a2b25a4d.yaml b/releasenotes/notes/refactor_action_heartbeats_without_scheduler-9c3500d6a2b25a4d.yaml
new file mode 100644
index 000000000..d54864b30
--- /dev/null
+++ b/releasenotes/notes/refactor_action_heartbeats_without_scheduler-9c3500d6a2b25a4d.yaml
@@ -0,0 +1,12 @@
+---
+fixes:
+  - |
+    The action heartbeat checker was using the scheduler to process expired
+    action executions periodically. The side effect was that duplicate
+    delayed calls could pile up in the database after system reboots; over
+    time the number of such calls could become significant and even affect
+    performance. The checker now uses regular threads instead of the
+    scheduler. Additionally, the new configuration property "batch_size"
+    has been added under the group "action_heartbeat" to control the
+    maximum number of action executions processed during one iteration of
+    the action execution heartbeat checker.
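
Supplementary note, not part of the patch: a rough sketch of how the
action_heartbeat options interact, using the defaults that appear in
config.py above. The variable names and the expiration_cutoff() helper are
illustrative only and do not exist in the Mistral code base.

    import datetime

    # Illustrative values mirroring the [action_heartbeat] defaults above.
    check_interval = 20          # seconds between checker iterations
    max_missed_heartbeats = 15   # missed heartbeats tolerated before ERROR
    batch_size = 10              # max action executions handled per iteration


    def expiration_cutoff(now):
        # An action execution whose last heartbeat is older than this cutoff
        # is transitioned to ERROR by the checker, mirroring exp_date in
        # handle_expired_actions().
        return now - datetime.timedelta(
            seconds=max_missed_heartbeats * check_interval
        )


    # With the defaults, an action execution fails once its last heartbeat is
    # older than 15 * 20 = 300 seconds, and at most 10 such executions are
    # processed per checker iteration (batch_size = 0 would mean no limit).
    print(expiration_cutoff(datetime.datetime.utcnow()))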