Coordinate cluster join events

Use the coordination module when cluster join events are called.
The `cluster_wait` method has been removed as it is no longer used
and `cluster_with` has been broken up into three new methods (
`clustered_with_leader`, `update_peer_cluster_status` and
`join_leader`) which can be called separately. The `modulo-nodes`
and `known-wait` charm options have been removed as they are no
longer needed.

Closes-Bug: #1902793
Change-Id: I136f5dcc855da329071e119b67df25d9045e86cc
This commit is contained in:
Liam Young 2022-01-07 11:25:59 +00:00
parent 70cbe1eef9
commit 12de0d964c
4 changed files with 179 additions and 200 deletions

View File

@ -58,10 +58,6 @@ from charmhelpers.contrib.openstack.utils import (
is_unit_paused_set,
)
from charmhelpers.contrib.hahelpers.cluster import (
distributed_wait,
)
from charmhelpers.core.hookenv import (
relation_id,
relation_ids,
@ -134,7 +130,8 @@ CRONJOB_CMD = ("{schedule} root timeout -k 10s -s SIGINT {timeout} "
COORD_KEY_RESTART = "restart"
COORD_KEY_PKG_UPGRADE = "pkg_upgrade"
COORD_KEYS = [COORD_KEY_RESTART, COORD_KEY_PKG_UPGRADE]
COORD_KEY_CLUSTER = "cluster"
COORD_KEYS = [COORD_KEY_RESTART, COORD_KEY_PKG_UPGRADE, COORD_KEY_CLUSTER]
_named_passwd = '/var/lib/charm/{}/{}.passwd'
_local_named_passwd = '/var/lib/charm/{}/{}.local_passwd'
@ -958,37 +955,6 @@ def wait_app():
raise ex
def cluster_wait():
''' Wait for operations based on modulo distribution
Use the distributed_wait function to determine how long to wait before
running an operation like restart or cluster join. By setting modulo to
the exact number of nodes in the cluster we get serial operations.
Check for explicit configuration parameters for modulo distribution.
The config setting modulo-nodes has first priority. If modulo-nodes is not
set, check min-cluster-size. Finally, if neither value is set, determine
how many peers there are from the cluster relation.
@side_effect: distributed_wait is called which calls time.sleep()
@return: None
'''
wait = config('known-wait')
if config('modulo-nodes') is not None:
# modulo-nodes has first priority
num_nodes = config('modulo-nodes')
elif config('min-cluster-size'):
# min-cluster-size is consulted next
num_nodes = config('min-cluster-size')
else:
# If nothing explicit is configured, determine cluster size based on
# peer relations
num_nodes = 1
for rid in relation_ids('cluster'):
num_nodes += len(related_units(rid))
distributed_wait(modulo=num_nodes, wait=wait)
def start_app():
''' Start the rabbitmq app and wait until it is fully started '''
status_set('maintenance', 'Starting rabbitmq application')
@ -1013,54 +979,61 @@ def join_cluster(node):
log('Host clustered with %s.' % node, 'INFO')
def cluster_with():
def clustered_with_leader():
"""Whether this unit is clustered with the leader
:returns: Whether this unit is clustered with the leader
:rtype: bool
"""
node = leader_node()
if node:
return node in running_nodes()
status_set('waiting', 'Leader not available for clustering')
return False
def update_peer_cluster_status():
"""Inform peers that this unit is clustered if it is."""
# check the leader and try to cluster with it
if clustered_with_leader():
log('Host already clustered with %s.' % leader_node())
cluster_rid = relation_id('cluster', local_unit())
is_clustered = relation_get(attribute='clustered',
rid=cluster_rid,
unit=local_unit())
log('am I clustered?: %s' % bool(is_clustered), level=DEBUG)
if not is_clustered:
# NOTE(freyes): this node needs to be marked as clustered, it's
# part of the cluster according to 'rabbitmqctl cluster_status'
# (LP: #1691510)
relation_set(relation_id=cluster_rid,
clustered=get_unit_hostname(),
timestamp=time.time())
def join_leader():
"""Attempt to cluster with leader.
Attempt to cluster with leader.
"""
if is_unit_paused_set():
log("Do not run cluster_with while unit is paused", "WARNING")
return
log('Clustering with new node')
# check the leader and try to cluster with it
node = leader_node()
if node:
if node in running_nodes():
log('Host already clustered with %s.' % node)
cluster_rid = relation_id('cluster', local_unit())
is_clustered = relation_get(attribute='clustered',
rid=cluster_rid,
unit=local_unit())
log('am I clustered?: %s' % bool(is_clustered), level=DEBUG)
if not is_clustered:
# NOTE(freyes): this node needs to be marked as clustered, it's
# part of the cluster according to 'rabbitmqctl cluster_status'
# (LP: #1691510)
relation_set(relation_id=cluster_rid,
clustered=get_unit_hostname(),
timestamp=time.time())
return False
# NOTE: The primary problem rabbitmq has clustering is when
# more than one node attempts to cluster at the same time.
# The asynchronous nature of hook firing nearly guarantees
# this. Using cluster_wait based on modulo_distribution
cluster_wait()
if clustered_with_leader():
log("Unit already clustered with leader", "DEBUG")
else:
log("Attempting to cluster with leader", "INFO")
try:
node = leader_node()
join_cluster(node)
# NOTE: toggle the cluster relation to ensure that any peers
# already clustered re-assess status correctly
relation_set(clustered=get_unit_hostname(), timestamp=time.time())
return True
update_peer_cluster_status()
except subprocess.CalledProcessError as e:
status_set('blocked', 'Failed to cluster with %s. Exception: %s'
% (node, e))
start_app()
else:
status_set('waiting', 'Leader not available for clustering')
return False
return False
def check_cluster_memberships():

View File

@ -197,9 +197,24 @@ def coordinated_upgrade():
log("Package upgrade lock not granted")
def coordinated_cluster():
"""Join cluster if lock is granted."""
serial = coordinator.Serial()
if serial.granted(rabbit.COORD_KEY_CLUSTER):
log("Cluster lock granted")
rabbit.join_leader()
log("update_clients called from coordinated_cluster", DEBUG)
update_clients()
if not is_leader() and is_relation_made('nrpe-external-master'):
update_nrpe_checks()
else:
log("Cluster lock not granted")
def check_coordinated_functions():
"""Run any functions that require coordination locks."""
coordinated_upgrade()
coordinated_cluster()
manage_restart()
@ -578,9 +593,12 @@ def cluster_changed(relation_id=None, remote_unit=None):
return
if rabbit.is_sufficient_peers():
# NOTE(freyes): all the nodes need to marked as 'clustered'
# (LP: #1691510)
rabbit.cluster_with()
rabbit.update_peer_cluster_status()
if not rabbit.clustered_with_leader():
serial = coordinator.Serial()
log("Requesting cluster join lock")
serial.acquire(rabbit.COORD_KEY_CLUSTER)
coordinated_cluster()
# Local rabbit maybe clustered now so check and inform clients if
# needed.
log("update_clients called from cluster_changed", DEBUG)
@ -778,9 +796,12 @@ def upgrade_charm():
new = os.path.join('var/lib/rabbitmq', 'nagios.passwd')
shutil.move(old, new)
# NOTE(freyes): cluster_with() will take care of marking the node as
# 'clustered' for existing deployments (LP: #1691510).
rabbit.cluster_with()
rabbit.update_peer_cluster_status()
if not rabbit.clustered_with_leader():
serial = coordinator.Serial()
log("Requesting cluster join lock")
serial.acquire(rabbit.COORD_KEY_CLUSTER)
coordinated_cluster()
# Ensure all client connections are up to date on upgrade
log("update_clients called from upgrade_charm", DEBUG)

View File

@ -460,108 +460,85 @@ class UtilsTests(CharmTestCase):
self.assertEqual(rabbit_utils.leader_node(),
'rabbit@juju-devel3-machine-15')
@mock.patch('rabbit_utils.is_unit_paused_set')
@mock.patch('rabbit_utils.cluster_wait')
@mock.patch('rabbit_utils.relation_set')
@mock.patch('rabbit_utils.wait_app')
@mock.patch('rabbit_utils.subprocess.check_call')
@mock.patch('rabbit_utils.subprocess.check_output')
@mock.patch('rabbit_utils.time')
@mock.patch('rabbit_utils.running_nodes')
@mock.patch('rabbit_utils.leader_node')
@mock.patch('rabbit_utils.clustered')
@mock.patch.object(rabbit_utils, 'rabbitmqctl')
@mock.patch('rabbit_utils.cmp_pkgrevno')
@mock.patch('rabbit_utils.rabbitmq_version_newer_or_equal')
def test_cluster_with_not_clustered(
self, mock_new_rabbitmq, mock_cmp_pkgrevno, mock_clustered,
mock_leader_node, mock_running_nodes, mock_time, mock_check_output,
mock_check_call, mock_wait_app, mock_relation_set,
mock_cluster_wait, mock_is_unit_paused_set):
mock_new_rabbitmq.return_value = True
mock_is_unit_paused_set.return_value = False
mock_cmp_pkgrevno.return_value = True
mock_clustered.return_value = False
mock_leader_node.return_value = 'rabbit@juju-devel7-machine-11'
mock_running_nodes.return_value = ['rabbit@juju-devel5-machine-19']
rabbit_utils.cluster_with()
mock_cluster_wait.assert_called_once_with()
mock_check_output.assert_called_with([rabbit_utils.RABBITMQ_CTL,
'join_cluster',
'rabbit@juju-devel7-machine-11'],
stderr=-2)
@mock.patch('rabbit_utils.is_unit_paused_set')
@mock.patch('rabbit_utils.relation_get')
@mock.patch('rabbit_utils.relation_id')
@mock.patch('rabbit_utils.peer_retrieve')
@mock.patch('rabbit_utils.subprocess.check_call')
@mock.patch('rabbit_utils.subprocess.check_output')
@mock.patch('rabbit_utils.time')
@mock.patch('rabbit_utils.running_nodes')
@mock.patch('rabbit_utils.leader_node')
@mock.patch('rabbit_utils.clustered')
@mock.patch('rabbit_utils.cmp_pkgrevno')
def test_cluster_with_clustered(self, mock_cmp_pkgrevno, mock_clustered,
mock_leader_node, mock_running_nodes,
mock_time, mock_check_output,
mock_check_call, mock_peer_retrieve,
mock_relation_id, mock_relation_get,
mock_is_unit_paused_set):
mock_is_unit_paused_set.return_value = False
mock_clustered.return_value = True
mock_peer_retrieve.return_value = 'juju-devel7-machine-11'
mock_leader_node.return_value = 'rabbit@juju-devel7-machine-11'
mock_running_nodes.return_value = ['rabbit@juju-devel5-machine-19',
'rabbit@juju-devel7-machine-11']
mock_relation_id.return_value = 'cluster:1'
rabbit_utils.cluster_with()
self.assertEqual(0, mock_check_output.call_count)
def test_join_cluster(self, mock_check_output, mock_cmp_pkgrevno,
mock_rabbitmqctl):
mock_cmp_pkgrevno.return_value = 1
rabbit_utils.join_cluster('node42')
mock_check_output.assert_called_once_with(
['/usr/sbin/rabbitmqctl', 'join_cluster', 'node42'], stderr=-2)
@mock.patch('rabbit_utils.wait_app')
@mock.patch('rabbit_utils.subprocess.check_call')
@mock.patch('rabbit_utils.subprocess.check_output')
@mock.patch('rabbit_utils.time')
@mock.patch('rabbit_utils.running_nodes')
@mock.patch('rabbit_utils.leader_node')
@mock.patch('rabbit_utils.clustered')
@mock.patch('rabbit_utils.cmp_pkgrevno')
def test_cluster_with_no_leader(self, mock_cmp_pkgrevno, mock_clustered,
mock_leader_node, mock_running_nodes,
mock_time, mock_check_output,
mock_check_call, mock_wait_app):
mock_clustered.return_value = False
mock_leader_node.return_value = None
mock_running_nodes.return_value = ['rabbit@juju-devel5-machine-19']
rabbit_utils.cluster_with()
self.assertEqual(0, mock_check_output.call_count)
@mock.patch('rabbit_utils.running_nodes')
def test_clustered_with_leader(self, mock_running_nodes,
mock_leader_node):
mock_leader_node.return_value = 'node42'
mock_running_nodes.return_value = ['node12', 'node27']
self.assertFalse(rabbit_utils.clustered_with_leader())
mock_running_nodes.return_value = ['node12', 'node42', 'node27']
self.assertTrue(rabbit_utils.clustered_with_leader())
@mock.patch('rabbit_utils.is_unit_paused_set')
@mock.patch('time.time')
@mock.patch('rabbit_utils.relation_set')
@mock.patch('rabbit_utils.get_unit_hostname')
@mock.patch('rabbit_utils.time.time')
@mock.patch.object(rabbit_utils, 'clustered_with_leader')
@mock.patch('rabbit_utils.relation_get')
@mock.patch('rabbit_utils.relation_id')
@mock.patch('rabbit_utils.running_nodes')
@mock.patch('rabbit_utils.peer_retrieve')
def test_cluster_with_single_node(self, mock_peer_retrieve,
mock_running_nodes, mock_relation_id,
mock_relation_get,
mock_get_unit_hostname,
mock_relation_set, mock_time,
mock_is_unit_paused_set):
@mock.patch('rabbit_utils.relation_set')
@mock.patch('rabbit_utils.leader_node')
def test_update_peer_cluster_status_clustered(self, mock_leader_node,
mock_relation_set,
mock_relation_id,
mock_relation_get,
mock_clustered_with_leader,
mock_time,
mock_get_unit_hostname):
mock_get_unit_hostname.return_value = 'host1'
mock_time.return_value = '12:30'
mock_leader_node.return_value = 'node42'
mock_clustered_with_leader.return_value = False
self.assertFalse(mock_relation_set.called)
mock_clustered_with_leader.return_value = True
mock_relation_id.return_value = 'rid1'
mock_relation_get.return_value = 'True'
rabbit_utils.update_peer_cluster_status()
self.assertFalse(mock_relation_set.called)
mock_clustered_with_leader.return_value = True
mock_relation_id.return_value = 'rid1'
mock_relation_get.return_value = None
rabbit_utils.update_peer_cluster_status()
mock_relation_set.assert_called_once_with(
relation_id='rid1', clustered='host1', timestamp='12:30')
@mock.patch.object(rabbit_utils, 'clustered_with_leader')
@mock.patch.object(rabbit_utils, 'join_cluster')
@mock.patch.object(rabbit_utils, 'update_peer_cluster_status')
@mock.patch('rabbit_utils.leader_node')
@mock.patch('rabbit_utils.is_unit_paused_set')
def test_join_leader(self, mock_is_unit_paused_set, mock_leader_node,
mock_update_peer_cluster_status,
mock_join_cluster,
mock_clustered_with_leader):
mock_leader_node.return_value = 'node42'
mock_is_unit_paused_set.return_value = True
rabbit_utils.join_leader()
self.assertFalse(mock_join_cluster.called)
self.assertFalse(mock_update_peer_cluster_status.called)
mock_is_unit_paused_set.return_value = False
mock_peer_retrieve.return_value = 'localhost'
mock_running_nodes.return_value = ['rabbit@localhost']
mock_relation_id.return_value = 'cluster:1'
mock_relation_get.return_value = False
mock_get_unit_hostname.return_value = 'localhost'
mock_time.return_value = 1234.1
mock_clustered_with_leader.return_value = True
rabbit_utils.join_leader()
self.assertFalse(mock_join_cluster.called)
self.assertFalse(mock_update_peer_cluster_status.called)
self.assertFalse(rabbit_utils.cluster_with())
mock_relation_set.assert_called_with(relation_id='cluster:1',
clustered='localhost',
timestamp=1234.1)
mock_is_unit_paused_set.return_value = False
mock_clustered_with_leader.return_value = False
rabbit_utils.join_leader()
mock_join_cluster.assert_called_once_with('node42')
mock_update_peer_cluster_status.assert_called_once_with()
@mock.patch('rabbit_utils.application_version_set')
@mock.patch('rabbit_utils.get_upstream_version')
@ -725,6 +702,7 @@ class UtilsTests(CharmTestCase):
get_deferred_restarts, get_deferred_hooks, is_cron_schedule_valid):
self.coordinator.Serial().requested.side_effect = lambda x: {
'restart': True,
'cluster': False,
'pkg_upgrade': False}[x]
is_cron_schedule_valid.return_value = True
get_deferred_hooks.return_value = []
@ -744,6 +722,7 @@ class UtilsTests(CharmTestCase):
status_set.reset_mock()
self.coordinator.Serial().requested.side_effect = lambda x: {
'restart': False,
'cluster': False,
'pkg_upgrade': True}[x]
rabbit_utils.assess_status_func('test-config')()
status_set.assert_called_once_with(
@ -1056,34 +1035,6 @@ class UtilsTests(CharmTestCase):
self.test_config.set_previous("source", "same")
self.assertFalse(rabbit_utils.archive_upgrade_available())
@mock.patch.object(rabbit_utils, 'distributed_wait')
def test_cluster_wait(self, mock_distributed_wait):
self.relation_ids.return_value = ['amqp:27']
self.related_units.return_value = ['unit/1', 'unit/2', 'unit/3']
# Default check peer relation
_config = {'known-wait': 30}
self.config.side_effect = lambda key: _config.get(key)
rabbit_utils.cluster_wait()
mock_distributed_wait.assert_called_with(modulo=4, wait=30)
# Use Min Cluster Size
_config = {'min-cluster-size': 5, 'known-wait': 30}
self.config.side_effect = lambda key: _config.get(key)
rabbit_utils.cluster_wait()
mock_distributed_wait.assert_called_with(modulo=5, wait=30)
# Override with modulo-nodes
_config = {'min-cluster-size': 5, 'modulo-nodes': 10, 'known-wait': 60}
self.config.side_effect = lambda key: _config.get(key)
rabbit_utils.cluster_wait()
mock_distributed_wait.assert_called_with(modulo=10, wait=60)
# Just modulo-nodes
_config = {'modulo-nodes': 10, 'known-wait': 60}
self.config.side_effect = lambda key: _config.get(key)
rabbit_utils.cluster_wait()
mock_distributed_wait.assert_called_with(modulo=10, wait=60)
@mock.patch.object(rabbit_utils, 'get_vhost_policy')
@mock.patch.object(rabbit_utils, 'rabbitmqctl')
def test_configure_notification_ttl(self, mock_rabbitmqctl,

View File

@ -42,6 +42,8 @@ TO_PATCH = [
'is_leader',
'relation_ids',
'related_units',
'coordinator',
'deferred_events',
]
@ -445,3 +447,35 @@ class RelationUtil(CharmTestCase):
description='Remove check RabbitMQ Cluster',
check_cmd='{}/check_rabbitmq_cluster.py'.format(
nagios_plugins))])
@patch.object(rabbitmq_server_relations, 'update_clients')
@patch.object(rabbitmq_server_relations.rabbit, 'wait_app')
def test_manage_restart(self, wait_app, update_clients):
self.coordinator.Serial().granted.return_value = True
rabbitmq_server_relations.manage_restart()
self.deferred_events.deferrable_svc_restart.assert_called_once_with(
'rabbitmq-server')
update_clients.assert_called_once_with()
wait_app.assert_called_once_with()
self.deferred_events.deferrable_svc_restart.reset_mock()
update_clients.reset_mock()
wait_app.reset_mock()
self.coordinator.Serial().granted.return_value = False
rabbitmq_server_relations.manage_restart()
self.assertFalse(self.deferred_events.called)
self.assertFalse(update_clients.called)
self.assertFalse(wait_app.called)
self.deferred_events.deferrable_svc_restart.reset_mock()
update_clients.reset_mock()
wait_app.reset_mock()
self.coordinator.Serial().granted.return_value = True
self.deferred_events.is_restart_permitted.return_value = False
rabbitmq_server_relations.manage_restart()
self.deferred_events.deferrable_svc_restart.assert_called_once_with(
'rabbitmq-server')
wait_app.assert_called_once_with()
self.assertFalse(update_clients.called)