From 12de0d964c2c24613df50a8abd212c49c34117e4 Mon Sep 17 00:00:00 2001 From: Liam Young Date: Fri, 7 Jan 2022 11:25:59 +0000 Subject: [PATCH] Coordinate cluster join events Use the coordination module when cluster join events are called. The `cluster_wait` method has been removed as it is no longer used and `cluster_with` has been broken up into three new methods ( `clustered_with_leader`, `update_peer_cluster_status` and `join_leader`) which can be called separately. The `modulo-nodes` and `known-wait` charm options have been removed as they are no longer needed. Closes-Bug: #1902793 Change-Id: I136f5dcc855da329071e119b67df25d9045e86cc --- hooks/rabbit_utils.py | 121 +++++------- hooks/rabbitmq_server_relations.py | 33 +++- unit_tests/test_rabbit_utils.py | 191 +++++++------------ unit_tests/test_rabbitmq_server_relations.py | 34 ++++ 4 files changed, 179 insertions(+), 200 deletions(-) diff --git a/hooks/rabbit_utils.py b/hooks/rabbit_utils.py index bd04b935..794f5591 100644 --- a/hooks/rabbit_utils.py +++ b/hooks/rabbit_utils.py @@ -58,10 +58,6 @@ from charmhelpers.contrib.openstack.utils import ( is_unit_paused_set, ) -from charmhelpers.contrib.hahelpers.cluster import ( - distributed_wait, -) - from charmhelpers.core.hookenv import ( relation_id, relation_ids, @@ -134,7 +130,8 @@ CRONJOB_CMD = ("{schedule} root timeout -k 10s -s SIGINT {timeout} " COORD_KEY_RESTART = "restart" COORD_KEY_PKG_UPGRADE = "pkg_upgrade" -COORD_KEYS = [COORD_KEY_RESTART, COORD_KEY_PKG_UPGRADE] +COORD_KEY_CLUSTER = "cluster" +COORD_KEYS = [COORD_KEY_RESTART, COORD_KEY_PKG_UPGRADE, COORD_KEY_CLUSTER] _named_passwd = '/var/lib/charm/{}/{}.passwd' _local_named_passwd = '/var/lib/charm/{}/{}.local_passwd' @@ -958,37 +955,6 @@ def wait_app(): raise ex -def cluster_wait(): - ''' Wait for operations based on modulo distribution - - Use the distributed_wait function to determine how long to wait before - running an operation like restart or cluster join. 
By setting modulo to - the exact number of nodes in the cluster we get serial operations. - - Check for explicit configuration parameters for modulo distribution. - The config setting modulo-nodes has first priority. If modulo-nodes is not - set, check min-cluster-size. Finally, if neither value is set, determine - how many peers there are from the cluster relation. - - @side_effect: distributed_wait is called which calls time.sleep() - @return: None - ''' - wait = config('known-wait') - if config('modulo-nodes') is not None: - # modulo-nodes has first priority - num_nodes = config('modulo-nodes') - elif config('min-cluster-size'): - # min-cluster-size is consulted next - num_nodes = config('min-cluster-size') - else: - # If nothing explicit is configured, determine cluster size based on - # peer relations - num_nodes = 1 - for rid in relation_ids('cluster'): - num_nodes += len(related_units(rid)) - distributed_wait(modulo=num_nodes, wait=wait) - - def start_app(): ''' Start the rabbitmq app and wait until it is fully started ''' status_set('maintenance', 'Starting rabbitmq application') @@ -1013,54 +979,61 @@ def join_cluster(node): log('Host clustered with %s.' % node, 'INFO') -def cluster_with(): +def clustered_with_leader(): + """Whether this unit is clustered with the leader + + :returns: Whether this unit is clustered with the leader + :rtype: bool + """ + node = leader_node() + if node: + return node in running_nodes() + status_set('waiting', 'Leader not available for clustering') + return False + + +def update_peer_cluster_status(): + """Inform peers that this unit is clustered if it is.""" + # only publish the clustered flag once this unit has joined the leader + if clustered_with_leader(): + log('Host already clustered with %s.' 
% leader_node()) + + cluster_rid = relation_id('cluster', local_unit()) + is_clustered = relation_get(attribute='clustered', + rid=cluster_rid, + unit=local_unit()) + + log('am I clustered?: %s' % bool(is_clustered), level=DEBUG) + if not is_clustered: + # NOTE(freyes): this node needs to be marked as clustered, it's + # part of the cluster according to 'rabbitmqctl cluster_status' + # (LP: #1691510) + relation_set(relation_id=cluster_rid, + clustered=get_unit_hostname(), + timestamp=time.time()) + + +def join_leader(): + """Attempt to cluster with leader. + + Skipped when the unit is paused or already clustered with the leader. + """ + if is_unit_paused_set(): log("Do not run cluster_with while unit is paused", "WARNING") return - - log('Clustering with new node') - - # check the leader and try to cluster with it - node = leader_node() - if node: - if node in running_nodes(): - log('Host already clustered with %s.' % node) - - cluster_rid = relation_id('cluster', local_unit()) - is_clustered = relation_get(attribute='clustered', - rid=cluster_rid, - unit=local_unit()) - - log('am I clustered?: %s' % bool(is_clustered), level=DEBUG) - if not is_clustered: - # NOTE(freyes): this node needs to be marked as clustered, it's - # part of the cluster according to 'rabbitmqctl cluster_status' - # (LP: #1691510) - relation_set(relation_id=cluster_rid, - clustered=get_unit_hostname(), - timestamp=time.time()) - - return False - # NOTE: The primary problem rabbitmq has clustering is when - # more than one node attempts to cluster at the same time. - # The asynchronous nature of hook firing nearly guarantees - # this. 
Using cluster_wait based on modulo_distribution - cluster_wait() + if clustered_with_leader(): + log("Unit already clustered with leader", "DEBUG") + else: + log("Attempting to cluster with leader", "INFO") try: + node = leader_node() join_cluster(node) # NOTE: toggle the cluster relation to ensure that any peers # already clustered re-assess status correctly - relation_set(clustered=get_unit_hostname(), timestamp=time.time()) - return True + update_peer_cluster_status() except subprocess.CalledProcessError as e: status_set('blocked', 'Failed to cluster with %s. Exception: %s' % (node, e)) - start_app() - else: - status_set('waiting', 'Leader not available for clustering') - return False - - return False def check_cluster_memberships(): diff --git a/hooks/rabbitmq_server_relations.py b/hooks/rabbitmq_server_relations.py index f2e5044a..fd75a0ab 100755 --- a/hooks/rabbitmq_server_relations.py +++ b/hooks/rabbitmq_server_relations.py @@ -197,9 +197,24 @@ def coordinated_upgrade(): log("Package upgrade lock not granted") +def coordinated_cluster(): + """Join cluster if lock is granted.""" + serial = coordinator.Serial() + if serial.granted(rabbit.COORD_KEY_CLUSTER): + log("Cluster lock granted") + rabbit.join_leader() + log("update_clients called from coordinated_cluster", DEBUG) + update_clients() + if not is_leader() and is_relation_made('nrpe-external-master'): + update_nrpe_checks() + else: + log("Cluster lock not granted") + + def check_coordinated_functions(): """Run any functions that require coordination locks.""" coordinated_upgrade() + coordinated_cluster() manage_restart() @@ -578,9 +593,12 @@ def cluster_changed(relation_id=None, remote_unit=None): return if rabbit.is_sufficient_peers(): - # NOTE(freyes): all the nodes need to marked as 'clustered' - # (LP: #1691510) - rabbit.cluster_with() + rabbit.update_peer_cluster_status() + if not rabbit.clustered_with_leader(): + serial = coordinator.Serial() + log("Requesting cluster join lock") + 
serial.acquire(rabbit.COORD_KEY_CLUSTER) + coordinated_cluster() # Local rabbit maybe clustered now so check and inform clients if # needed. log("update_clients called from cluster_changed", DEBUG) @@ -778,9 +796,12 @@ def upgrade_charm(): new = os.path.join('var/lib/rabbitmq', 'nagios.passwd') shutil.move(old, new) - # NOTE(freyes): cluster_with() will take care of marking the node as - # 'clustered' for existing deployments (LP: #1691510). - rabbit.cluster_with() + rabbit.update_peer_cluster_status() + if not rabbit.clustered_with_leader(): + serial = coordinator.Serial() + log("Requesting cluster join lock") + serial.acquire(rabbit.COORD_KEY_CLUSTER) + coordinated_cluster() # Ensure all client connections are up to date on upgrade log("update_clients called from upgrade_charm", DEBUG) diff --git a/unit_tests/test_rabbit_utils.py b/unit_tests/test_rabbit_utils.py index aa26d173..31e8574e 100644 --- a/unit_tests/test_rabbit_utils.py +++ b/unit_tests/test_rabbit_utils.py @@ -460,108 +460,85 @@ class UtilsTests(CharmTestCase): self.assertEqual(rabbit_utils.leader_node(), 'rabbit@juju-devel3-machine-15') - @mock.patch('rabbit_utils.is_unit_paused_set') - @mock.patch('rabbit_utils.cluster_wait') - @mock.patch('rabbit_utils.relation_set') - @mock.patch('rabbit_utils.wait_app') - @mock.patch('rabbit_utils.subprocess.check_call') - @mock.patch('rabbit_utils.subprocess.check_output') - @mock.patch('rabbit_utils.time') - @mock.patch('rabbit_utils.running_nodes') - @mock.patch('rabbit_utils.leader_node') - @mock.patch('rabbit_utils.clustered') + @mock.patch.object(rabbit_utils, 'rabbitmqctl') @mock.patch('rabbit_utils.cmp_pkgrevno') - @mock.patch('rabbit_utils.rabbitmq_version_newer_or_equal') - def test_cluster_with_not_clustered( - self, mock_new_rabbitmq, mock_cmp_pkgrevno, mock_clustered, - mock_leader_node, mock_running_nodes, mock_time, mock_check_output, - mock_check_call, mock_wait_app, mock_relation_set, - mock_cluster_wait, mock_is_unit_paused_set): - 
mock_new_rabbitmq.return_value = True - mock_is_unit_paused_set.return_value = False - mock_cmp_pkgrevno.return_value = True - mock_clustered.return_value = False - mock_leader_node.return_value = 'rabbit@juju-devel7-machine-11' - mock_running_nodes.return_value = ['rabbit@juju-devel5-machine-19'] - rabbit_utils.cluster_with() - mock_cluster_wait.assert_called_once_with() - mock_check_output.assert_called_with([rabbit_utils.RABBITMQ_CTL, - 'join_cluster', - 'rabbit@juju-devel7-machine-11'], - stderr=-2) - - @mock.patch('rabbit_utils.is_unit_paused_set') - @mock.patch('rabbit_utils.relation_get') - @mock.patch('rabbit_utils.relation_id') - @mock.patch('rabbit_utils.peer_retrieve') - @mock.patch('rabbit_utils.subprocess.check_call') @mock.patch('rabbit_utils.subprocess.check_output') - @mock.patch('rabbit_utils.time') - @mock.patch('rabbit_utils.running_nodes') - @mock.patch('rabbit_utils.leader_node') - @mock.patch('rabbit_utils.clustered') - @mock.patch('rabbit_utils.cmp_pkgrevno') - def test_cluster_with_clustered(self, mock_cmp_pkgrevno, mock_clustered, - mock_leader_node, mock_running_nodes, - mock_time, mock_check_output, - mock_check_call, mock_peer_retrieve, - mock_relation_id, mock_relation_get, - mock_is_unit_paused_set): - mock_is_unit_paused_set.return_value = False - mock_clustered.return_value = True - mock_peer_retrieve.return_value = 'juju-devel7-machine-11' - mock_leader_node.return_value = 'rabbit@juju-devel7-machine-11' - mock_running_nodes.return_value = ['rabbit@juju-devel5-machine-19', - 'rabbit@juju-devel7-machine-11'] - mock_relation_id.return_value = 'cluster:1' - rabbit_utils.cluster_with() - self.assertEqual(0, mock_check_output.call_count) + def test_join_cluster(self, mock_check_output, mock_cmp_pkgrevno, + mock_rabbitmqctl): + mock_cmp_pkgrevno.return_value = 1 + rabbit_utils.join_cluster('node42') + mock_check_output.assert_called_once_with( + ['/usr/sbin/rabbitmqctl', 'join_cluster', 'node42'], stderr=-2) - 
@mock.patch('rabbit_utils.wait_app') - @mock.patch('rabbit_utils.subprocess.check_call') - @mock.patch('rabbit_utils.subprocess.check_output') - @mock.patch('rabbit_utils.time') - @mock.patch('rabbit_utils.running_nodes') @mock.patch('rabbit_utils.leader_node') - @mock.patch('rabbit_utils.clustered') - @mock.patch('rabbit_utils.cmp_pkgrevno') - def test_cluster_with_no_leader(self, mock_cmp_pkgrevno, mock_clustered, - mock_leader_node, mock_running_nodes, - mock_time, mock_check_output, - mock_check_call, mock_wait_app): - mock_clustered.return_value = False - mock_leader_node.return_value = None - mock_running_nodes.return_value = ['rabbit@juju-devel5-machine-19'] - rabbit_utils.cluster_with() - self.assertEqual(0, mock_check_output.call_count) + @mock.patch('rabbit_utils.running_nodes') + def test_clustered_with_leader(self, mock_running_nodes, + mock_leader_node): + mock_leader_node.return_value = 'node42' + mock_running_nodes.return_value = ['node12', 'node27'] + self.assertFalse(rabbit_utils.clustered_with_leader()) + mock_running_nodes.return_value = ['node12', 'node42', 'node27'] + self.assertTrue(rabbit_utils.clustered_with_leader()) - @mock.patch('rabbit_utils.is_unit_paused_set') - @mock.patch('time.time') - @mock.patch('rabbit_utils.relation_set') @mock.patch('rabbit_utils.get_unit_hostname') + @mock.patch('rabbit_utils.time.time') + @mock.patch.object(rabbit_utils, 'clustered_with_leader') @mock.patch('rabbit_utils.relation_get') @mock.patch('rabbit_utils.relation_id') - @mock.patch('rabbit_utils.running_nodes') - @mock.patch('rabbit_utils.peer_retrieve') - def test_cluster_with_single_node(self, mock_peer_retrieve, - mock_running_nodes, mock_relation_id, - mock_relation_get, - mock_get_unit_hostname, - mock_relation_set, mock_time, - mock_is_unit_paused_set): + @mock.patch('rabbit_utils.relation_set') + @mock.patch('rabbit_utils.leader_node') + def test_update_peer_cluster_status_clustered(self, mock_leader_node, + mock_relation_set, + 
mock_relation_id, + mock_relation_get, + mock_clustered_with_leader, + mock_time, + mock_get_unit_hostname): + mock_get_unit_hostname.return_value = 'host1' + mock_time.return_value = '12:30' + mock_leader_node.return_value = 'node42' + mock_clustered_with_leader.return_value = False + self.assertFalse(mock_relation_set.called) + + mock_clustered_with_leader.return_value = True + mock_relation_id.return_value = 'rid1' + mock_relation_get.return_value = 'True' + rabbit_utils.update_peer_cluster_status() + self.assertFalse(mock_relation_set.called) + + mock_clustered_with_leader.return_value = True + mock_relation_id.return_value = 'rid1' + mock_relation_get.return_value = None + rabbit_utils.update_peer_cluster_status() + mock_relation_set.assert_called_once_with( + relation_id='rid1', clustered='host1', timestamp='12:30') + + @mock.patch.object(rabbit_utils, 'clustered_with_leader') + @mock.patch.object(rabbit_utils, 'join_cluster') + @mock.patch.object(rabbit_utils, 'update_peer_cluster_status') + @mock.patch('rabbit_utils.leader_node') + @mock.patch('rabbit_utils.is_unit_paused_set') + def test_join_leader(self, mock_is_unit_paused_set, mock_leader_node, + mock_update_peer_cluster_status, + mock_join_cluster, + mock_clustered_with_leader): + mock_leader_node.return_value = 'node42' + mock_is_unit_paused_set.return_value = True + rabbit_utils.join_leader() + self.assertFalse(mock_join_cluster.called) + self.assertFalse(mock_update_peer_cluster_status.called) + mock_is_unit_paused_set.return_value = False - mock_peer_retrieve.return_value = 'localhost' - mock_running_nodes.return_value = ['rabbit@localhost'] - mock_relation_id.return_value = 'cluster:1' - mock_relation_get.return_value = False - mock_get_unit_hostname.return_value = 'localhost' - mock_time.return_value = 1234.1 + mock_clustered_with_leader.return_value = True + rabbit_utils.join_leader() + self.assertFalse(mock_join_cluster.called) + self.assertFalse(mock_update_peer_cluster_status.called) - 
self.assertFalse(rabbit_utils.cluster_with()) - - mock_relation_set.assert_called_with(relation_id='cluster:1', - clustered='localhost', - timestamp=1234.1) + mock_is_unit_paused_set.return_value = False + mock_clustered_with_leader.return_value = False + rabbit_utils.join_leader() + mock_join_cluster.assert_called_once_with('node42') + mock_update_peer_cluster_status.assert_called_once_with() @mock.patch('rabbit_utils.application_version_set') @mock.patch('rabbit_utils.get_upstream_version') @@ -725,6 +702,7 @@ class UtilsTests(CharmTestCase): get_deferred_restarts, get_deferred_hooks, is_cron_schedule_valid): self.coordinator.Serial().requested.side_effect = lambda x: { 'restart': True, + 'cluster': False, 'pkg_upgrade': False}[x] is_cron_schedule_valid.return_value = True get_deferred_hooks.return_value = [] @@ -744,6 +722,7 @@ class UtilsTests(CharmTestCase): status_set.reset_mock() self.coordinator.Serial().requested.side_effect = lambda x: { 'restart': False, + 'cluster': False, 'pkg_upgrade': True}[x] rabbit_utils.assess_status_func('test-config')() status_set.assert_called_once_with( @@ -1056,34 +1035,6 @@ class UtilsTests(CharmTestCase): self.test_config.set_previous("source", "same") self.assertFalse(rabbit_utils.archive_upgrade_available()) - @mock.patch.object(rabbit_utils, 'distributed_wait') - def test_cluster_wait(self, mock_distributed_wait): - self.relation_ids.return_value = ['amqp:27'] - self.related_units.return_value = ['unit/1', 'unit/2', 'unit/3'] - # Default check peer relation - _config = {'known-wait': 30} - self.config.side_effect = lambda key: _config.get(key) - rabbit_utils.cluster_wait() - mock_distributed_wait.assert_called_with(modulo=4, wait=30) - - # Use Min Cluster Size - _config = {'min-cluster-size': 5, 'known-wait': 30} - self.config.side_effect = lambda key: _config.get(key) - rabbit_utils.cluster_wait() - mock_distributed_wait.assert_called_with(modulo=5, wait=30) - - # Override with modulo-nodes - _config = 
{'min-cluster-size': 5, 'modulo-nodes': 10, 'known-wait': 60} - self.config.side_effect = lambda key: _config.get(key) - rabbit_utils.cluster_wait() - mock_distributed_wait.assert_called_with(modulo=10, wait=60) - - # Just modulo-nodes - _config = {'modulo-nodes': 10, 'known-wait': 60} - self.config.side_effect = lambda key: _config.get(key) - rabbit_utils.cluster_wait() - mock_distributed_wait.assert_called_with(modulo=10, wait=60) - @mock.patch.object(rabbit_utils, 'get_vhost_policy') @mock.patch.object(rabbit_utils, 'rabbitmqctl') def test_configure_notification_ttl(self, mock_rabbitmqctl, diff --git a/unit_tests/test_rabbitmq_server_relations.py b/unit_tests/test_rabbitmq_server_relations.py index 24988963..aa3f59e0 100644 --- a/unit_tests/test_rabbitmq_server_relations.py +++ b/unit_tests/test_rabbitmq_server_relations.py @@ -42,6 +42,8 @@ TO_PATCH = [ 'is_leader', 'relation_ids', 'related_units', + 'coordinator', + 'deferred_events', ] @@ -445,3 +447,35 @@ class RelationUtil(CharmTestCase): description='Remove check RabbitMQ Cluster', check_cmd='{}/check_rabbitmq_cluster.py'.format( nagios_plugins))]) + + @patch.object(rabbitmq_server_relations, 'update_clients') + @patch.object(rabbitmq_server_relations.rabbit, 'wait_app') + def test_manage_restart(self, wait_app, update_clients): + self.coordinator.Serial().granted.return_value = True + rabbitmq_server_relations.manage_restart() + self.deferred_events.deferrable_svc_restart.assert_called_once_with( + 'rabbitmq-server') + update_clients.assert_called_once_with() + wait_app.assert_called_once_with() + + self.deferred_events.deferrable_svc_restart.reset_mock() + update_clients.reset_mock() + wait_app.reset_mock() + + self.coordinator.Serial().granted.return_value = False + rabbitmq_server_relations.manage_restart() + self.assertFalse(self.deferred_events.called) + self.assertFalse(update_clients.called) + self.assertFalse(wait_app.called) + + self.deferred_events.deferrable_svc_restart.reset_mock() + 
update_clients.reset_mock() + wait_app.reset_mock() + + self.coordinator.Serial().granted.return_value = True + self.deferred_events.is_restart_permitted.return_value = False + rabbitmq_server_relations.manage_restart() + self.deferred_events.deferrable_svc_restart.assert_called_once_with( + 'rabbitmq-server') + wait_app.assert_called_once_with() + self.assertFalse(update_clients.called)