Do not wait for MessageHandlingServer

Previously oslo.messaging's MessageHandlingServer
was used as as a murano-engine service in service launcher.
As a result it's wait() method was called. But after recent
changes it's wait method is supposed to be call only
after stop() to wait for graceful message processing to end.
As a result a warning was printed that stop() didn't finish
after 30 sec from wait() invocation (because it was never called).

This commit adds EngineService service implementation
that has its own wait() and encapsulates MessageHandlingServer
instance.

Change-Id: Ie553e0b27cc1c261b963907b4f12f89795b99a12
Closes-Bug: #1521087
This commit is contained in:
Stan Lagun 2015-12-09 12:30:49 +03:00
parent 5c14f29f9b
commit 05edab3661
2 changed files with 25 additions and 12 deletions

View File

@ -48,7 +48,7 @@ def main():
logging.setup(CONF, 'murano')
launcher = service.ServiceLauncher(CONF)
launcher.launch_service(engine.get_rpc_service())
launcher.launch_service(engine.EngineService())
launcher.wait()
except RuntimeError as e:

View File

@ -23,6 +23,7 @@ from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging import target
from oslo_serialization import jsonutils
from oslo_service import service
from murano.common import auth_utils
from murano.common.helpers import token_sanitizer
@ -42,7 +43,6 @@ from murano.policy import model_policy_enforcer as enforcer
CONF = cfg.CONF
RPC_SERVICE = None
PLUGIN_LOADER = None
LOG = logging.getLogger(__name__)
@ -50,20 +50,33 @@ LOG = logging.getLogger(__name__)
eventlet.debug.hub_exceptions(False)
def _prepare_rpc_service(server_id):
endpoints = [TaskProcessingEndpoint()]
# noinspection PyAbstractClass
class EngineService(service.Service):
def __init__(self):
super(EngineService, self).__init__()
self.server = None
transport = messaging.get_transport(CONF)
s_target = target.Target('murano', 'tasks', server=server_id)
return messaging.get_rpc_server(transport, s_target, endpoints, 'eventlet')
def start(self):
endpoints = [TaskProcessingEndpoint()]
transport = messaging.get_transport(CONF)
s_target = target.Target('murano', 'tasks', server=str(uuid.uuid4()))
self.server = messaging.get_rpc_server(
transport, s_target, endpoints, 'eventlet')
self.server.start()
super(EngineService, self).start()
def get_rpc_service():
global RPC_SERVICE
def stop(self, graceful=False):
if self.server:
self.server.stop()
if graceful:
self.server.wait()
super(EngineService, self).stop()
if RPC_SERVICE is None:
RPC_SERVICE = _prepare_rpc_service(str(uuid.uuid4()))
return RPC_SERVICE
def reset(self):
if self.server:
self.server.reset()
super(EngineService, self).reset()
def get_plugin_loader():