Check for running AMQP instances
This commit is contained in:
45
nova/rpc.py
45
nova/rpc.py
@@ -24,6 +24,7 @@ No fan-out support yet.
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from carrot import connection as carrot_connection
|
from carrot import connection as carrot_connection
|
||||||
@@ -82,31 +83,35 @@ class Consumer(messaging.Consumer):
|
|||||||
Contains methods for connecting the fetch method to async loops
|
Contains methods for connecting the fetch method to async loops
|
||||||
"""
|
"""
|
||||||
def __init__(self, *args, **kwargs):
|
def __init__(self, *args, **kwargs):
|
||||||
self.failed_connection = False
|
while True:
|
||||||
super(Consumer, self).__init__(*args, **kwargs)
|
try:
|
||||||
|
super(Consumer, self).__init__(*args, **kwargs)
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
logging.warning("AMQP server on %s:%d is unreachable. " \
|
||||||
|
"Trying again in 30 seconds." % (
|
||||||
|
FLAGS.rabbit_host,
|
||||||
|
FLAGS.rabbit_port
|
||||||
|
))
|
||||||
|
time.sleep(30)
|
||||||
|
continue
|
||||||
|
|
||||||
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
def fetch(self, no_ack=None, auto_ack=None, enable_callbacks=False):
|
||||||
"""Wraps the parent fetch with some logic for failed connections"""
|
"""Wraps the parent fetch with some logic for failed connections"""
|
||||||
# TODO(vish): the logic for failed connections and logging should be
|
|
||||||
# refactored into some sort of connection manager object
|
|
||||||
try:
|
try:
|
||||||
if self.failed_connection:
|
|
||||||
# NOTE(vish): conn is defined in the parent class, we can
|
|
||||||
# recreate it as long as we create the backend too
|
|
||||||
# pylint: disable-msg=W0201
|
|
||||||
self.conn = Connection.recreate()
|
|
||||||
self.backend = self.conn.create_backend()
|
|
||||||
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
|
super(Consumer, self).fetch(no_ack, auto_ack, enable_callbacks)
|
||||||
if self.failed_connection:
|
except:
|
||||||
logging.error("Reconnected to queue")
|
try:
|
||||||
self.failed_connection = False
|
self.connection = Connection.recreate()
|
||||||
# NOTE(vish): This is catching all errors because we really don't
|
self.backend = self.connection.create_backend()
|
||||||
# exceptions to be logged 10 times a second if some
|
self.declare()
|
||||||
# persistent failure occurs.
|
except:
|
||||||
except Exception: # pylint: disable-msg=W0703
|
logging.warning("AMQP server on %s:%d is unreachable. " \
|
||||||
if not self.failed_connection:
|
"Trying again in 30 seconds." % (
|
||||||
logging.exception("Failed to fetch message from queue")
|
FLAGS.rabbit_host,
|
||||||
self.failed_connection = True
|
FLAGS.rabbit_port
|
||||||
|
))
|
||||||
|
time.sleep(30)
|
||||||
|
|
||||||
def attach_to_eventlet(self):
|
def attach_to_eventlet(self):
|
||||||
"""Only needed for unit tests!"""
|
"""Only needed for unit tests!"""
|
||||||
|
|||||||
Reference in New Issue
Block a user