Forget cluster node as an action
There are race conditions in which the forget_cluster_node can get executed against a node that is in the process of joining the cluster. This change moves forget_cluster_node to an action that can be performed by the administrator. The asses_cluster_status has been updated to check for departed nodes and set status pointing toward the use of the new forget-cluster-node action. Closes-Bug: #1818260 Change-Id: I64bcdb9811a3816b394395fac19f5af5cc9f9006
This commit is contained in:
parent
27b43f4bd6
commit
5318764cc6
|
@ -1,6 +1,6 @@
|
||||||
bin
|
bin
|
||||||
revision
|
revision
|
||||||
.coverage
|
.coverage*
|
||||||
.venv
|
.venv
|
||||||
.tox
|
.tox
|
||||||
.testrepository
|
.testrepository
|
||||||
|
@ -10,3 +10,4 @@ __pycache__/
|
||||||
.project
|
.project
|
||||||
.pydevproject
|
.pydevproject
|
||||||
.stestr
|
.stestr
|
||||||
|
.unit-state.db
|
||||||
|
|
|
@ -35,3 +35,10 @@ list-unconsumed-queues:
|
||||||
$vhost:
|
$vhost:
|
||||||
"0": queue_name1 - 0
|
"0": queue_name1 - 0
|
||||||
"1": $queue_name - $num_messages
|
"1": $queue_name - $num_messages
|
||||||
|
forget-cluster-node:
|
||||||
|
description: |-
|
||||||
|
Remove a dead node from the cluster mnesia db.
|
||||||
|
params:
|
||||||
|
node:
|
||||||
|
type: string
|
||||||
|
description: Node name i.e. rabbit@<hostname>
|
||||||
|
|
|
@ -17,7 +17,7 @@ import json
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from collections import OrderedDict
|
from collections import OrderedDict
|
||||||
from subprocess import check_output, CalledProcessError
|
from subprocess import check_output, CalledProcessError, PIPE
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
@ -45,6 +45,10 @@ from charmhelpers.core.hookenv import (
|
||||||
ERROR,
|
ERROR,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from charmhelpers.core.host import (
|
||||||
|
cmp_pkgrevno,
|
||||||
|
)
|
||||||
|
|
||||||
from hooks.rabbit_utils import (
|
from hooks.rabbit_utils import (
|
||||||
ConfigRenderer,
|
ConfigRenderer,
|
||||||
CONFIG_FILES,
|
CONFIG_FILES,
|
||||||
|
@ -119,6 +123,30 @@ def complete_cluster_series_upgrade(args):
|
||||||
assess_status(ConfigRenderer(CONFIG_FILES))
|
assess_status(ConfigRenderer(CONFIG_FILES))
|
||||||
|
|
||||||
|
|
||||||
|
def forget_cluster_node(args):
|
||||||
|
"""Remove previously departed node from cluster."""
|
||||||
|
node = (action_get('node'))
|
||||||
|
if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
|
||||||
|
action_fail(
|
||||||
|
'rabbitmq-server version < 3.0.0, '
|
||||||
|
'forget_cluster_node not supported.')
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
output = check_output(
|
||||||
|
['rabbitmqctl', 'forget_cluster_node', node],
|
||||||
|
stderr=PIPE)
|
||||||
|
action_set({'output': output.decode('utf-8'), 'outcome': 'Success'})
|
||||||
|
except CalledProcessError as e:
|
||||||
|
action_set({'output': e.stderr})
|
||||||
|
if e.returncode == 2:
|
||||||
|
action_fail(
|
||||||
|
"Unable to remove node '{}' from cluster. It is either still "
|
||||||
|
"running or already removed. (Output: '{}')"
|
||||||
|
.format(node, e.stderr))
|
||||||
|
else:
|
||||||
|
action_fail('Failed running rabbitmqctl forget_cluster_node')
|
||||||
|
|
||||||
|
|
||||||
def list_unconsumed_queues(args):
|
def list_unconsumed_queues(args):
|
||||||
"""List queues which are unconsumed in RabbitMQ"""
|
"""List queues which are unconsumed in RabbitMQ"""
|
||||||
log("Listing unconsumed queues...", level=INFO)
|
log("Listing unconsumed queues...", level=INFO)
|
||||||
|
@ -163,6 +191,7 @@ ACTIONS = {
|
||||||
"cluster-status": cluster_status,
|
"cluster-status": cluster_status,
|
||||||
"check-queues": check_queues,
|
"check-queues": check_queues,
|
||||||
"complete-cluster-series-upgrade": complete_cluster_series_upgrade,
|
"complete-cluster-series-upgrade": complete_cluster_series_upgrade,
|
||||||
|
"forget-cluster-node": forget_cluster_node,
|
||||||
"list-unconsumed-queues": list_unconsumed_queues,
|
"list-unconsumed-queues": list_unconsumed_queues,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
actions.py
|
|
@ -572,38 +572,23 @@ def cluster_with():
|
||||||
|
|
||||||
|
|
||||||
def check_cluster_memberships():
|
def check_cluster_memberships():
|
||||||
''' Iterate over RabbitMQ node list, compare it to charm cluster
|
"""Check for departed nodes.
|
||||||
relationships, and forget about any nodes previously abruptly removed
|
|
||||||
from the cluster '''
|
Iterate over RabbitMQ node list, compare it to charm cluster relationships,
|
||||||
|
and notify about any nodes previously abruptly removed from the cluster.
|
||||||
|
|
||||||
|
:returns: String node name or None
|
||||||
|
:rtype: Union[str, None]
|
||||||
|
"""
|
||||||
for rid in relation_ids('cluster'):
|
for rid in relation_ids('cluster'):
|
||||||
for node in nodes():
|
for node in nodes():
|
||||||
if not any(rel.get('clustered', None) == node.split('@')[1]
|
if not any(rel.get('clustered', None) == node.split('@')[1]
|
||||||
for rel in relations_for_id(relid=rid)) and \
|
for rel in relations_for_id(relid=rid)) and \
|
||||||
node not in running_nodes():
|
node not in running_nodes():
|
||||||
log("check_cluster_memberships(): '{}' in nodes but not in "
|
log("check_cluster_memberships(): '{}' in nodes but not in "
|
||||||
"charm relations or running_nodes, telling RabbitMQ to "
|
"charm relations or running_nodes."
|
||||||
"forget about it.".format(node), level=DEBUG)
|
.format(node), level=DEBUG)
|
||||||
forget_cluster_node(node)
|
return node
|
||||||
|
|
||||||
|
|
||||||
def forget_cluster_node(node):
|
|
||||||
''' Remove previously departed node from cluster '''
|
|
||||||
if cmp_pkgrevno('rabbitmq-server', '3.0.0') < 0:
|
|
||||||
log('rabbitmq-server version < 3.0.0, '
|
|
||||||
'forget_cluster_node not supported.', level=DEBUG)
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
rabbitmqctl('forget_cluster_node', node)
|
|
||||||
except subprocess.CalledProcessError as e:
|
|
||||||
if e.returncode == 2:
|
|
||||||
log("Unable to remove node '{}' from cluster. It is either still "
|
|
||||||
"running or already removed. (Output: '{}')"
|
|
||||||
"".format(node, e.output), level=ERROR)
|
|
||||||
return
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
log("Removed previously departed node from cluster: '{}'."
|
|
||||||
"".format(node), level=INFO)
|
|
||||||
|
|
||||||
|
|
||||||
def leave_cluster():
|
def leave_cluster():
|
||||||
|
@ -885,23 +870,36 @@ def assess_cluster_status(*args):
|
||||||
|
|
||||||
# NOTE: ensure rabbitmq is actually installed before doing
|
# NOTE: ensure rabbitmq is actually installed before doing
|
||||||
# any checks
|
# any checks
|
||||||
if rabbitmq_is_installed():
|
if not rabbitmq_is_installed():
|
||||||
# Clustering Check
|
return 'waiting', 'RabbitMQ is not yet installed'
|
||||||
|
|
||||||
|
# Sufficient peers
|
||||||
if not is_sufficient_peers():
|
if not is_sufficient_peers():
|
||||||
return 'waiting', ("Waiting for all {} peers to complete the "
|
return 'waiting', ("Waiting for all {} peers to complete the "
|
||||||
"cluster.".format(config('min-cluster-size')))
|
"cluster.".format(config('min-cluster-size')))
|
||||||
|
# Clustering Check
|
||||||
peer_ids = relation_ids('cluster')
|
peer_ids = relation_ids('cluster')
|
||||||
if peer_ids and len(related_units(peer_ids[0])):
|
if peer_ids and len(related_units(peer_ids[0])):
|
||||||
if not clustered():
|
if not clustered():
|
||||||
return 'waiting', 'Unit has peers, but RabbitMQ not clustered'
|
return 'waiting', 'Unit has peers, but RabbitMQ not clustered'
|
||||||
|
|
||||||
|
# Departed nodes
|
||||||
|
departed_node = check_cluster_memberships()
|
||||||
|
if departed_node:
|
||||||
|
return (
|
||||||
|
'blocked',
|
||||||
|
'Node {} in the cluster but not running. If it is a departed '
|
||||||
|
'node, remove with `forget-cluster-node` action'
|
||||||
|
.format(departed_node))
|
||||||
|
|
||||||
# General status check
|
# General status check
|
||||||
ret = wait_app()
|
if not wait_app():
|
||||||
if ret:
|
return (
|
||||||
|
'blocked', 'Unable to determine if the rabbitmq service is up')
|
||||||
|
|
||||||
# we're active - so just return the 'active' state, but if 'active'
|
# we're active - so just return the 'active' state, but if 'active'
|
||||||
# is returned, then it is ignored by the assess_status system.
|
# is returned, then it is ignored by the assess_status system.
|
||||||
return 'active', "message is ignored"
|
return 'active', "message is ignored"
|
||||||
else:
|
|
||||||
return 'waiting', 'RabbitMQ is not yet installed'
|
|
||||||
|
|
||||||
|
|
||||||
def restart_on_change(restart_map, stopstart=False):
|
def restart_on_change(restart_map, stopstart=False):
|
||||||
|
|
|
@ -950,24 +950,6 @@ def certs_changed(relation_id=None, unit=None):
|
||||||
def update_status():
|
def update_status():
|
||||||
log('Updating status.')
|
log('Updating status.')
|
||||||
|
|
||||||
# leader check for previously unsuccessful cluster departures
|
|
||||||
#
|
|
||||||
# This must be done here and not in the cluster-relation-departed hook. At
|
|
||||||
# the point in time the cluster-relation-departed hook is called we know
|
|
||||||
# that a unit is departing. We also know that RabbitMQ will not have
|
|
||||||
# noticed its departure yet. We cannot remove a node pre-emptively.
|
|
||||||
#
|
|
||||||
# In the normal case the departing node should remove itself from the
|
|
||||||
# cluster in its stop hook. We clean up the ones that for whatever reason
|
|
||||||
# are unable to clean up after themselves successfully here.
|
|
||||||
#
|
|
||||||
# Have a look at the docstring of the stop() function for detailed
|
|
||||||
# explanation.
|
|
||||||
kvstore = kv()
|
|
||||||
if (is_leader() and not is_unit_paused_set() and not
|
|
||||||
kvstore.get(INITIAL_CLIENT_UPDATE_KEY, False)):
|
|
||||||
rabbit.check_cluster_memberships()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
try:
|
try:
|
||||||
|
|
|
@ -16,7 +16,6 @@ import collections
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
import mock
|
import mock
|
||||||
import os
|
import os
|
||||||
import subprocess
|
|
||||||
import sys
|
import sys
|
||||||
import tempfile
|
import tempfile
|
||||||
|
|
||||||
|
@ -41,9 +40,11 @@ TO_PATCH = [
|
||||||
'relation_ids',
|
'relation_ids',
|
||||||
'relation_get',
|
'relation_get',
|
||||||
'relation_set',
|
'relation_set',
|
||||||
|
'related_units',
|
||||||
'leader_get',
|
'leader_get',
|
||||||
'config',
|
'config',
|
||||||
'is_unit_paused_set',
|
'is_unit_paused_set',
|
||||||
|
'local_unit',
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -785,47 +786,13 @@ class UtilsTests(CharmTestCase):
|
||||||
mock_get_upstream_version.return_value = '3.5.7'
|
mock_get_upstream_version.return_value = '3.5.7'
|
||||||
self.assertEqual(rabbit_utils.get_managment_port(), 15672)
|
self.assertEqual(rabbit_utils.get_managment_port(), 15672)
|
||||||
|
|
||||||
@mock.patch('rabbit_utils.rabbitmqctl')
|
|
||||||
@mock.patch('rabbit_utils.cmp_pkgrevno')
|
|
||||||
def test_forget_cluster_node_old_rabbitmq(self, mock_cmp_pkgrevno,
|
|
||||||
mock_rabbitmqctl):
|
|
||||||
mock_cmp_pkgrevno.return_value = -1
|
|
||||||
rabbit_utils.forget_cluster_node('a')
|
|
||||||
self.assertFalse(mock_rabbitmqctl.called)
|
|
||||||
|
|
||||||
@mock.patch('rabbit_utils.log')
|
|
||||||
@mock.patch('subprocess.check_call')
|
|
||||||
@mock.patch('rabbit_utils.cmp_pkgrevno')
|
|
||||||
def test_forget_cluster_node_subprocess_fails(self, mock_cmp_pkgrevno,
|
|
||||||
mock_check_call,
|
|
||||||
mock_log):
|
|
||||||
mock_cmp_pkgrevno.return_value = 0
|
|
||||||
|
|
||||||
def raise_error(x):
|
|
||||||
raise subprocess.CalledProcessError(2, x)
|
|
||||||
mock_check_call.side_effect = raise_error
|
|
||||||
|
|
||||||
rabbit_utils.forget_cluster_node('a')
|
|
||||||
mock_log.assert_called_with("Unable to remove node 'a' from cluster. "
|
|
||||||
"It is either still running or already "
|
|
||||||
"removed. (Output: 'None')", level='ERROR')
|
|
||||||
|
|
||||||
@mock.patch('rabbit_utils.rabbitmqctl')
|
|
||||||
@mock.patch('rabbit_utils.cmp_pkgrevno')
|
|
||||||
def test_forget_cluster_node(self, mock_cmp_pkgrevno, mock_rabbitmqctl):
|
|
||||||
mock_cmp_pkgrevno.return_value = 1
|
|
||||||
rabbit_utils.forget_cluster_node('a')
|
|
||||||
mock_rabbitmqctl.assert_called_with('forget_cluster_node', 'a')
|
|
||||||
|
|
||||||
@mock.patch('rabbit_utils.caching_cmp_pkgrevno')
|
@mock.patch('rabbit_utils.caching_cmp_pkgrevno')
|
||||||
@mock.patch('rabbit_utils.forget_cluster_node')
|
|
||||||
@mock.patch('rabbit_utils.relations_for_id')
|
@mock.patch('rabbit_utils.relations_for_id')
|
||||||
@mock.patch('rabbit_utils.subprocess')
|
@mock.patch('rabbit_utils.subprocess')
|
||||||
@mock.patch('rabbit_utils.relation_ids')
|
@mock.patch('rabbit_utils.relation_ids')
|
||||||
def test_check_cluster_memberships(self, mock_relation_ids,
|
def test_check_cluster_memberships(self, mock_relation_ids,
|
||||||
mock_subprocess,
|
mock_subprocess,
|
||||||
mock_relations_for_id,
|
mock_relations_for_id,
|
||||||
mock_forget_cluster_node,
|
|
||||||
mock_cmp_pkgrevno):
|
mock_cmp_pkgrevno):
|
||||||
mock_relation_ids.return_value = [0]
|
mock_relation_ids.return_value = [0]
|
||||||
mock_subprocess.check_output.return_value = \
|
mock_subprocess.check_output.return_value = \
|
||||||
|
@ -837,9 +804,9 @@ class UtilsTests(CharmTestCase):
|
||||||
{'dummy-entry': 'to validate behaviour on relations without '
|
{'dummy-entry': 'to validate behaviour on relations without '
|
||||||
'clustered key in dict'},
|
'clustered key in dict'},
|
||||||
]
|
]
|
||||||
rabbit_utils.check_cluster_memberships()
|
self.assertEqual(
|
||||||
mock_forget_cluster_node.assert_called_with(
|
"rabbit@juju-devel3-machine-42",
|
||||||
'rabbit@juju-devel3-machine-42')
|
rabbit_utils.check_cluster_memberships())
|
||||||
|
|
||||||
@mock.patch('rabbitmq_context.psutil.NUM_CPUS', 2)
|
@mock.patch('rabbitmq_context.psutil.NUM_CPUS', 2)
|
||||||
@mock.patch('rabbitmq_context.relation_ids')
|
@mock.patch('rabbitmq_context.relation_ids')
|
||||||
|
@ -912,3 +879,72 @@ class UtilsTests(CharmTestCase):
|
||||||
'--apply-to', 'queues',
|
'--apply-to', 'queues',
|
||||||
'-p', 'test'
|
'-p', 'test'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@mock.patch.object(rabbit_utils, 'wait_app')
|
||||||
|
@mock.patch.object(rabbit_utils, 'check_cluster_memberships')
|
||||||
|
@mock.patch.object(rabbit_utils, 'clustered')
|
||||||
|
@mock.patch.object(rabbit_utils, 'is_sufficient_peers')
|
||||||
|
@mock.patch.object(rabbit_utils, 'is_unit_paused_set')
|
||||||
|
@mock.patch.object(rabbit_utils, 'rabbitmq_is_installed')
|
||||||
|
def test_assess_cluster_status(
|
||||||
|
self, rabbitmq_is_installed, is_unit_paused_set,
|
||||||
|
is_sufficient_peers, clustered, check_cluster_memberships,
|
||||||
|
wait_app):
|
||||||
|
|
||||||
|
self.relation_ids.return_value = ["cluster:1"]
|
||||||
|
self.related_units.return_value = ["rabbitmq-server/1"]
|
||||||
|
_min = 3
|
||||||
|
self.config.return_value = _min
|
||||||
|
|
||||||
|
# Paused
|
||||||
|
is_unit_paused_set.return_value = True
|
||||||
|
_expected = ("maintenance", "Paused")
|
||||||
|
self.assertEqual(_expected, rabbit_utils.assess_cluster_status())
|
||||||
|
|
||||||
|
# Not installed
|
||||||
|
is_unit_paused_set.return_value = False
|
||||||
|
rabbitmq_is_installed.return_value = False
|
||||||
|
_expected = ("waiting", "RabbitMQ is not yet installed")
|
||||||
|
self.assertEqual(_expected, rabbit_utils.assess_cluster_status())
|
||||||
|
|
||||||
|
# Not sufficient peers
|
||||||
|
rabbitmq_is_installed.return_value = True
|
||||||
|
is_sufficient_peers.return_value = False
|
||||||
|
_expected = (
|
||||||
|
"waiting",
|
||||||
|
"Waiting for all {} peers to complete the cluster.".format(_min))
|
||||||
|
self.assertEqual(_expected, rabbit_utils.assess_cluster_status())
|
||||||
|
|
||||||
|
# Nodes not clustered
|
||||||
|
is_sufficient_peers.return_value = True
|
||||||
|
clustered.return_value = False
|
||||||
|
_expected = (
|
||||||
|
"waiting",
|
||||||
|
"Unit has peers, but RabbitMQ not clustered")
|
||||||
|
self.assertEqual(_expected, rabbit_utils.assess_cluster_status())
|
||||||
|
|
||||||
|
# Departed node
|
||||||
|
clustered.return_value = True
|
||||||
|
_departed_node = "rabbit@hostname"
|
||||||
|
check_cluster_memberships.return_value = _departed_node
|
||||||
|
_expected = (
|
||||||
|
"blocked",
|
||||||
|
"Node {} in the cluster but not running. If it is a departed "
|
||||||
|
"node, remove with `forget-cluster-node` action"
|
||||||
|
.format(_departed_node))
|
||||||
|
self.assertEqual(_expected, rabbit_utils.assess_cluster_status())
|
||||||
|
|
||||||
|
# Wait app does not return True
|
||||||
|
check_cluster_memberships.return_value = None
|
||||||
|
wait_app.return_value = None
|
||||||
|
_expected = (
|
||||||
|
"blocked",
|
||||||
|
"Unable to determine if the rabbitmq service is up")
|
||||||
|
self.assertEqual(_expected, rabbit_utils.assess_cluster_status())
|
||||||
|
|
||||||
|
# All OK
|
||||||
|
wait_app.return_value = True
|
||||||
|
_expected = (
|
||||||
|
"active",
|
||||||
|
"message is ignored")
|
||||||
|
self.assertEqual(_expected, rabbit_utils.assess_cluster_status())
|
||||||
|
|
Loading…
Reference in New Issue