deb-murano/muranoapi/common/server.py
Serg Melikyan a8c66b0047 Use oslo.messaging
* Remove muranoapi/openstack/common/rpc
* Remove muranoapi/openstack/common/notifier
* Add ResultEndpoint for handling results from Conductor
* Add ReportNotificationEndpoint for handling reports from Conductor
* Add ConductorClient to send environments to Conductor
* Update murano-api.conf.sample with new RabbitMQ settings
* Clean-up openstack-common from rpc/notifier modules

Change-Id: I4b63f3e2b848143af99dff3c7d39080cc39516a0
2014-03-13 14:35:59 +04:00

186 lines
6.2 KiB
Python

# Copyright (c) 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from oslo import messaging
from oslo.messaging import localcontext
from oslo.messaging import serializer as msg_serializer
from oslo.messaging import target
from sqlalchemy import desc
from muranoapi.common import config
from muranoapi.db import models
from muranoapi.db import session
from muranoapi.openstack.common.gettextutils import _ # noqa
from muranoapi.openstack.common import log as logging
from muranoapi.openstack.common import timeutils
from muranocommon.helpers import token_sanitizer
RPC_SERVICE = None
NOTIFICATION_SERVICE = None
log = logging.getLogger(__name__)
class ResultEndpoint(object):
@staticmethod
def process_result(context, result):
secure_result = token_sanitizer.TokenSanitizer().sanitize(result)
log.debug(_('Got result from orchestration '
'engine:\n{0}'.format(secure_result)))
if 'deleted' in result:
log.debug(_('Result for environment {0} is dropped. Environment '
'is deleted'.format(result['id'])))
return
unit = session.get_session()
environment = unit.query(models.Environment).get(result['id'])
if not environment:
log.warning(_('Environment result could not be handled, specified '
'environment was not found in database'))
return
environment.description = result
environment.networking = result.get('networking', {})
environment.version += 1
environment.save(unit)
#close session
conf_session = unit.query(models.Session).filter_by(
**{'environment_id': environment.id, 'state': 'deploying'}).first()
conf_session.state = 'deployed'
conf_session.save(unit)
#close deployment
deployment = get_last_deployment(unit, environment.id)
deployment.finished = timeutils.utcnow()
num_errors = unit.query(models.Status)\
.filter_by(level='error', deployment_id=deployment.id).count()
num_warnings = unit.query(models.Status)\
.filter_by(level='warning', deployment_id=deployment.id).count()
final_status_text = "Deployment finished"
if num_errors:
final_status_text += " with errors"
elif num_warnings:
final_status_text += " with warnings"
status = models.Status()
status.deployment_id = deployment.id
status.text = final_status_text
status.level = 'info'
deployment.statuses.append(status)
deployment.save(unit)
class ReportNotificationEndpoint(object):
@staticmethod
def report_notification(context, report):
log.debug(_('Got report from orchestration '
'engine:\n{0}'.format(report)))
report['entity_id'] = report['id']
del report['id']
status = models.Status()
status.update(report)
unit = session.get_session()
#connect with deployment
with unit.begin():
running_deployment = get_last_deployment(unit,
status.environment_id)
status.deployment_id = running_deployment.id
unit.add(status)
class NotificationDispatcher(object):
def __init__(self, srv_target, endpoints, serializer):
self.endpoints = endpoints
self.serializer = serializer or msg_serializer.NoOpSerializer()
self._default_target = target.Target()
self._target = srv_target
def _listen(self, transport):
return transport._listen(self._target)
def _dispatch(self, endpoint, method, ctxt, payload):
ctxt = self.serializer.deserialize_context(ctxt)
result = getattr(endpoint, method)(ctxt, payload)
return self.serializer.serialize_entity(ctxt, result)
def __call__(self, ctxt, message):
event_type = message.get('event_type')
if not event_type.startswith('murano.'):
return
method = '{0}_notification'.format(event_type[7:])
for endpoint in self.endpoints:
if hasattr(endpoint, method):
localcontext.set_local_context(ctxt)
try:
payload = message.get('payload')
return self._dispatch(endpoint, method, ctxt, payload)
finally:
localcontext.clear_local_context()
msg = 'Could not find notification handler for event \'{0}\''
raise Exception(msg.format(method))
def get_last_deployment(unit, env_id):
query = unit.query(models.Deployment)\
.filter_by(environment_id=env_id)\
.order_by(desc(models.Deployment.started))
return query.first()
def _prepare_rpc_service(server_id):
endpoints = [ResultEndpoint()]
transport = messaging.get_transport(config.CONF)
s_target = target.Target('murano', 'results', server=server_id)
return messaging.get_rpc_server(transport, s_target, endpoints, 'eventlet')
def _prepare_notification_service(server_id):
endpoints = [ReportNotificationEndpoint()]
transport = messaging.get_transport(config.CONF)
s_target = target.Target(topic='notifications.info', server=server_id)
dispatcher = NotificationDispatcher(s_target, endpoints, None)
return messaging.MessageHandlingServer(transport, dispatcher, 'eventlet')
def get_rpc_service():
global RPC_SERVICE
if RPC_SERVICE is None:
RPC_SERVICE = _prepare_rpc_service(str(uuid.uuid4()))
return RPC_SERVICE
def get_notification_service():
global NOTIFICATION_SERVICE
if NOTIFICATION_SERVICE is None:
NOTIFICATION_SERVICE = _prepare_notification_service(str(uuid.uuid4()))
return NOTIFICATION_SERVICE