From 04e24eeb8a8de02831a7e60c7ec6161c364add3a Mon Sep 17 00:00:00 2001 From: Thomas Herve <thomas.herve@enovance.com> Date: Mon, 7 Apr 2014 21:17:28 +0200 Subject: [PATCH] Cherry pick oslo rpc HA fixes Get 2 commits from oslo-rpc fixing HA failover: * Ia148baa6e1ec632789ac3621c85173c2c16f3918 (fixed HA failover, Qpid part) * I67923cb024bbd143edc8edccf35b9b400df31eb3 (fixed HA failover, RabbitMQ part) Change-Id: I45f679f3da720a0c28fb552d9f4cfb3d8bd21c20 Closes-Bug: #1261631 --- heat/openstack/common/rpc/impl_kombu.py | 5 ++++- heat/openstack/common/rpc/impl_qpid.py | 8 +++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/heat/openstack/common/rpc/impl_kombu.py b/heat/openstack/common/rpc/impl_kombu.py index af00b3ffe5..6cb4d1e73e 100644 --- a/heat/openstack/common/rpc/impl_kombu.py +++ b/heat/openstack/common/rpc/impl_kombu.py @@ -458,6 +458,9 @@ class Connection(object): self.params_list = params_list + brokers_count = len(self.params_list) + self.next_broker_indices = itertools.cycle(range(brokers_count)) + self.memory_transport = self.conf.fake_rabbit self.connection = None @@ -528,7 +531,7 @@ class Connection(object): attempt = 0 while True: - params = self.params_list[attempt % len(self.params_list)] + params = self.params_list[next(self.next_broker_indices)] attempt += 1 try: self._connect(params) diff --git a/heat/openstack/common/rpc/impl_qpid.py b/heat/openstack/common/rpc/impl_qpid.py index 14baceab34..90ddb30c88 100644 --- a/heat/openstack/common/rpc/impl_qpid.py +++ b/heat/openstack/common/rpc/impl_qpid.py @@ -467,6 +467,10 @@ class Connection(object): self.brokers = params['qpid_hosts'] self.username = params['username'] self.password = params['password'] + + brokers_count = len(self.brokers) + self.next_broker_indices = itertools.cycle(range(brokers_count)) + self.connection_create(self.brokers[0]) self.reconnect() @@ -494,7 +498,6 @@ class Connection(object): def reconnect(self): """Handles reconnecting and re-establishing sessions and queues.""" - attempt = 0 delay = 1 while True: # Close the session if necessary @@ -504,8 +507,7 @@ class Connection(object): except qpid_exceptions.ConnectionError: pass - broker = self.brokers[attempt % len(self.brokers)] - attempt += 1 + broker = self.brokers[next(self.next_broker_indices)] try: self.connection_create(broker)