Implement retries in notify_kombu

Fixes bug 915817

Copies code from nova and includes improvements pending for nova as well

Change-Id: Ib596f4bacc5f58c4507f4111cdfac273029018bc
This commit is contained in:
Chris Behrens 2012-01-12 23:42:06 -08:00
parent b0181065d0
commit 195e6670c4
4 changed files with 328 additions and 37 deletions

@ -3,6 +3,7 @@ Alex Meade <alex.meade@rackspace.com>
Andrey Brindeyev <abrindeyev@griddynamics.com>
Brian Lamar <brian.lamar@rackspace.com>
Brian Waldon <brian.waldon@rackspace.com>
Chris Behrens <cbehrens@codestud.com>
Christopher MacGown <chris@slicehost.com>
Cory Wright <corywright@gmail.com>
Dan Prince <dan.prince@rackspace.com>

@ -667,3 +667,24 @@ Exchange name to use for connection when using ``rabbit`` strategy.
Optional. Default: ``glance_notifications``
Topic to use for connection when using ``rabbit`` strategy.
* ``rabbit_max_retries``
Optional. Default: ``0``
Number of retries on communication failures when using ``rabbit`` strategy.
A value of 0 means to retry forever.
* ``rabbit_retry_backoff``
Optional. Default: ``2``
Number of seconds to wait before reconnecting on failures when using
``rabbit`` strategy.
* ``rabbit_retry_max_backoff``
Optional. Default: ``30``
Maximum seconds to wait before reconnecting on failures when using
``rabbit`` strategy.

