From 2d74e6ebacf29379850d29bbe9317e930ff2da99 Mon Sep 17 00:00:00 2001
From: Renat Akhmerov <renat.akhmerov@gmail.com>
Date: Wed, 7 Nov 2018 15:00:40 +0700
Subject: [PATCH] Refactor action execution checker without using scheduler

* Removed the use of the scheduler from the action execution heartbeat
  checker in favor of regular threads.
* Added the new config option "batch_size" under the [action_heartbeat]
  group to limit the number of action executions processed during
  one iteration of the checker.
* Added a test checking that an action execution is automatically
  failed by the heartbeat checker.

Closes-Bug: #1802065
Change-Id: I18c0c2c3159b9294c8af96c93c65a6edfc1de1a1
---
 mistral/config.py                             | 16 +++-
 mistral/db/v2/api.py                          |  8 +-
 mistral/db/v2/sqlalchemy/api.py               |  6 +-
 mistral/engine/engine_server.py               |  4 +-
 mistral/services/action_execution_checker.py  | 85 ++++++++++++-------
 .../unit/engine/test_action_heartbeat.py      | 79 +++++++++++++++++
 ...ts_without_scheduler-9c3500d6a2b25a4d.yaml | 12 +++
 7 files changed, 173 insertions(+), 37 deletions(-)
 create mode 100644 mistral/tests/unit/engine/test_action_heartbeat.py
 create mode 100644 releasenotes/notes/refactor_action_heartbeats_without_scheduler-9c3500d6a2b25a4d.yaml

diff --git a/mistral/config.py b/mistral/config.py
index 033f0f734..052fd44b4 100644
--- a/mistral/config.py
+++ b/mistral/config.py
@@ -421,19 +421,27 @@ action_heartbeat_opts = [
         min=0,
         default=15,
         help=_('The maximum amount of missed heartbeats to be allowed. '
-               'If set to 0 then this feature won\'t be enabled. '
+               'If set to 0 then this feature is disabled. '
                'See check_interval for more details.')
     ),
     cfg.IntOpt(
         'check_interval',
         min=0,
         default=20,
-        help=_('How often the action executions are checked (in seconds). '
-               'For example when check_interval = 10, check action '
+        help=_('How often (in seconds) action executions are checked. '
+               'For example when check_interval is 10, check action '
                'executions every 10 seconds. When the checker runs it will '
                'transit all running action executions to error if the last '
                'heartbeat received is older than 10 * max_missed_heartbeats '
-               'seconds. If set to 0 then this feature won\'t be enabled.')
+               'seconds. If set to 0 then this feature is disabled.')
+    ),
+    cfg.IntOpt(
+        'batch_size',
+        min=0,
+        default=10,
+        help=_('The maximum number of action executions processed during '
+               'one iteration of action execution heartbeat checker. If set '
+               'to 0 then there is no limit.')
     ),
     cfg.IntOpt(
         'first_heartbeat_timeout',
diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py
index 44fd64513..75fdfe531 100644
--- a/mistral/db/v2/api.py
+++ b/mistral/db/v2/api.py
@@ -451,8 +451,12 @@ def get_expired_executions(expiration_time, limit=None, columns=()):
     )
 
 
