diff --git a/murano/cmd/engine.py b/murano/cmd/engine.py index 0e4400f2..26c7de32 100644 --- a/murano/cmd/engine.py +++ b/murano/cmd/engine.py @@ -48,7 +48,7 @@ def main(): logging.setup(CONF, 'murano') launcher = service.ServiceLauncher(CONF) - launcher.launch_service(engine.get_rpc_service()) + launcher.launch_service(engine.EngineService()) launcher.wait() except RuntimeError as e: diff --git a/murano/common/engine.py b/murano/common/engine.py index fac372f1..77a59c75 100755 --- a/murano/common/engine.py +++ b/murano/common/engine.py @@ -23,6 +23,7 @@ from oslo_log import log as logging import oslo_messaging as messaging from oslo_messaging import target from oslo_serialization import jsonutils +from oslo_service import service from murano.common import auth_utils from murano.common.helpers import token_sanitizer @@ -42,7 +43,6 @@ from murano.policy import model_policy_enforcer as enforcer CONF = cfg.CONF -RPC_SERVICE = None PLUGIN_LOADER = None LOG = logging.getLogger(__name__) @@ -50,20 +50,33 @@ LOG = logging.getLogger(__name__) eventlet.debug.hub_exceptions(False) -def _prepare_rpc_service(server_id): - endpoints = [TaskProcessingEndpoint()] +# noinspection PyAbstractClass +class EngineService(service.Service): + def __init__(self): + super(EngineService, self).__init__() + self.server = None - transport = messaging.get_transport(CONF) - s_target = target.Target('murano', 'tasks', server=server_id) - return messaging.get_rpc_server(transport, s_target, endpoints, 'eventlet') + def start(self): + endpoints = [TaskProcessingEndpoint()] + transport = messaging.get_transport(CONF) + s_target = target.Target('murano', 'tasks', server=str(uuid.uuid4())) + self.server = messaging.get_rpc_server( + transport, s_target, endpoints, 'eventlet') + self.server.start() + super(EngineService, self).start() -def get_rpc_service(): - global RPC_SERVICE + def stop(self, graceful=False): + if self.server: + self.server.stop() + if graceful: + self.server.wait() + super(EngineService, self).stop() - if RPC_SERVICE is None: - RPC_SERVICE = _prepare_rpc_service(str(uuid.uuid4())) - return RPC_SERVICE + def reset(self): + if self.server: + self.server.reset() + super(EngineService, self).reset() def get_plugin_loader():