@ -15,6 +15,8 @@
import json
import logging
import time
import kombu.connection
import kombu.entity
@ -23,6 +25,13 @@ from glance.common import cfg
from glance.notifier import strategy
logger = logging.getLogger('glance.notifier.notify_kombu')
class KombuMaxRetriesReached(Exception):
pass
class RabbitStrategy(strategy.Strategy):
"""A notifier that puts a message on a queue when called."""
@ -34,7 +43,11 @@ class RabbitStrategy(strategy.Strategy):
cfg.StrOpt('rabbit_password', default='guest'),
cfg.StrOpt('rabbit_virtual_host', default='/'),
cfg.StrOpt('rabbit_notification_exchange', default='glance'),
cfg.StrOpt('rabbit_notification_topic', default='glance_notifications')
cfg.StrOpt('rabbit_notification_topic',
default='glance_notifications'),
cfg.StrOpt('rabbit_max_retries', default=0),
cfg.StrOpt('rabbit_retry_backoff', default=2),
cfg.StrOpt('rabbit_retry_max_backoff', default=30)
]
def __init__(self, conf):
@ -43,45 +56,166 @@ class RabbitStrategy(strategy.Strategy):
self._conf.register_opts(self.opts)
self.topic = self._conf.rabbit_notification_topic
self.connect()
self.max_retries = self._conf.rabbit_max_retries
# NOTE(comstud): When reading the config file, these values end
# up being strings, and we need them as ints.
self.retry_backoff = int(self._conf.rabbit_retry_backoff)
self.retry_max_backoff = int(self._conf.rabbit_retry_max_backoff)
def connect(self):
self.connection = None
self.retry_attempts = 0
try:
self.reconnect()
except KombuMaxRetriesReached:
pass
def _close(self):
"""Close connection to rabbit."""
try:
self.connection.close()
except self.connection_errors:
pass
self.connection = None
def _connect(self):
"""Connect to rabbit. Exceptions should be handled by the
caller.
"""
log_info = {}
log_info['hostname'] = self._conf.rabbit_host
log_info['port'] = self._conf.rabbit_port
if self.connection:
logger.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % log_info)
self._close()
else:
logger.info(_("Connecting to AMQP server on "
"%(hostname)s:%(port)d") % log_info)
self.connection = kombu.connection.BrokerConnection(
hostname=self._conf.rabbit_host,
userid=self._conf.rabbit_userid,
password=self._conf.rabbit_password,
virtual_host=self._conf.rabbit_virtual_host,
ssl=self._conf.rabbit_use_ssl)
hostname=self._conf.rabbit_host,
port=self._conf.rabbit_port,
userid=self._conf.rabbit_userid,
password=self._conf.rabbit_password,
virtual_host=self._conf.rabbit_virtual_host,
ssl=self._conf.rabbit_use_ssl)
self.connection_errors = self.connection.connection_errors
self.connection.connect()
self.channel = self.connection.channel()
self.exchange = kombu.entity.Exchange(
channel=self.channel,
type="topic",
name=self._conf.rabbit_notification_exchange)
self.exchange.declare()
channel=self.channel,
type="topic",
name=self._conf.rabbit_notification_exchange)
def _send_message(self, message, priority):
routing_key = "%s.%s" % (self.topic, priority.lower())
# NOTE(jerdfelt): Normally the consumer would create the queue, but
# we do this to ensure that messages don't get dropped if the
# NOTE(jerdfelt): Normally the consumer would create the queues,
# but we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(
channel=self.channel,
exchange=self.exchange,
durable=True,
name=routing_key,
routing_key=routing_key)
queue.declare()
for priority in ["WARN", "INFO", "ERROR"]:
routing_key = "%s.%s" % (self.topic, priority.lower())
queue = kombu.entity.Queue(
channel=self.channel,
exchange=self.exchange,
durable=True,
name=routing_key,
routing_key=routing_key)
queue.declare()
logger.info(_("Connected to AMQP server on "
"%(hostname)s:%(port)d") % log_info)
def reconnect(self):
"""Handles reconnecting and re-establishing queues."""
while True:
self.retry_attempts += 1
try:
self._connect()
return
except self.connection_errors, e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621 for
# nova.) So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
log_info = {}
log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries
log_info['hostname'] = self._conf.rabbit_host
log_info['port'] = self._conf.rabbit_port
if self.max_retries and self.retry_attempts >= self.max_retries:
logger.exception(_('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info)
if self.connection:
self._close()
raise KombuMaxRetriesReached
sleep_time = self.retry_backoff * self.retry_attempts
if self.retry_max_backoff:
sleep_time = min(sleep_time, self.retry_max_backoff)
log_info['sleep_time'] = sleep_time
logger.exception(_('AMQP server on %(hostname)s:%(port)d is'
' unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def log_failure(self, msg, priority):
"""Fallback to logging when we can't send to rabbit."""
logger.error(_('Notification with priority %(priority)s failed; '
'msg=%s') % msg)
def _send_message(self, msg, routing_key):
"""Send a message. Caller needs to catch exceptions for retry."""
msg = self.exchange.Message(json.dumps(message))
self.exchange.publish(msg, routing_key=routing_key)
def _notify(self, msg, priority):
"""Send a notification and retry if needed."""
self.retry_attempts = 0
if not self.connection:
try:
self.reconnect()
except KombuMaxRetriesReached:
self.log_failure(msg, priority)
return
routing_key = "%s.%s" % (self.topic, priority.lower())
while True:
try:
self._send_message(msg, routing_key)
return
except self.connection_errors, e:
pass
except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport
# connection_errors in the case of a timeout waiting for
# a protocol response. (See paste link in LP888621 for
# nova.) So, we check all exceptions for 'timeout' in them
# and try to reconnect in this case.
if 'timeout' not in str(e):
raise
logger.exception(_("Unable to send notification: %s") % str(e))
try:
self.reconnect()
except KombuMaxRetriesReached:
break
self.log_failure(msg, priority)
def warn(self, msg):
self._send_message(msg, "WARN")
self._notify(msg, "WARN")
def info(self, msg):
self._send_message(msg, "INFO")
self._notify(msg, "INFO")
def error(self, msg):
self._send_message(msg, "ERROR")
self._notify(msg, "ERROR")

@ -86,18 +86,26 @@ class TestRabbitNotifier(unittest.TestCase):
"""Test AMQP/Rabbit notifier works."""
def setUp(self):
notify_kombu = common_utils.import_object(
"glance.notifier.notify_kombu")
notify_kombu.RabbitStrategy._send_message = self._send_message
notify_kombu.RabbitStrategy.connect = lambda s: None
self.called = False
conf = utils.TestConfigOpts({"notifier_strategy": "rabbit"})
self.notifier = notifier.Notifier(conf)
def _fake_connect(rabbit_self):
rabbit_self.connection_errors = ()
rabbit_self.connection = 'fake_connection'
return None
def _send_message(self, message, priority):
self.notify_kombu = common_utils.import_object(
"glance.notifier.notify_kombu")
self.notify_kombu.RabbitStrategy._send_message = self._send_message
self.notify_kombu.RabbitStrategy._connect = _fake_connect
self.called = False
self.conf = utils.TestConfigOpts({"notifier_strategy": "rabbit",
"rabbit_retry_backoff": 0,
"rabbit_notification_topic":
"fake_topic"})
self.notifier = notifier.Notifier(self.conf)
def _send_message(self, message, routing_key):
self.called = {
"message": message,
"priority": priority,
"routing_key": routing_key
}
def test_warn(self):
@ -108,6 +116,7 @@ class TestRabbitNotifier(unittest.TestCase):
self.assertEquals("test_message", self.called["message"]["payload"])
self.assertEquals("WARN", self.called["message"]["priority"])
self.assertEquals("fake_topic.warn", self.called["routing_key"])
def test_info(self):
self.notifier.info("test_event", "test_message")
@ -117,6 +126,7 @@ class TestRabbitNotifier(unittest.TestCase):
self.assertEquals("test_message", self.called["message"]["payload"])
self.assertEquals("INFO", self.called["message"]["priority"])
self.assertEquals("fake_topic.info", self.called["routing_key"])
def test_error(self):
self.notifier.error("test_event", "test_message")
@ -126,3 +136,128 @@ class TestRabbitNotifier(unittest.TestCase):
self.assertEquals("test_message", self.called["message"]["payload"])
self.assertEquals("ERROR", self.called["message"]["priority"])
self.assertEquals("fake_topic.error", self.called["routing_key"])
def test_unknown_error_on_connect_raises(self):
class MyException(Exception):
pass
def _connect(self):
self.connection_errors = ()
raise MyException('meow')
self.notify_kombu.RabbitStrategy._connect = _connect
self.assertRaises(MyException, notifier.Notifier, self.conf)
def test_timeout_on_connect_reconnects(self):
info = {'num_called': 0}
def _connect(rabbit_self):
rabbit_self.connection_errors = ()
info['num_called'] += 1
if info['num_called'] == 1:
raise Exception('foo timeout foo')
rabbit_self.connection = 'fake_connection'
self.notify_kombu.RabbitStrategy._connect = _connect
notifier_ = notifier.Notifier(self.conf)
notifier_.error('test_event', 'test_message')
if self.called is False:
self.fail("Did not call _send_message properly.")
self.assertEquals("test_message", self.called["message"]["payload"])
self.assertEquals("ERROR", self.called["message"]["priority"])
self.assertEquals(info['num_called'], 2)
def test_connection_error_on_connect_reconnects(self):
info = {'num_called': 0}
class MyException(Exception):
pass
def _connect(rabbit_self):
rabbit_self.connection_errors = (MyException, )
info['num_called'] += 1
if info['num_called'] == 1:
raise MyException('meow')
rabbit_self.connection = 'fake_connection'
self.notify_kombu.RabbitStrategy._connect = _connect
notifier_ = notifier.Notifier(self.conf)
notifier_.error('test_event', 'test_message')
if self.called is False:
self.fail("Did not call _send_message properly.")
self.assertEquals("test_message", self.called["message"]["payload"])
self.assertEquals("ERROR", self.called["message"]["priority"])
self.assertEquals(info['num_called'], 2)
def test_unknown_error_on_send_message_raises(self):
class MyException(Exception):
pass
def _send_message(rabbit_self, msg, routing_key):
raise MyException('meow')
self.notify_kombu.RabbitStrategy._send_message = _send_message
notifier_ = notifier.Notifier(self.conf)
self.assertRaises(MyException, notifier_.error, 'a', 'b')
def test_timeout_on_send_message_reconnects(self):
info = {'send_called': 0, 'conn_called': 0}
def _connect(rabbit_self):
info['conn_called'] += 1
rabbit_self.connection_errors = ()
rabbit_self.connection = 'fake_connection'
def _send_message(rabbit_self, msg, routing_key):
info['send_called'] += 1
if info['send_called'] == 1:
raise Exception('foo timeout foo')
self._send_message(msg, routing_key)
self.notify_kombu.RabbitStrategy._connect = _connect
self.notify_kombu.RabbitStrategy._send_message = _send_message
notifier_ = notifier.Notifier(self.conf)
notifier_.error('test_event', 'test_message')
if self.called is False:
self.fail("Did not call _send_message properly.")
self.assertEquals("test_message", self.called["message"]["payload"])
self.assertEquals("ERROR", self.called["message"]["priority"])
self.assertEquals(info['send_called'], 2)
self.assertEquals(info['conn_called'], 2)
def test_connection_error_on_send_message_reconnects(self):
info = {'send_called': 0, 'conn_called': 0}
class MyException(Exception):
pass
def _connect(rabbit_self):
info['conn_called'] += 1
rabbit_self.connection_errors = (MyException, )
rabbit_self.connection = 'fake_connection'
def _send_message(rabbit_self, msg, routing_key):
info['send_called'] += 1
if info['send_called'] == 1:
raise MyException('meow')
self._send_message(msg, routing_key)
self.notify_kombu.RabbitStrategy._connect = _connect
self.notify_kombu.RabbitStrategy._send_message = _send_message
notifier_ = notifier.Notifier(self.conf)
notifier_.error('test_event', 'test_message')
if self.called is False:
self.fail("Did not call _send_message properly.")
self.assertEquals("test_message", self.called["message"]["payload"])
self.assertEquals("ERROR", self.called["message"]["priority"])
self.assertEquals(info['send_called'], 2)
self.assertEquals(info['conn_called'], 2)