-def get_running_expired_sync_actions(expiration_time, session=None):
-    return IMPL.get_running_expired_sync_actions(expiration_time)
+def get_running_expired_sync_action_executions(expiration_time,
+                                               limit, session=None):
+    return IMPL.get_running_expired_sync_action_executions(
+        expiration_time,
+        limit
+    )
 
 
 def get_superfluous_executions(max_finished_executions, limit=None,
diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py
index ee19d503d..fcedaee60 100644
--- a/mistral/db/v2/sqlalchemy/api.py
+++ b/mistral/db/v2/sqlalchemy/api.py
@@ -1282,7 +1282,8 @@ def get_expired_executions(expiration_time, limit=None, columns=(),
 
 
 @b.session_aware()
-def get_running_expired_sync_actions(expiration_time, session=None):
+def get_running_expired_sync_action_executions(expiration_time,
+                                               limit, session=None):
     query = b.model_query(models.ActionExecution)
 
     query = query.filter(
@@ -1291,6 +1292,9 @@ def get_running_expired_sync_actions(expiration_time, session=None):
     query = query.filter_by(is_sync=True)
     query = query.filter(models.ActionExecution.state == states.RUNNING)
 
+    if limit:
+        query = query.limit(limit)  # limit() returns a new Query object
+
     return query.all()
 
 
diff --git a/mistral/engine/engine_server.py b/mistral/engine/engine_server.py
index db8e20321..a76b2aadf 100644
--- a/mistral/engine/engine_server.py
+++ b/mistral/engine/engine_server.py
@@ -52,7 +52,7 @@ class EngineServer(service_base.MistralService):
         self._scheduler = scheduler.start()
         self._expiration_policy_tg = expiration_policy.setup()
 
-        action_execution_checker.setup()
+        action_execution_checker.start()
 
         if self._setup_profiler:
             profiler_utils.setup('mistral-engine', cfg.CONF.engine.host)
@@ -69,6 +69,8 @@ class EngineServer(service_base.MistralService):
     def stop(self, graceful=False):
         super(EngineServer, self).stop(graceful)
 
+        action_execution_checker.stop(graceful)
+
         if self._scheduler:
             scheduler.stop_scheduler(self._scheduler, graceful)
 
diff --git a/mistral/services/action_execution_checker.py b/mistral/services/action_execution_checker.py
index c888214ab..af4bb0cff 100644
--- a/mistral/services/action_execution_checker.py
+++ b/mistral/services/action_execution_checker.py
@@ -13,20 +13,23 @@
 #    limitations under the License.
 
 import datetime
+import eventlet
+import sys
 
 from mistral.db import utils as db_utils
 from mistral.db.v2 import api as db_api
 from mistral.engine import action_handler
 from mistral.engine import post_tx_queue
-from mistral.services import scheduler
 from mistral import utils
 from mistral_lib import actions as mistral_lib
 from oslo_config import cfg
 from oslo_log import log as logging
 
 LOG = logging.getLogger(__name__)
+
 CONF = cfg.CONF
-SCHEDULER_KEY = 'handle_expired_actions_key'
+
+_stopped = True
 
 
 @db_utils.retry_on_db_error
@@ -41,37 +44,60 @@ def handle_expired_actions():
         seconds=max_missed * interval
     )
 
-    try:
-        with db_api.transaction():
-            action_exs = db_api.get_running_expired_sync_actions(exp_date)
+    with db_api.transaction():
+        action_exs = db_api.get_running_expired_sync_action_executions(
+            exp_date,
+            CONF.action_heartbeat.batch_size
+        )
 
-            LOG.debug("Found {} running and expired actions.".format(
-                len(action_exs))
+        LOG.debug("Found {} running and expired actions.".format(
+            len(action_exs))
+        )
+
+        if action_exs:
+            LOG.info(
+                "Actions executions to transit to error, because "
+                "heartbeat wasn't received: {}".format(action_exs)
             )
 
-            if action_exs:
-                LOG.info(
-                    "Actions executions to transit to error, because "
-                    "heartbeat wasn't received: {}".format(action_exs)
+            for action_ex in action_exs:
+                result = mistral_lib.Result(
+                    error="Heartbeat wasn't received."
                 )
 
-                for action_ex in action_exs:
-                    result = mistral_lib.Result(
-                        error="Heartbeat wasn't received."
-                    )
-
-                    action_handler.on_action_complete(action_ex, result)
-    finally:
-        schedule(interval)
+                action_handler.on_action_complete(action_ex, result)
 
 
-def setup():
+def _loop():
+    global _stopped
+
+    while not _stopped:
+        try:
+            handle_expired_actions()
+        except Exception:
+            LOG.exception(
+                'Action execution checker iteration failed'
+                ' due to unexpected exception.'
+            )
+
+            # For some mysterious reason (probably eventlet related)
+            # the exception is not cleared from the context automatically.
+            # This results in subsequent log.warning calls to show invalid
+            # info.
+            if sys.version_info < (3,):
+                sys.exc_clear()
+
+        eventlet.sleep(CONF.action_heartbeat.check_interval)
+
+
+def start():
     interval = CONF.action_heartbeat.check_interval
     max_missed = CONF.action_heartbeat.max_missed_heartbeats
+
     enabled = interval and max_missed
 
     if not enabled:
-        LOG.info("Action heartbeat reporting disabled.")
+        LOG.info("Action heartbeat reporting is disabled.")
 
         return
 
@@ -83,13 +109,14 @@ def setup():
         "heartbeats. ({} seconds)".format(wait_time)
     )
 
-    schedule(wait_time)
+    global _stopped
+
+    _stopped = False
+
+    eventlet.spawn_after(wait_time, _loop)
 
 
-def schedule(run_after):
-    scheduler.schedule_call(
-        None,
-        'mistral.services.action_execution_checker.handle_expired_actions',
-        run_after=run_after,
-        key=SCHEDULER_KEY
-    )
+def stop(graceful=False):
+    global _stopped
+
+    _stopped = True
diff --git a/mistral/tests/unit/engine/test_action_heartbeat.py b/mistral/tests/unit/engine/test_action_heartbeat.py
new file mode 100644
index 000000000..87a45119f
--- /dev/null
+++ b/mistral/tests/unit/engine/test_action_heartbeat.py
@@ -0,0 +1,79 @@
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+
+import mock
+
+from oslo_config import cfg
+
+from mistral.db.v2 import api as db_api
+from mistral.rpc import clients as rpc_clients
+from mistral.services import workflows as wf_service
+from mistral.tests.unit.engine import base
+from mistral.workflow import states
+
+
+# Use the set_default method to set value otherwise in certain test cases
+# the change in value is not permanent.
+cfg.CONF.set_default('auth_enable', False, group='pecan')
+
+
+class ActionHeartbeatTest(base.EngineTestCase):
+    def setUp(self):
+        # We need to override configuration values before starting engine.
+        self.override_config('check_interval', 1, 'action_heartbeat')
+        self.override_config('max_missed_heartbeats', 1, 'action_heartbeat')
+        self.override_config('first_heartbeat_timeout', 0, 'action_heartbeat')
+
+        super(ActionHeartbeatTest, self).setUp()
+
+    # Make sure actions are not sent to an executor.
+    @mock.patch.object(
+        rpc_clients.ExecutorClient,
+        'run_action',
+        mock.MagicMock()
+    )
+    def test_fail_action_with_missing_heartbeats(self):
+        wf_text = """---
+        version: '2.0'
+
+        wf:
+          tasks:
+            task1:
+              action: std.noop
+        """
+
+        wf_service.create_workflows(wf_text)
+
+        wf_ex = self.engine.start_workflow('wf')
+
+        # The workflow should fail because the action of "task1" should be
+        # failed automatically by the action execution heartbeat checker.
+        self.await_workflow_error(wf_ex.id)
+
+        with db_api.transaction():
+            wf_ex = db_api.get_workflow_execution(wf_ex.id)
+
+            t_execs = wf_ex.task_executions
+
+            t_ex = self._assert_single_item(
+                t_execs,
+                name='task1',
+                state=states.ERROR
+            )
+
+            a_execs = db_api.get_action_executions(task_execution_id=t_ex.id)
+
+            self._assert_single_item(
+                a_execs,
+                name='std.noop',
+                state=states.ERROR
+            )
diff --git a/releasenotes/notes/refactor_action_heartbeats_without_scheduler-9c3500d6a2b25a4d.yaml b/releasenotes/notes/refactor_action_heartbeats_without_scheduler-9c3500d6a2b25a4d.yaml
new file mode 100644
index 000000000..d54864b30
--- /dev/null
+++ b/releasenotes/notes/refactor_action_heartbeats_without_scheduler-9c3500d6a2b25a4d.yaml
@@ -0,0 +1,12 @@
+---
+fixes:
+  - |
+    The action heartbeat checker was using the scheduler to process expired
+    action executions periodically. The side effect was that upon system
+    reboot there could be duplicate delayed calls in the database. So over
+    time, the number of such calls could become significant and those jobs
+    could even affect performance. This has now been fixed by using regular
+    threads without the scheduler at all. Additionally, the new configuration
+    property "batch_size" has been added under the group "action_heartbeat"
+    to control the maximum number of action executions processed during one
+    iteration of the action execution heartbeat checker.