deb-murano/muranoapi/common/service.py
Stan Lagun 332078e3b7 Updated to muranocommon 0.2 with SSL support
Change-Id: Ibf4ec63332ab1073bda7d1a1cd102cf514284188
2013-07-29 13:25:05 +04:00

153 lines
5.2 KiB
Python

# Copyright (c) 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from muranoapi.common.utils import handle
from muranoapi.db.models import Status, Session, Environment, Deployment
from muranoapi.db.session import get_session
from muranoapi.openstack.common import log as logging, timeutils, service
from muranoapi.common import config
from muranocommon.messaging import MqClient
from sqlalchemy import desc
import eventlet
conf = config.CONF.reports
log = logging.getLogger(__name__)
class TaskResultHandlerService(service.Service):
def __init__(self):
super(TaskResultHandlerService, self).__init__()
def start(self):
super(TaskResultHandlerService, self).start()
self.tg.add_thread(self._handle_results)
self.tg.add_thread(self._handle_reports)
def stop(self):
super(TaskResultHandlerService, self).stop()
def _create_mq_client(self):
rabbitmq = config.CONF.rabbitmq
connection_params = {
'login': rabbitmq.login,
'password': rabbitmq.password,
'host': rabbitmq.host,
'port': rabbitmq.port,
'virtual_host': rabbitmq.virtual_host,
'ssl': rabbitmq.ssl,
'ca_certs': rabbitmq.ca_certs.strip() or None
}
return MqClient(**connection_params)
def _handle_results(self):
while True:
try:
with self._create_mq_client() as mqClient:
mqClient.declare(conf.results_exchange, conf.results_queue)
with mqClient.open(conf.results_queue) as results_sb:
while True:
result = results_sb.get_message()
eventlet.spawn(handle_result, result)
except Exception as ex:
log.exception(ex)
def _handle_reports(self):
while True:
try:
with self._create_mq_client() as mqClient:
mqClient.declare(conf.reports_exchange, conf.reports_queue)
with mqClient.open(conf.reports_queue) as reports_sb:
while True:
report = reports_sb.get_message()
eventlet.spawn(handle_report, report)
except Exception as ex:
log.exception(ex)
@handle
def handle_result(message):
try:
environment_result = message.body
log.debug(_('Got result message from '
'orchestration engine:\n{0}'.format(environment_result)))
if 'deleted' in environment_result:
log.debug(_('Result for environment {0} is dropped. Environment '
'is deleted'.format(environment_result['id'])))
return
session = get_session()
environment = session.query(Environment).get(environment_result['id'])
if not environment:
log.warning(_('Environment result could not be handled, specified '
'environment does not found in database'))
return
environment.description = environment_result
environment.version += 1
environment.save(session)
#close session
conf_session = session.query(Session).filter_by(
**{'environment_id': environment.id, 'state': 'deploying'}).first()
conf_session.state = 'deployed'
conf_session.save(session)
#close deployment
deployment = get_last_deployment(session, environment.id)
deployment.finished = timeutils.utcnow()
status = Status()
status.deployment_id = deployment.id
status.text = "Deployment finished"
deployment.statuses.append(status)
deployment.save(session)
except Exception as ex:
log.exception(ex)
finally:
message.ack()
@handle
def handle_report(message):
try:
report = message.body
log.debug(_('Got report message from orchestration '
'engine:\n{0}'.format(report)))
report['entity_id'] = report['id']
del report['id']
status = Status()
status.update(report)
session = get_session()
#connect with deployment
with session.begin():
running_deployment = get_last_deployment(session,
status.environment_id)
status.deployment_id = running_deployment.id
session.add(status)
except Exception as ex:
log.exception(ex)
finally:
message.ack()
def get_last_deployment(session, env_id):
query = session.query(Deployment). \
filter_by(environment_id=env_id). \
order_by(desc(Deployment.started))
return query.first()