Each node does a join_cluster with the leader. Add a random wait before attempting to cluster.
This commit is contained in:
@@ -4,6 +4,8 @@ import sys
|
||||
import subprocess
|
||||
import glob
|
||||
import tempfile
|
||||
import random
|
||||
import time
|
||||
|
||||
from rabbitmq_context import (
|
||||
RabbitMQSSLContext,
|
||||
@@ -19,14 +21,12 @@ from charmhelpers.contrib.openstack.utils import (
|
||||
from charmhelpers.core.hookenv import (
|
||||
config,
|
||||
relation_ids,
|
||||
relation_get,
|
||||
related_units,
|
||||
log, ERROR,
|
||||
INFO,
|
||||
service_name,
|
||||
status_set,
|
||||
cached,
|
||||
unit_private_ip,
|
||||
cached
|
||||
)
|
||||
|
||||
from charmhelpers.core.host import (
|
||||
@@ -303,18 +303,23 @@ def cluster_with():
|
||||
return False
|
||||
|
||||
# check all peers and try to cluster with them
|
||||
if len(available_nodes()) == 0:
|
||||
if len(leader_node()) == 0:
|
||||
log('No nodes available to cluster with')
|
||||
return False
|
||||
|
||||
# iterate over all the nodes, join to the first available
|
||||
num_tries = 0
|
||||
for node in available_nodes():
|
||||
for node in leader_node():
|
||||
if node in running_nodes():
|
||||
log('Host already clustered with %s.' % node)
|
||||
continue
|
||||
return False
|
||||
log('Clustering with remote rabbit host (%s).' % node)
|
||||
|
||||
# NOTE: The primary problem rabbitmq has clustering is when
|
||||
# more than one node attempts to cluster at the same time.
|
||||
# The asynchronous nature of hook firing nearly guarantees
|
||||
# this. Using random time wait is a hack until we can
|
||||
# implement charmhelpers.coordinator.
|
||||
time.sleep(random.random()*100)
|
||||
try:
|
||||
cmd = [RABBITMQ_CTL, 'stop_app']
|
||||
subprocess.check_call(cmd)
|
||||
@@ -323,8 +328,10 @@ def cluster_with():
|
||||
cmd = [RABBITMQ_CTL, 'start_app']
|
||||
subprocess.check_call(cmd)
|
||||
log('Host clustered with %s.' % node)
|
||||
return True
|
||||
except subprocess.CalledProcessError as e:
|
||||
log('Failed to cluster with %s. Exception: %s' % (node, e))
|
||||
log('Failed to cluster with %s. Exception: %s'
|
||||
% (node, e))
|
||||
cmd = [RABBITMQ_CTL, 'start_app']
|
||||
subprocess.check_call(cmd)
|
||||
# continue to the next node
|
||||
@@ -333,6 +340,8 @@ def cluster_with():
|
||||
log('Max tries number exhausted, exiting', level=ERROR)
|
||||
raise
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def break_cluster():
|
||||
try:
|
||||
@@ -595,23 +604,13 @@ def running_nodes():
|
||||
|
||||
|
||||
@cached
|
||||
def available_nodes():
|
||||
''' Provide list of expected nodes in the cluster '''
|
||||
nodes = []
|
||||
for r_id in relation_ids('cluster'):
|
||||
for unit in related_units(r_id):
|
||||
if config('prefer-ipv6'):
|
||||
address = relation_get('hostname',
|
||||
rid=r_id, unit=unit)
|
||||
else:
|
||||
address = relation_get('private-address',
|
||||
rid=r_id, unit=unit)
|
||||
if address is not None:
|
||||
node = get_node_hostname(address)
|
||||
if node:
|
||||
nodes.append("rabbit@" + node)
|
||||
|
||||
return nodes
|
||||
def leader_node():
    ''' Provide the leader node for clustering

    Each rabbitmq node should join_cluster with the leader
    to avoid split-brain clusters.

    @returns list holding the single "rabbit@<hostname>" entry for the
             leader, or an empty list when no leader address has been
             stored on the peer relation yet or its hostname cannot be
             resolved.
    '''
    leader_node_ip = peer_retrieve('leader_node_ip')
    if not leader_node_ip:
        # Always hand back a list: callers take len() of the result and
        # iterate over it, which would raise TypeError on None.
        return []

    hostname = get_node_hostname(leader_node_ip)
    if not hostname:
        # Guard the string concatenation below against a falsy hostname.
        return []

    return ["rabbit@" + hostname]
|
||||
|
||||
|
||||
def get_node_hostname(address):
|
||||
@@ -628,10 +627,13 @@ def get_node_hostname(address):
|
||||
@cached
def clustered():
    ''' Determine whether local rabbitmq-server is clustered

    @returns True when running_nodes() reports more than one node,
             False otherwise.
    '''
    # NOTE: A rabbitmq node can only join a cluster once.
    # Simply checking for more than one running node tells us
    # if this unit is in a cluster.
    return len(running_nodes()) > 1
|
||||
|
||||
|
||||
def assess_status():
|
||||
|
||||
@@ -66,6 +66,7 @@ from charmhelpers.core.hookenv import (
|
||||
is_leader,
|
||||
charm_dir,
|
||||
status_set,
|
||||
unit_private_ip,
|
||||
)
|
||||
from charmhelpers.core.host import (
|
||||
cmp_pkgrevno,
|
||||
@@ -322,6 +323,7 @@ def cluster_joined(relation_id=None):
|
||||
log('Leader peer_storing cookie', level=INFO)
|
||||
cookie = open(rabbit.COOKIE_PATH, 'r').read().strip()
|
||||
peer_store('cookie', cookie)
|
||||
peer_store('leader_node_ip', unit_private_ip())
|
||||
|
||||
|
||||
@hooks.hook('cluster-relation-changed')
|
||||
|
||||
@@ -131,29 +131,14 @@ class UtilsTests(unittest.TestCase):
|
||||
self.assertEqual(lines[1], "%s %s\n" % (map.items()[0]))
|
||||
self.assertEqual(lines[4], "%s %s\n" % (map.items()[3]))
|
||||
|
||||
@mock.patch('rabbit_utils.unit_private_ip')
|
||||
@mock.patch('rabbit_utils.get_node_hostname')
|
||||
@mock.patch('rabbit_utils.available_nodes')
|
||||
@mock.patch('rabbit_utils.running_nodes')
|
||||
def test_not_clustered(self, mock_running_nodes, mock_available_nodes,
|
||||
mock_get_node_hostname,
|
||||
mock_unit_private_ip):
|
||||
mock_get_node_hostname.return_value = 'host-c'
|
||||
mock_available_nodes.return_value = ['rabbit@host-a', 'rabbit@host-b']
|
||||
mock_running_nodes.return_value = ['rabbit@host-c']
|
||||
def test_not_clustered(self, mock_running_nodes):
|
||||
mock_running_nodes.return_value = []
|
||||
self.assertFalse(rabbit_utils.clustered())
|
||||
|
||||
@mock.patch('rabbit_utils.unit_private_ip')
|
||||
@mock.patch('rabbit_utils.get_node_hostname')
|
||||
@mock.patch('rabbit_utils.available_nodes')
|
||||
@mock.patch('rabbit_utils.running_nodes')
|
||||
def test_clustered(self, mock_running_nodes, mock_available_nodes,
|
||||
mock_get_node_hostname,
|
||||
mock_unit_private_ip):
|
||||
mock_get_node_hostname.return_value = 'host-c'
|
||||
mock_available_nodes.return_value = ['rabbit@host-a', 'rabbit@host-b']
|
||||
mock_running_nodes.return_value = ['rabbit@host-a', 'rabbit@host-b',
|
||||
'rabbit@host-c']
|
||||
def test_clustered(self, mock_running_nodes):
|
||||
mock_running_nodes.return_value = ['a', 'b']
|
||||
self.assertTrue(rabbit_utils.clustered())
|
||||
|
||||
@mock.patch('rabbit_utils.subprocess')
|
||||
|
||||
Reference in New Issue
Block a user