Merge "RabbitMQ: advance thru the list of brokers on reconnect"
This commit is contained in:
commit
5a48b027a0
|
@ -452,6 +452,9 @@ class Connection(object):
|
|||
|
||||
self.params_list = params_list
|
||||
|
||||
brokers_count = len(self.params_list)
|
||||
self.next_broker_indices = itertools.cycle(range(brokers_count))
|
||||
|
||||
self.memory_transport = self.conf.fake_rabbit
|
||||
|
||||
self.connection = None
|
||||
|
@ -524,7 +527,7 @@ class Connection(object):
|
|||
|
||||
attempt = 0
|
||||
while True:
|
||||
params = self.params_list[attempt % len(self.params_list)]
|
||||
params = self.params_list[next(self.next_broker_indices)]
|
||||
attempt += 1
|
||||
try:
|
||||
self._connect(params)
|
||||
|
|
|
@ -603,3 +603,41 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
|
|||
|
||||
|
||||
TestReplyWireFormat.generate_scenarios()
|
||||
|
||||
|
||||
class RpcKombuHATestCase(test_utils.BaseTestCase):
|
||||
|
||||
def test_reconnect_order(self):
|
||||
brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
||||
brokers_count = len(brokers)
|
||||
|
||||
self.conf.rabbit_hosts = brokers
|
||||
self.conf.rabbit_max_retries = 1
|
||||
|
||||
info = {'attempt': 0}
|
||||
|
||||
def _connect(myself, params):
|
||||
# do as little work that is enough to pass connection attempt
|
||||
myself.connection = kombu.connection.BrokerConnection(**params)
|
||||
myself.connection_errors = myself.connection.connection_errors
|
||||
|
||||
expected_broker = brokers[info['attempt'] % brokers_count]
|
||||
self.assertEqual(params['hostname'], expected_broker)
|
||||
|
||||
info['attempt'] += 1
|
||||
|
||||
# just make sure connection instantiation does not fail with an
|
||||
# exception
|
||||
self.stubs.Set(rabbit_driver.Connection, '_connect', _connect)
|
||||
|
||||
# starting from the first broker in the list
|
||||
connection = rabbit_driver.Connection(self.conf)
|
||||
|
||||
# now that we have connection object, revert to the real 'connect'
|
||||
# implementation
|
||||
self.stubs.UnsetAll()
|
||||
|
||||
for i in range(len(brokers)):
|
||||
self.assertRaises(driver_common.RPCException, connection.reconnect)
|
||||
|
||||
connection.close()
|
||||
|
|
Loading…
Reference in New Issue