Merge "Added retry logic for RabbitMQ connection" into release-0.1
This commit is contained in:
commit
9ae78d0121
|
@ -12,35 +12,37 @@
|
||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import socket
|
||||||
|
|
||||||
|
from amqplib.client_0_8 import AMQPConnectionException
|
||||||
import anyjson
|
import anyjson
|
||||||
from eventlet import patcher
|
import eventlet
|
||||||
|
from muranoapi.common.utils import retry, handle
|
||||||
from muranoapi.db.models import Status, Session, Environment
|
from muranoapi.db.models import Status, Session, Environment
|
||||||
from muranoapi.db.session import get_session
|
from muranoapi.db.session import get_session
|
||||||
|
|
||||||
amqp = patcher.import_patched('amqplib.client_0_8')
|
|
||||||
|
|
||||||
from muranoapi.openstack.common import service
|
|
||||||
from muranoapi.openstack.common import log as logging
|
from muranoapi.openstack.common import log as logging
|
||||||
from muranoapi.common import config
|
from muranoapi.common import config
|
||||||
|
|
||||||
|
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
|
||||||
conf = config.CONF.reports
|
conf = config.CONF.reports
|
||||||
rabbitmq = config.CONF.rabbitmq
|
rabbitmq = config.CONF.rabbitmq
|
||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
channel = None
|
|
||||||
|
|
||||||
|
|
||||||
class TaskResultHandlerService(service.Service):
|
class TaskResultHandlerService():
|
||||||
def __init__(self, threads=1000):
|
thread = None
|
||||||
super(TaskResultHandlerService, self).__init__(threads)
|
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
super(TaskResultHandlerService, self).start()
|
self.thread = eventlet.spawn(self.connect)
|
||||||
self.tg.add_thread(self._handle_results)
|
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
super(TaskResultHandlerService, self).stop()
|
pass
|
||||||
|
|
||||||
def _handle_results(self):
|
def wait(self):
|
||||||
|
self.thread.wait()
|
||||||
|
|
||||||
|
@retry((socket.error, AMQPConnectionException), tries=-1)
|
||||||
|
def connect(self):
|
||||||
connection = amqp.Connection('{0}:{1}'.
|
connection = amqp.Connection('{0}:{1}'.
|
||||||
format(rabbitmq.host, rabbitmq.port),
|
format(rabbitmq.host, rabbitmq.port),
|
||||||
virtual_host=rabbitmq.virtual_host,
|
virtual_host=rabbitmq.virtual_host,
|
||||||
|
@ -67,36 +69,15 @@ class TaskResultHandlerService(service.Service):
|
||||||
ch.wait()
|
ch.wait()
|
||||||
|
|
||||||
|
|
||||||
def handle_report(msg):
|
@handle
|
||||||
log.debug(_('Got report message from orchestration engine:\n{0}'.
|
|
||||||
format(msg.body)))
|
|
||||||
|
|
||||||
params = anyjson.deserialize(msg.body)
|
|
||||||
params['entity_id'] = params['id']
|
|
||||||
del params['id']
|
|
||||||
|
|
||||||
status = Status()
|
|
||||||
status.update(params)
|
|
||||||
|
|
||||||
session = get_session()
|
|
||||||
#connect with session
|
|
||||||
conf_session = session.query(Session).filter_by(
|
|
||||||
**{'environment_id': status.environment_id,
|
|
||||||
'state': 'deploying'}).first()
|
|
||||||
status.session_id = conf_session.id
|
|
||||||
|
|
||||||
with session.begin():
|
|
||||||
session.add(status)
|
|
||||||
|
|
||||||
|
|
||||||
def handle_result(msg):
|
def handle_result(msg):
|
||||||
log.debug(_('Got result message from '
|
log.debug(_('Got result message from '
|
||||||
'orchestration engine:\n{0}'.format(msg.body)))
|
'orchestration engine:\n{0}'.format(msg.body)))
|
||||||
|
|
||||||
environment_result = anyjson.deserialize(msg.body)
|
environment_result = anyjson.deserialize(msg.body)
|
||||||
if 'deleted' in environment_result:
|
if 'deleted' in environment_result:
|
||||||
log.debug(_('Result for environment {0} is dropped. '
|
log.debug(_('Result for environment {0} is dropped. Environment '
|
||||||
'Environment is deleted'.format(environment_result['id'])))
|
'is deleted'.format(environment_result['id'])))
|
||||||
|
|
||||||
msg.channel.basic_ack(msg.delivery_tag)
|
msg.channel.basic_ack(msg.delivery_tag)
|
||||||
return
|
return
|
||||||
|
@ -119,3 +100,26 @@ def handle_result(msg):
|
||||||
conf_session.save(session)
|
conf_session.save(session)
|
||||||
|
|
||||||
msg.channel.basic_ack(msg.delivery_tag)
|
msg.channel.basic_ack(msg.delivery_tag)
|
||||||
|
|
||||||
|
|
||||||
|
@handle
|
||||||
|
def handle_report(msg):
|
||||||
|
log.debug(_('Got report message from orchestration '
|
||||||
|
'engine:\n{0}'.format(msg.body)))
|
||||||
|
|
||||||
|
params = anyjson.deserialize(msg.body)
|
||||||
|
params['entity_id'] = params['id']
|
||||||
|
del params['id']
|
||||||
|
|
||||||
|
status = Status()
|
||||||
|
status.update(params)
|
||||||
|
|
||||||
|
session = get_session()
|
||||||
|
#connect with session
|
||||||
|
conf_session = session.query(Session).filter_by(
|
||||||
|
**{'environment_id': status.environment_id,
|
||||||
|
'state': 'deploying'}).first()
|
||||||
|
status.session_id = conf_session.id
|
||||||
|
|
||||||
|
with session.begin():
|
||||||
|
session.add(status)
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
# Copyright (c) 2013 Mirantis, Inc.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import eventlet
|
||||||
|
from functools import wraps
|
||||||
|
from muranoapi.openstack.common import log as logging
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
def retry(ExceptionToCheck, tries=4, delay=3, backoff=2):
|
||||||
|
"""Retry calling the decorated function using an exponential backoff.
|
||||||
|
|
||||||
|
http://www.saltycrane.com/blog/2009/11/trying-out-retry-decorator-python/
|
||||||
|
original from: http://wiki.python.org/moin/PythonDecoratorLibrary#Retry
|
||||||
|
|
||||||
|
:param ExceptionToCheck: the exception to check. may be a tuple of
|
||||||
|
exceptions to check
|
||||||
|
:type ExceptionToCheck: Exception or tuple
|
||||||
|
:param tries: number of times to try (not retry) before giving up
|
||||||
|
:type tries: int
|
||||||
|
:param delay: initial delay between retries in seconds
|
||||||
|
:type delay: int
|
||||||
|
:param backoff: backoff multiplier e.g. value of 2 will double the delay
|
||||||
|
each retry
|
||||||
|
:type backoff: int
|
||||||
|
"""
|
||||||
|
|
||||||
|
def deco_retry(f):
|
||||||
|
@wraps(f)
|
||||||
|
def f_retry(*args, **kwargs):
|
||||||
|
mtries, mdelay = tries, delay
|
||||||
|
forever = mtries == -1
|
||||||
|
while forever or mtries > 1:
|
||||||
|
try:
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
except ExceptionToCheck as e:
|
||||||
|
|
||||||
|
log.exception(e)
|
||||||
|
log.info("Retrying in {0} seconds...".format(mdelay))
|
||||||
|
|
||||||
|
eventlet.sleep(mdelay)
|
||||||
|
|
||||||
|
if not forever:
|
||||||
|
mtries -= 1
|
||||||
|
|
||||||
|
if mdelay < 60:
|
||||||
|
mdelay *= backoff
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
|
||||||
|
return f_retry
|
||||||
|
|
||||||
|
return deco_retry
|
||||||
|
|
||||||
|
|
||||||
|
def handle(f):
|
||||||
|
"""Handles exception in wrapped function and writes to log."""
|
||||||
|
|
||||||
|
@wraps(f)
|
||||||
|
def f_handle(*args, **kwargs):
|
||||||
|
try:
|
||||||
|
return f(*args, **kwargs)
|
||||||
|
except Exception as e:
|
||||||
|
log.exception(e)
|
||||||
|
|
||||||
|
return f_handle
|
Loading…
Reference in New Issue