From 05edab366164ebd4ac0743252776a9e00579650c Mon Sep 17 00:00:00 2001 From: Stan Lagun Date: Wed, 9 Dec 2015 12:30:49 +0300 Subject: [PATCH] Do not wait for MessageHandlingServer Previously oslo.messaging's MessageHandlingServer was used as as a murano-engine service in service launcher. As a result it's wait() method was called. But after recent changes it's wait method is supposed to be call only after stop() to wait for graceful message processing to end. As a result a warning was printed that stop() didn't finish after 30 sec from wait() invocation (because it was never called). This commit adds EngineService service implementation that has its own wait() and encapsulates MessageHandlingServer instance. Change-Id: Ie553e0b27cc1c261b963907b4f12f89795b99a12 Closes-Bug: #1521087 --- murano/cmd/engine.py | 2 +- murano/common/engine.py | 35 ++++++++++++++++++++++++----------- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/murano/cmd/engine.py b/murano/cmd/engine.py index 0e4400f2..26c7de32 100644 --- a/murano/cmd/engine.py +++ b/murano/cmd/engine.py @@ -48,7 +48,7 @@ def main(): logging.setup(CONF, 'murano') launcher = service.ServiceLauncher(CONF) - launcher.launch_service(engine.get_rpc_service()) + launcher.launch_service(engine.EngineService()) launcher.wait() except RuntimeError as e: diff --git a/murano/common/engine.py b/murano/common/engine.py index fac372f1..77a59c75 100755 --- a/murano/common/engine.py +++ b/murano/common/engine.py @@ -23,6 +23,7 @@ from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging import target from oslo_serialization import jsonutils +from oslo_service import service from murano.common import auth_utils from murano.common.helpers import token_sanitizer @@ -42,7 +43,6 @@ from murano.policy import model_policy_enforcer as enforcer CONF = cfg.CONF -RPC_SERVICE = None PLUGIN_LOADER = None LOG = logging.getLogger(__name__) @@ -50,20 +50,33 @@ LOG = logging.getLogger(__name__) eventlet.debug.hub_exceptions(False) -def _prepare_rpc_service(server_id): - endpoints = [TaskProcessingEndpoint()] +# noinspection PyAbstractClass +class EngineService(service.Service): + def __init__(self): + super(EngineService, self).__init__() + self.server = None - transport = messaging.get_transport(CONF) - s_target = target.Target('murano', 'tasks', server=server_id) - return messaging.get_rpc_server(transport, s_target, endpoints, 'eventlet') + def start(self): + endpoints = [TaskProcessingEndpoint()] + transport = messaging.get_transport(CONF) + s_target = target.Target('murano', 'tasks', server=str(uuid.uuid4())) + self.server = messaging.get_rpc_server( + transport, s_target, endpoints, 'eventlet') + self.server.start() + super(EngineService, self).start() -def get_rpc_service(): - global RPC_SERVICE + def stop(self, graceful=False): + if self.server: + self.server.stop() + if graceful: + self.server.wait() + super(EngineService, self).stop() - if RPC_SERVICE is None: - RPC_SERVICE = _prepare_rpc_service(str(uuid.uuid4())) - return RPC_SERVICE + def reset(self): + if self.server: + self.server.reset() + super(EngineService, self).reset() def get_plugin_loader():