From 0e758e16e1abdb2440e28d270457f7329b876708 Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Mon, 11 Nov 2019 11:36:49 +0700 Subject: [PATCH] Refactor action execution reporter * Moved away from using Oslo periodic tasks in the action execution reporter since in this case they don't make the code more readable. Also, now it is symmetric with other similar components like action execution checker. * Refactored action execution reporter without using classes since having many instances of it doesn't make sense. * Small style changes Change-Id: I9a97c40222e8dc4870c9b6a7c5f5e3c14f37bdd6 --- mistral/executors/executor_server.py | 12 +- mistral/services/action_execution_checker.py | 2 +- mistral/services/action_execution_reporter.py | 130 +++++++++++------- 3 files changed, 82 insertions(+), 62 deletions(-) diff --git a/mistral/executors/executor_server.py b/mistral/executors/executor_server.py index 4ac4aa214..9374e6be6 100644 --- a/mistral/executors/executor_server.py +++ b/mistral/executors/executor_server.py @@ -39,14 +39,11 @@ class ExecutorServer(service_base.MistralService): self.executor = executor self._rpc_server = None - self._reporter = None - self._aer = None def start(self): super(ExecutorServer, self).start() - self._aer = action_execution_reporter.ActionExecutionReporter(CONF) - self._reporter = action_execution_reporter.setup(self._aer) + action_execution_reporter.start() if self._setup_profiler: profiler_utils.setup('mistral-executor', cfg.CONF.executor.host) @@ -63,8 +60,7 @@ class ExecutorServer(service_base.MistralService): def stop(self, graceful=False): super(ExecutorServer, self).stop(graceful) - if self._reporter: - self._reporter.stop(graceful) + action_execution_reporter.stop() if self._rpc_server: self._rpc_server.stop(graceful) @@ -101,7 +97,7 @@ class ExecutorServer(service_base.MistralService): redelivered = rpc_ctx.redelivered or False try: - self._aer.add_action_ex_id(action_ex_id) 
action_execution_reporter.add_action_ex_id(action_ex_id) res = self.executor.run_action( action_ex_id, @@ -123,7 +119,7 @@ class ExecutorServer(service_base.MistralService): return res finally: - self._aer.remove_action_ex_id(action_ex_id) + action_execution_reporter.remove_action_ex_id(action_ex_id) def get_oslo_service(setup_profiler=True): diff --git a/mistral/services/action_execution_checker.py b/mistral/services/action_execution_checker.py index dabb51454..f1ce6a666 100644 --- a/mistral/services/action_execution_checker.py +++ b/mistral/services/action_execution_checker.py @@ -89,7 +89,7 @@ def _loop(): except Exception: LOG.exception( 'Action execution checker iteration failed' - ' due to unexpected exception.' + ' due to an unexpected exception.' ) # For some mysterious reason (probably eventlet related) diff --git a/mistral/services/action_execution_reporter.py b/mistral/services/action_execution_reporter.py index d6852d5a7..441f5df79 100644 --- a/mistral/services/action_execution_reporter.py +++ b/mistral/services/action_execution_reporter.py @@ -12,10 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import eventlet +import sys + from oslo_config import cfg from oslo_log import log as logging -from oslo_service import periodic_task -from oslo_service import threadgroup from mistral import context as auth_ctx from mistral.rpc import clients as rpc @@ -25,69 +26,92 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF -class ActionExecutionReporter(periodic_task.PeriodicTasks): - """The reporter that reports the running action executions.""" +_enabled = False +_stopped = True - def __init__(self, conf): - super(ActionExecutionReporter, self).__init__(conf) - self._engine_client = rpc.get_engine_client() - self._running_actions = set() - - self.interval = CONF.action_heartbeat.check_interval - self.max_missed = CONF.action_heartbeat.max_missed_heartbeats - self.enabled = self.interval and self.max_missed - - _periodic_task = periodic_task.periodic_task( - spacing=self.interval, - run_immediately=True - ) - self.add_periodic_task( - _periodic_task(report) - ) - - def add_action_ex_id(self, action_ex_id): - # With run-action there is no actions_ex_id assigned - if action_ex_id and self.enabled: - self._engine_client.report_running_actions([action_ex_id]) - self._running_actions.add(action_ex_id) - - def remove_action_ex_id(self, action_ex_id): - if action_ex_id and self.enabled: - self._running_actions.discard(action_ex_id) +_running_actions = set() -def report(reporter, ctx): +def add_action_ex_id(action_ex_id): + global _enabled + + # With run-action there is no actions_ex_id assigned. 
+ if action_ex_id and _enabled: + rpc.get_engine_client().report_running_actions([action_ex_id]) + + _running_actions.add(action_ex_id) + + +def remove_action_ex_id(action_ex_id): + global _enabled + + if action_ex_id and _enabled: + _running_actions.discard(action_ex_id) + + +def report_running_actions(): LOG.debug("Running heartbeat reporter...") - if not reporter._running_actions: + global _running_actions + + if not _running_actions: return - auth_ctx.set_ctx(ctx) - reporter._engine_client.report_running_actions(reporter._running_actions) + rpc.get_engine_client().report_running_actions(_running_actions) -def setup(action_execution_reporter): +def _loop(): + global _stopped + + # This is an administrative thread so we need to set an admin + # security context. + auth_ctx.set_ctx( + auth_ctx.MistralContext( + user=None, + tenant=None, + auth_token=None, + is_admin=True + ) + ) + + while not _stopped: + try: + report_running_actions() + except Exception: + LOG.exception( + 'Action execution reporter iteration failed' + ' due to an unexpected exception.' + ) + + # For some mysterious reason (probably eventlet related) + # the exception is not cleared from the context automatically. + # This results in subsequent log.warning calls to show invalid + # info. 
+ if sys.version_info < (3,): + sys.exc_clear() + + eventlet.sleep(CONF.action_heartbeat.check_interval) + + +def start(): + global _stopped, _enabled + interval = CONF.action_heartbeat.check_interval max_missed = CONF.action_heartbeat.max_missed_heartbeats - enabled = interval and max_missed - if not enabled: - LOG.info("Action heartbeat reporting disabled.") - return None - tg = threadgroup.ThreadGroup() + _enabled = interval and max_missed - ctx = auth_ctx.MistralContext( - user=None, - tenant=None, - auth_token=None, - is_admin=True - ) + if not _enabled: + LOG.info("Action heartbeat reporting is disabled.") - tg.add_dynamic_timer( - action_execution_reporter.run_periodic_tasks, - initial_delay=None, - periodic_interval_max=1, - context=ctx - ) + return - return tg + _stopped = False + + eventlet.spawn(_loop) + + +def stop(graceful=False): + global _stopped + + _stopped = True