137 lines
5.6 KiB
Diff
137 lines
5.6 KiB
Diff
--- a/openstack/common/rpc/impl_qpid.py
|
|
+++ a/openstack/common/rpc/impl_qpid.py
|
|
@@ -41,6 +41,9 @@
|
|
cfg.StrOpt('qpid_port',
|
|
default='5672',
|
|
help='Qpid broker port'),
|
|
+ cfg.ListOpt('qpid_hosts',
|
|
+ default=['$qpid_hostname:$qpid_port'],
|
|
+ help='Qpid HA cluster host:port pairs'),
|
|
cfg.StrOpt('qpid_username',
|
|
default='',
|
|
help='Username for qpid connection'),
|
|
@@ -50,24 +53,6 @@
|
|
cfg.StrOpt('qpid_sasl_mechanisms',
|
|
default='',
|
|
help='Space separated list of SASL mechanisms to use for auth'),
|
|
- cfg.BoolOpt('qpid_reconnect',
|
|
- default=True,
|
|
- help='Automatically reconnect'),
|
|
- cfg.IntOpt('qpid_reconnect_timeout',
|
|
- default=0,
|
|
- help='Reconnection timeout in seconds'),
|
|
- cfg.IntOpt('qpid_reconnect_limit',
|
|
- default=0,
|
|
- help='Max reconnections before giving up'),
|
|
- cfg.IntOpt('qpid_reconnect_interval_min',
|
|
- default=0,
|
|
- help='Minimum seconds between reconnection attempts'),
|
|
- cfg.IntOpt('qpid_reconnect_interval_max',
|
|
- default=0,
|
|
- help='Maximum seconds between reconnection attempts'),
|
|
- cfg.IntOpt('qpid_reconnect_interval',
|
|
- default=0,
|
|
- help='Equivalent to setting max and min to the same value'),
|
|
cfg.IntOpt('qpid_heartbeat',
|
|
default=60,
|
|
help='Seconds between connection keepalive heartbeats'),
|
|
@@ -296,47 +281,32 @@
|
|
if server_params is None:
|
|
server_params = {}
|
|
|
|
- default_params = dict(hostname=self.conf.qpid_hostname,
|
|
- port=self.conf.qpid_port,
|
|
+ default_params = dict(hosts=self.conf.qpid_hosts,
|
|
username=self.conf.qpid_username,
|
|
password=self.conf.qpid_password)
|
|
|
|
- params = server_params
|
|
+ self.params = server_params
|
|
for key in default_params.keys():
|
|
- params.setdefault(key, default_params[key])
|
|
+ self.params.setdefault(key, default_params[key])
|
|
|
|
- self.broker = params['hostname'] + ":" + str(params['port'])
|
|
+ self.brokers = self.params['hosts']
|
|
+ self.create_connection(self.brokers[0])
|
|
+ self.reconnect()
|
|
+
|
|
+ def create_connection(self, broker):
|
|
# Create the connection - this does not open the connection
|
|
- self.connection = qpid.messaging.Connection(self.broker)
|
|
+ self.connection = qpid.messaging.Connection(broker)
|
|
|
|
# Check if flags are set and if so set them for the connection
|
|
# before we call open
|
|
- self.connection.username = params['username']
|
|
- self.connection.password = params['password']
|
|
+ self.connection.username = self.params['username']
|
|
+ self.connection.password = self.params['password']
|
|
+ self.connection.reconnect = False
|
|
self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms
|
|
- self.connection.reconnect = self.conf.qpid_reconnect
|
|
- if self.conf.qpid_reconnect_timeout:
|
|
- self.connection.reconnect_timeout = (
|
|
- self.conf.qpid_reconnect_timeout)
|
|
- if self.conf.qpid_reconnect_limit:
|
|
- self.connection.reconnect_limit = self.conf.qpid_reconnect_limit
|
|
- if self.conf.qpid_reconnect_interval_max:
|
|
- self.connection.reconnect_interval_max = (
|
|
- self.conf.qpid_reconnect_interval_max)
|
|
- if self.conf.qpid_reconnect_interval_min:
|
|
- self.connection.reconnect_interval_min = (
|
|
- self.conf.qpid_reconnect_interval_min)
|
|
- if self.conf.qpid_reconnect_interval:
|
|
- self.connection.reconnect_interval = (
|
|
- self.conf.qpid_reconnect_interval)
|
|
self.connection.heartbeat = self.conf.qpid_heartbeat
|
|
self.connection.protocol = self.conf.qpid_protocol
|
|
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
|
|
|
- # Open is part of reconnect -
|
|
- # NOTE(WGH) not sure we need this with the reconnect flags
|
|
- self.reconnect()
|
|
-
|
|
def _register_consumer(self, consumer):
|
|
self.consumers[str(consumer.get_receiver())] = consumer
|
|
|
|
@@ -351,23 +321,32 @@
|
|
except qpid.messaging.exceptions.ConnectionError:
|
|
pass
|
|
|
|
+ delay = 1
|
|
+ attempt = 0
|
|
while True:
|
|
+ broker = self.brokers[attempt % len(self.brokers)]
|
|
+ attempt += 1
|
|
try:
|
|
+ self.create_connection(broker)
|
|
self.connection.open()
|
|
except qpid.messaging.exceptions.ConnectionError, e:
|
|
- LOG.error(_('Unable to connect to AMQP server: %s'), e)
|
|
- time.sleep(self.conf.qpid_reconnect_interval or 1)
|
|
+ LOG.error(_('Unable to connect to AMQP server %s: %s'), broker, e)
|
|
+ time.sleep(delay)
|
|
+ delay = min(2*delay, 60)
|
|
else:
|
|
break
|
|
|
|
- LOG.info(_('Connected to AMQP server on %s'), self.broker)
|
|
+ LOG.info(_('Connected to AMQP server on %s'), broker)
|
|
|
|
self.session = self.connection.session()
|
|
|
|
- for consumer in self.consumers.itervalues():
|
|
- consumer.reconnect(self.session)
|
|
-
|
|
if self.consumers:
|
|
+ consumers = self.consumers
|
|
+ self.consumers = {}
|
|
+ for consumer in consumers.itervalues():
|
|
+ consumer.reconnect(self.session)
|
|
+ self._register_consumer(consumer)
|
|
+
|
|
LOG.debug(_("Re-established AMQP queues"))
|
|
|
|
def ensure(self, error_callback, method, *args, **kwargs):
|