Do not wait for MessageHandlingServer

This commit adds ApiService/NotificationService service
implementation that has its own wait() and encapsulates
MessageHandlingServer instance.

Similar issue was fixed for murano-engine, this commit fixes the same
for murano-api service.

Change-Id: Ia1566dc79aa5a05a00851fcf48f3e7318ac85d54
Closes-Bug: #1521087
This commit is contained in:
varshak05 2016-03-09 21:28:48 +05:30 committed by Kirill Zaitsev
parent c7b59e1a45
commit 940d69394c
3 changed files with 46 additions and 31 deletions

View File

@ -66,8 +66,8 @@ def main():
launcher.launch_service(wsgi.Service(app, port, host))
launcher.launch_service(server.get_rpc_service())
launcher.launch_service(server.get_notification_service())
launcher.launch_service(server.ApiService())
launcher.launch_service(server.NotificationService())
launcher.launch_service(stats.StatsCollectingService())
launcher.wait()

View File

@ -65,8 +65,8 @@ def main():
launcher.launch_service(wsgi.Service(cfapp, cfport, cfhost))
launcher.launch_service(server.get_rpc_service())
launcher.launch_service(server.get_notification_service())
launcher.launch_service(server.ApiService())
launcher.launch_service(server.NotificationService())
launcher.wait()
except RuntimeError as e:

View File

@ -19,6 +19,7 @@ from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_messaging.notify import dispatcher as oslo_dispatcher
from oslo_messaging import target
from oslo_service import service
from oslo_utils import timeutils
from sqlalchemy import desc
@ -32,9 +33,6 @@ from murano.services import states
CONF = cfg.CONF
RPC_SERVICE = None
NOTIFICATION_SERVICE = None
LOG = logging.getLogger(__name__)
@ -198,35 +196,52 @@ def get_last_deployment(unit, env_id):
return query.first()
def _prepare_rpc_service(server_id):
endpoints = [ResultEndpoint()]
class Service(service.Service):
"""Service class, that contains common methods for custom services"""
transport = messaging.get_transport(CONF)
s_target = target.Target('murano', 'results', server=server_id)
return messaging.get_rpc_server(transport, s_target, endpoints, 'eventlet')
def __init__(self):
super(Service, self).__init__()
self.server = None
def stop(self, graceful=False):
if self.server:
self.server.stop()
if graceful:
self.server.wait()
super(Service, self).stop()
def reset(self):
if self.server:
self.server.reset()
super(Service, self).reset()
def _prepare_notification_service(server_id):
endpoints = [report_notification, track_instance, untrack_instance]
class NotificationService(Service):
def __init__(self):
super(NotificationService, self).__init__()
self.server = None
transport = messaging.get_transport(CONF)
s_target = target.Target(topic='murano', server=server_id)
dispatcher = oslo_dispatcher.NotificationDispatcher(
[s_target], endpoints, None, True)
return messaging.MessageHandlingServer(transport, dispatcher, 'eventlet')
def start(self):
endpoints = [report_notification, track_instance, untrack_instance]
transport = messaging.get_transport(CONF)
s_target = target.Target(topic='murano', server=str(uuid.uuid4()))
dispatcher = oslo_dispatcher.NotificationDispatcher(
[s_target], endpoints, None, True)
self.server = messaging.MessageHandlingServer(
transport, dispatcher, 'eventlet')
self.server.start()
super(NotificationService, self).start()
def get_rpc_service():
global RPC_SERVICE
class ApiService(Service):
if RPC_SERVICE is None:
RPC_SERVICE = _prepare_rpc_service(str(uuid.uuid4()))
return RPC_SERVICE
def start(self):
endpoints = [ResultEndpoint()]
def get_notification_service():
global NOTIFICATION_SERVICE
if NOTIFICATION_SERVICE is None:
NOTIFICATION_SERVICE = _prepare_notification_service(str(uuid.uuid4()))
return NOTIFICATION_SERVICE
transport = messaging.get_transport(CONF)
s_target = target.Target('murano', 'results', server=str(uuid.uuid4()))
self.server = messaging.get_rpc_server(
transport, s_target, endpoints, 'eventlet')
self.server.start()
super(ApiService, self).start()