Default rabbit max_retries to forever
Modify carrot code to handle retry backoffs and obey max_retries = forever Fix some kombu issues from cut-n-paste Service should make sure to close the RPC connection
This commit is contained in:
@@ -302,9 +302,9 @@ DEFINE_bool('rabbit_use_ssl', False, 'connect over SSL')
|
|||||||
DEFINE_string('rabbit_userid', 'guest', 'rabbit userid')
|
DEFINE_string('rabbit_userid', 'guest', 'rabbit userid')
|
||||||
DEFINE_string('rabbit_password', 'guest', 'rabbit password')
|
DEFINE_string('rabbit_password', 'guest', 'rabbit password')
|
||||||
DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host')
|
DEFINE_string('rabbit_virtual_host', '/', 'rabbit virtual host')
|
||||||
DEFINE_integer('rabbit_retry_interval', 10, 'rabbit connection retry interval')
|
DEFINE_integer('rabbit_retry_interval', 1, 'rabbit connection retry interval to start')
|
||||||
DEFINE_integer('rabbit_interval_stepping', 2, 'rabbit connection retry backoff in seconds')
|
DEFINE_integer('rabbit_retry_backoff', 2, 'rabbit connection retry backoff in seconds')
|
||||||
DEFINE_integer('rabbit_max_retries', 12, 'rabbit connection attempts')
|
DEFINE_integer('rabbit_max_retries', 0, 'maximum rabbit connection attempts (0=try forever)')
|
||||||
DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to')
|
DEFINE_string('control_exchange', 'nova', 'the main exchange to connect to')
|
||||||
DEFINE_boolean('rabbit_durable_queues', False, 'use durable queues')
|
DEFINE_boolean('rabbit_durable_queues', False, 'use durable queues')
|
||||||
DEFINE_list('enabled_apis', ['ec2', 'osapi'],
|
DEFINE_list('enabled_apis', ['ec2', 'osapi'],
|
||||||
|
|||||||
@@ -119,25 +119,34 @@ class Consumer(messaging.Consumer):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
for i in xrange(FLAGS.rabbit_max_retries):
|
max_retries = FALGS.rabbit_max_retries
|
||||||
if i > 0:
|
sleep_time = FLAGS.rabbit_retry_interval
|
||||||
time.sleep(FLAGS.rabbit_retry_interval)
|
tries = 0
|
||||||
|
while True:
|
||||||
|
tries += 1
|
||||||
|
if tries > 1:
|
||||||
|
time.sleep(sleep_time)
|
||||||
|
# backoff for next retry attempt.. if there is one
|
||||||
|
sleep_time += FLAGS.rabbit_retry_backoff
|
||||||
|
if sleep_time > 30:
|
||||||
|
sleep_time = 30
|
||||||
try:
|
try:
|
||||||
super(Consumer, self).__init__(*args, **kwargs)
|
super(Consumer, self).__init__(*args, **kwargs)
|
||||||
self.failed_connection = False
|
self.failed_connection = False
|
||||||
break
|
break
|
||||||
except Exception as e: # Catching all because carrot sucks
|
except Exception as e: # Catching all because carrot sucks
|
||||||
|
self.failed_connection = True
|
||||||
|
if max_retries > 0 and tries == max_retries:
|
||||||
|
break
|
||||||
fl_host = FLAGS.rabbit_host
|
fl_host = FLAGS.rabbit_host
|
||||||
fl_port = FLAGS.rabbit_port
|
fl_port = FLAGS.rabbit_port
|
||||||
fl_intv = FLAGS.rabbit_retry_interval
|
fl_intv = sleep_time
|
||||||
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
|
LOG.error(_('AMQP server on %(fl_host)s:%(fl_port)d is'
|
||||||
' unreachable: %(e)s. Trying again in %(fl_intv)d'
|
' unreachable: %(e)s. Trying again in %(fl_intv)d'
|
||||||
' seconds.') % locals())
|
' seconds.') % locals())
|
||||||
self.failed_connection = True
|
|
||||||
if self.failed_connection:
|
if self.failed_connection:
|
||||||
LOG.error(_('Unable to connect to AMQP server '
|
LOG.error(_('Unable to connect to AMQP server '
|
||||||
'after %d tries. Shutting down.'),
|
'after %(tries)d tries. Shutting down.') % locals())
|
||||||
FLAGS.rabbit_max_retries)
|
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
|
||||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||||
@@ -520,6 +529,11 @@ class MulticallWaiter(object):
|
|||||||
yield result
|
yield result
|
||||||
|
|
||||||
|
|
||||||
|
def create_connection(new=True):
|
||||||
|
"""Create a connection"""
|
||||||
|
return Connection.instance(new=new)
|
||||||
|
|
||||||
|
|
||||||
def create_consumer(conn, topic, proxy, fanout=False):
|
def create_consumer(conn, topic, proxy, fanout=False):
|
||||||
"""Create a consumer that calls methods in the proxy"""
|
"""Create a consumer that calls methods in the proxy"""
|
||||||
if fanout:
|
if fanout:
|
||||||
|
|||||||
@@ -288,9 +288,13 @@ class Connection(object):
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.queues = []
|
self.queues = []
|
||||||
self.max_retries = FLAGS.rabbit_max_retries
|
self.max_retries = FLAGS.rabbit_max_retries
|
||||||
|
# Try forever?
|
||||||
|
if self.max_retries <= 0:
|
||||||
|
self.max_retries = None
|
||||||
self.interval_start = FLAGS.rabbit_retry_interval
|
self.interval_start = FLAGS.rabbit_retry_interval
|
||||||
self.interval_stepping = FLAGS.rabbit_interval_stepping
|
self.interval_stepping = FLAGS.rabbit_retry_backoff
|
||||||
self.interval_max = FLAGS.rabbit_retry_interval
|
# max retry-interval = 30 seconds
|
||||||
|
self.interval_max = 30
|
||||||
|
|
||||||
self.params = dict(hostname=FLAGS.rabbit_host,
|
self.params = dict(hostname=FLAGS.rabbit_host,
|
||||||
port=FLAGS.rabbit_port,
|
port=FLAGS.rabbit_port,
|
||||||
@@ -302,16 +306,6 @@ class Connection(object):
|
|||||||
self.connection = None
|
self.connection = None
|
||||||
self.reconnect()
|
self.reconnect()
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def instance(cls, new=True):
|
|
||||||
"""Returns the instance."""
|
|
||||||
if new or not hasattr(cls, '_instance'):
|
|
||||||
if new:
|
|
||||||
return cls()
|
|
||||||
else:
|
|
||||||
cls._instance = cls()
|
|
||||||
return cls._instance
|
|
||||||
|
|
||||||
def reconnect(self):
|
def reconnect(self):
|
||||||
"""Handles reconnecting and re-estblishing queues"""
|
"""Handles reconnecting and re-estblishing queues"""
|
||||||
if self.connection:
|
if self.connection:
|
||||||
@@ -330,12 +324,12 @@ class Connection(object):
|
|||||||
interval_step=self.interval_stepping,
|
interval_step=self.interval_stepping,
|
||||||
interval_max=self.interval_max)
|
interval_max=self.interval_max)
|
||||||
except self.connection.connection_errors, e:
|
except self.connection.connection_errors, e:
|
||||||
|
# We should only get here if max_retries is set. We'll go
|
||||||
|
# ahead and exit in this case.
|
||||||
err_str = str(e)
|
err_str = str(e)
|
||||||
max_retries = FLAGS.rabbit_max_retries
|
max_retries = self.max_retries
|
||||||
LOG.error(_('Unable to connect to AMQP server '
|
LOG.error(_('Unable to connect to AMQP server '
|
||||||
'after %(max_retries)d tries: %(err_str)s') % locals())
|
'after %(max_retries)d tries: %(err_str)s') % locals())
|
||||||
# NOTE(comstud): Original carrot code exits after so many
|
|
||||||
# attempts, but I wonder if we should re-try indefinitely
|
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
|
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d' %
|
||||||
self.params))
|
self.params))
|
||||||
@@ -448,7 +442,7 @@ class Pool(pools.Pool):
|
|||||||
# TODO(comstud): Timeout connections not used in a while
|
# TODO(comstud): Timeout connections not used in a while
|
||||||
def create(self):
|
def create(self):
|
||||||
LOG.debug('Creating new connection')
|
LOG.debug('Creating new connection')
|
||||||
return RPCIMPL.Connection()
|
return Connection()
|
||||||
|
|
||||||
# Create a ConnectionPool to use for RPC calls. We'll order the
|
# Create a ConnectionPool to use for RPC calls. We'll order the
|
||||||
# pool as a stack (LIFO), so that we can potentially loop through and
|
# pool as a stack (LIFO), so that we can potentially loop through and
|
||||||
@@ -464,7 +458,7 @@ class ConnectionContext(object):
|
|||||||
if pooled:
|
if pooled:
|
||||||
self.connection = ConnectionPool.get()
|
self.connection = ConnectionPool.get()
|
||||||
else:
|
else:
|
||||||
self.connection = RPCIMPL.Connection()
|
self.connection = Connection()
|
||||||
self.pooled = pooled
|
self.pooled = pooled
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
@@ -636,6 +630,11 @@ class MulticallWaiter(object):
|
|||||||
yield result
|
yield result
|
||||||
|
|
||||||
|
|
||||||
|
def create_connection(new=True):
|
||||||
|
"""Create a connection"""
|
||||||
|
return ConnectionContext(pooled=not new)
|
||||||
|
|
||||||
|
|
||||||
def create_consumer(conn, topic, proxy, fanout=False):
|
def create_consumer(conn, topic, proxy, fanout=False):
|
||||||
"""Create a consumer that calls a method in a proxy object"""
|
"""Create a consumer that calls a method in a proxy object"""
|
||||||
if fanout:
|
if fanout:
|
||||||
@@ -649,7 +648,7 @@ def create_consumer_set(conn, consumers):
|
|||||||
# Returns an object that you can call .wait() on to consume
|
# Returns an object that you can call .wait() on to consume
|
||||||
# all queues?
|
# all queues?
|
||||||
# Needs to have a .close() which will stop consuming?
|
# Needs to have a .close() which will stop consuming?
|
||||||
# Needs to also have an attach_to_eventlet method for tests?
|
# Needs to also have an method for tests?
|
||||||
raise NotImplemented
|
raise NotImplemented
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -242,6 +242,12 @@ class Service(object):
|
|||||||
self.consumer_set_thread.wait()
|
self.consumer_set_thread.wait()
|
||||||
except greenlet.GreenletExit:
|
except greenlet.GreenletExit:
|
||||||
pass
|
pass
|
||||||
|
# Try to shut the connection down, but if we get any sort of
|
||||||
|
# errors, go ahead and ignore them.. as we're shutting down anyway
|
||||||
|
try:
|
||||||
|
self.conn.close()
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
for x in self.timers:
|
for x in self.timers:
|
||||||
try:
|
try:
|
||||||
x.stop()
|
x.stop()
|
||||||
|
|||||||
Reference in New Issue
Block a user