diff --git a/actions/actions.py b/actions/actions.py index da83dad1..2af985b8 100755 --- a/actions/actions.py +++ b/actions/actions.py @@ -36,6 +36,7 @@ _add_path(_hooks) import charmhelpers.contrib.openstack.deferred_events as deferred_events import charmhelpers.contrib.openstack.utils as os_utils +from charmhelpers import coordinator from charmhelpers.core.host import ( service_start, @@ -51,6 +52,8 @@ from charmhelpers.core.hookenv import ( log, INFO, ERROR, + _run_atstart, + _run_atexit, ) from charmhelpers.core.host import ( @@ -314,6 +317,10 @@ ACTIONS = { def main(args): + _ = coordinator.Serial() + # Manually trigger any registered atstart events as these are not + # triggered by actions + _run_atstart() action_name = os.path.basename(args[0]) try: action = ACTIONS[action_name] @@ -326,6 +333,7 @@ def main(args): action(args) except Exception as e: action_fail("Action {} failed: {}".format(action_name, str(e))) + _run_atexit() if __name__ == "__main__": diff --git a/hooks/rabbit_utils.py b/hooks/rabbit_utils.py index 63ffdcd7..e2c5be5d 100644 --- a/hooks/rabbit_utils.py +++ b/hooks/rabbit_utils.py @@ -24,6 +24,7 @@ import time import shutil import socket import yaml +import functools from collections import OrderedDict, defaultdict @@ -55,7 +56,6 @@ from charmhelpers.contrib.openstack.utils import ( pause_unit, resume_unit, is_unit_paused_set, - pausable_restart_on_change, ) from charmhelpers.contrib.hahelpers.cluster import ( @@ -80,6 +80,7 @@ from charmhelpers.core.hookenv import ( is_leader, leader_get, local_unit, + action_name, charm_dir ) @@ -91,6 +92,7 @@ from charmhelpers.core.host import ( rsync, lsb_release, CompareHostReleases, + path_hash, ) from charmhelpers.contrib.peerstorage import ( @@ -107,6 +109,8 @@ from charmhelpers.fetch import ( CLUSTER_MODE_KEY = 'cluster-partition-handling' CLUSTER_MODE_FOR_INSTALL = 'ignore' +import charmhelpers.coordinator as coordinator + PACKAGES = ['rabbitmq-server', 'python3-amqplib', 'lockfile-progs', 'python3-croniter'] @@ -128,6 +132,9 @@ STATS_CRONFILE = '/etc/cron.d/rabbitmq-stats' CRONJOB_CMD = ("{schedule} root timeout -k 10s -s SIGINT {timeout} " "{command} 2>&1 | logger -p local0.notice\n") +COORD_KEY_RESTART = "restart" +COORD_KEYS = [COORD_KEY_RESTART] + _named_passwd = '/var/lib/charm/{}/{}.passwd' _local_named_passwd = '/var/lib/charm/{}/{}.local_passwd' @@ -1462,29 +1469,56 @@ def assess_cluster_status(*args): return 'active', "message is ignored" -def restart_on_change(restart_map, stopstart=False): - """Restart services based on configuration files changing +def in_run_deferred_hooks_action(): + """Check if current execution context is the run-deferred-hooks action - This function is used a decorator, for example:: - - @restart_on_change({ - '/etc/apache/sites-enabled/*': [ 'apache2' ] - }) - def config_changed(): - pass # your code here - - In this example the apache2 service would be - restarted if any file matching the pattern got changed, created - or removed. Standard wildcards are supported, see documentation - for the 'glob' module for more information. + :returns: Whether currently in run-deferred-hooks hook + :rtype: bool """ - return pausable_restart_on_change( - restart_map, - stopstart=stopstart, - pre_restarts_wait_f=cluster_wait, - can_restart_now_f=deferred_events.check_and_record_restart_request, - post_svc_restart_f=deferred_events.process_svc_restart - ) + return action_name() == 'run-deferred-hooks' + + +def coordinated_restart_on_change(restart_map, restart_function): + """Decorator to check for file changes. + + Check for file changes after decorated function runs. If a change + is detected then a restart request is made using the coordinator + module. + + :param restart_map: {file: [service, ...]} + :type restart_map: Dict[str, List[str,]] + :param restart_functions: Function to be used to perform restart if + lock is immediatly granted. + :type restart_functions: Callable + """ + def wrap(f): + @functools.wraps(f) + def wrapped_f(*args, **kwargs): + if is_unit_paused_set(): + return f(*args, **kwargs) + pre = {path: path_hash(path) for path in restart_map} + f(*args, **kwargs) + post = {path: path_hash(path) for path in restart_map} + if pre == post: + log("No restart needed") + else: + changed = [ + path + for path in restart_map.keys() + if pre[path] != post[path]] + log("Requesting restart. File(s) {} have changed".format( + ','.join(changed))) + if in_run_deferred_hooks_action(): + log("In action context, requesting immediate restart") + restart_function(coordinate_restart=False) + else: + serial = coordinator.Serial() + serial.acquire(COORD_KEY_RESTART) + # Run the restart function which will check if lock is + # is immediatly available. + restart_function(coordinate_restart=True) + return wrapped_f + return wrap def assess_status(configs): @@ -1562,6 +1596,13 @@ def assess_status_func(configs): ', '.join(sorted(deferred_hooks))) message = "{}. {}".format(message, svc_msg) + serial = coordinator.Serial() + requested_locks = [k for k in COORD_KEYS if serial.requested(k)] + if requested_locks: + message = "{}. {}".format( + message, + 'Waiting for {} lock(s)'.format(','.join(requested_locks))) + state = "waiting" status_set(state, message) return _assess_status_func diff --git a/hooks/rabbitmq_server_relations.py b/hooks/rabbitmq_server_relations.py index 8cc544cb..92560c3d 100755 --- a/hooks/rabbitmq_server_relations.py +++ b/hooks/rabbitmq_server_relations.py @@ -118,6 +118,10 @@ import charmhelpers.contrib.openstack.cert_utils as ch_cert_utils import charmhelpers.contrib.network.ip as ch_ip +import charmhelpers.coordinator as coordinator + +import charmhelpers.contrib.openstack.deferred_events as deferred_events + hooks = Hooks() SERVICE_NAME = os.getenv('JUJU_UNIT_NAME').split('/')[0] @@ -141,6 +145,52 @@ def install(): rabbit.install_or_upgrade_packages() +def manage_restart(coordinate_restart=True): + """Restart service if lock is granted and update clients. + + Restart services and update clients. Clients are updated in case the + restart has altered how clients need to connect to rabbit (port change + etc). + + :param coordinate_restart: Whether to coordinate restarts with other + nodes in cluster. + :type coordinate_restart: bool + """ + run_restart = False + if coordinate_restart: + serial = coordinator.Serial() + if serial.granted(rabbit.COORD_KEY_RESTART): + run_restart = True + log("Restart lock granted") + else: + run_restart = False + log("Restart lock not granted") + else: + log("Forcing restart without coordination") + run_restart = True + if run_restart: + msg = 'Restarting {}'.format(','.join(rabbit.services())) + log(msg) + status_set('maintenance', msg) + for svc in rabbit.services(): + deferred_events.deferrable_svc_restart(svc) + if 'rabbitmq-server' in rabbit.services(): + rabbit.wait_app() + if deferred_events.is_restart_permitted(): + # Restart may have picked up a change in how clients need to + # connect (TLS v Plain for example), so need to ensure + # clients have the latest connection information. + log("update_clients called from manage_restart", DEBUG) + update_clients() + else: + log("Restart not run") + + +def check_coordinated_functions(): + """Run any functions that require coordination locks.""" + manage_restart() + + def validate_amqp_config_tracker(f): """Decorator to mark all existing tracked amqp configs as stale so that they are refreshed the next time the current unit leader. @@ -162,7 +212,7 @@ def validate_amqp_config_tracker(f): def configure_amqp(username, vhost, relation_id, admin=False, ttlname=None, ttlreg=None, ttl=None): - """Configure rabbitmq server. + """Configure rabbitmq server for a given client access request. This function creates user/password, vhost and sets user permissions. It also enabales mirroring queues if requested. @@ -484,7 +534,7 @@ def cluster_joined(relation_id=None): @hooks.hook('cluster-relation-changed') -@rabbit.restart_on_change(rabbit.restart_map()) +@rabbit.coordinated_restart_on_change(rabbit.restart_map(), manage_restart) def cluster_changed(relation_id=None, remote_unit=None): # Future travelers beware ordering is significant rdata = relation_get(rid=relation_id, unit=remote_unit) @@ -521,6 +571,7 @@ def cluster_changed(relation_id=None, remote_unit=None): rabbit.cluster_with() # Local rabbit maybe clustered now so check and inform clients if # needed. + log("update_clients called from cluster_changed", DEBUG) update_clients() if is_leader(): if (leader_get(rabbit.CLUSTER_MODE_KEY) != @@ -536,6 +587,7 @@ def cluster_changed(relation_id=None, remote_unit=None): if not is_leader() and is_relation_made('nrpe-external-master'): update_nrpe_checks() + check_coordinated_functions() @hooks.hook('stop') @@ -589,8 +641,9 @@ def update_cookie(leaders_cookie=None): @hooks.hook('ha-relation-joined') -@rabbit.restart_on_change({rabbit.ENV_CONF: - rabbit.restart_map()[rabbit.ENV_CONF]}) +@rabbit.coordinated_restart_on_change({rabbit.ENV_CONF: + rabbit.restart_map()[rabbit.ENV_CONF]}, + manage_restart) def ha_joined(): corosync_bindiface = config('ha-bindiface') corosync_mcastport = config('ha-mcastport') @@ -718,6 +771,7 @@ def upgrade_charm(): rabbit.cluster_with() # Ensure all client connections are up to date on upgrade + log("update_clients called from upgrade_charm", DEBUG) update_clients() # BUG:#1804348 @@ -739,7 +793,7 @@ RMQ_MON_PORT = 15692 @hooks.hook('config-changed') -@rabbit.restart_on_change(rabbit.restart_map()) +@rabbit.coordinated_restart_on_change(rabbit.restart_map(), manage_restart) @harden() def config_changed(check_deferred_restarts=True): """Run config-chaged hook. @@ -855,7 +909,7 @@ def leader_elected(): @hooks.hook('leader-settings-changed') -@rabbit.restart_on_change(rabbit.restart_map()) +@rabbit.coordinated_restart_on_change(rabbit.restart_map(), manage_restart) def leader_settings_changed(): if is_unit_paused_set(): @@ -876,9 +930,11 @@ def leader_settings_changed(): # using LE and peerstorage for rid in relation_ids('cluster'): relation_set(relation_id=rid, relation_settings={'cookie': cookie}) + log("update_clients called from leader_settings_changed", DEBUG) update_clients() rabbit.ConfigRenderer( rabbit.CONFIG_FILES()).write_all() + check_coordinated_functions() def pre_install_hooks(): @@ -930,15 +986,13 @@ def certs_joined(relation_id=None): @hooks.hook('certificates-relation-changed') +@rabbit.coordinated_restart_on_change(rabbit.restart_map(), + manage_restart) def certs_changed(relation_id=None, unit=None): - # Ensure Rabbit has restart before telling the clients as rabbit may - # take time to restart. - @rabbit.restart_on_change(rabbit.restart_map()) - def render_and_restart(): - rabbit.ConfigRenderer( - rabbit.CONFIG_FILES()).write_all() - render_and_restart() - update_clients() + # manage_restart will handle the client update if + # certificates have changed. + rabbit.ConfigRenderer( + rabbit.CONFIG_FILES()).write_all() @hooks.hook('update-status') @@ -949,6 +1003,7 @@ def update_status(): if __name__ == '__main__': try: + _ = coordinator.Serial() hooks.execute(sys.argv) except UnregisteredHookError as e: log('Unknown hook {} - skipping.'.format(e)) diff --git a/unit_tests/test_actions.py b/unit_tests/test_actions.py index a98543ca..330e2d89 100644 --- a/unit_tests/test_actions.py +++ b/unit_tests/test_actions.py @@ -31,11 +31,18 @@ with mock.patch('charmhelpers.core.hookenv.cached') as cached: import actions -class PauseTestCase(CharmTestCase): +class RabbitActionTestCase(CharmTestCase): + + def setUp(self, patches): + rmq_patches = ["_run_atstart", "_run_atexit"] + rmq_patches.extend(patches) + super().setUp(actions, rmq_patches) + + +class PauseTestCase(RabbitActionTestCase): def setUp(self): - super(PauseTestCase, self).setUp( - actions, ["pause_unit_helper", "ConfigRenderer", "CONFIG_FILES"]) + super().setUp(["pause_unit_helper", "ConfigRenderer", "CONFIG_FILES"]) self.ConfigRenderer.return_value = 'test-config' def test_pauses_services(self): @@ -267,10 +274,11 @@ class ForceBootTestCase(CharmTestCase): self.action_set.assert_called_once_with({'output': output}) -class MainTestCase(CharmTestCase): +class MainTestCase(RabbitActionTestCase): def setUp(self): - super(MainTestCase, self).setUp(actions, ["action_fail"]) + super().setUp(["action_fail", "ConfigRenderer"]) + self.ConfigRenderer.return_value = 'test-config' def test_invokes_action(self): dummy_calls = [] diff --git a/unit_tests/test_rabbit_utils.py b/unit_tests/test_rabbit_utils.py index ea4c2043..8e8d93e7 100644 --- a/unit_tests/test_rabbit_utils.py +++ b/unit_tests/test_rabbit_utils.py @@ -19,6 +19,7 @@ from unittest import mock import os import sys import tempfile +import uuid from datetime import timedelta from unit_tests.test_utils import CharmTestCase @@ -48,6 +49,8 @@ TO_PATCH = [ 'is_unit_paused_set', 'local_unit', 'lsb_release', + 'coordinator', + 'path_hash', ] @@ -202,6 +205,7 @@ class UtilsTests(CharmTestCase): self.nrpe_compat.remove_check = mock.MagicMock() self.lsb_release.return_value = { 'DISTRIB_CODENAME': 'focal'} + self.coordinator.Serial().requested.return_value = False def tearDown(self): super(UtilsTests, self).tearDown() @@ -707,6 +711,35 @@ class UtilsTests(CharmTestCase): 'complete-cluster-series-upgrade when the cluster has completed ' 'its upgrade') + @mock.patch.object(rabbit_utils, 'is_cron_schedule_valid') + @mock.patch.object(rabbit_utils.deferred_events, 'get_deferred_hooks') + @mock.patch.object(rabbit_utils.deferred_events, 'get_deferred_restarts') + @mock.patch.object(rabbit_utils, 'clustered') + @mock.patch.object(rabbit_utils, 'status_set') + @mock.patch.object(rabbit_utils, 'assess_cluster_status') + @mock.patch.object(rabbit_utils, 'services') + @mock.patch.object(rabbit_utils, '_determine_os_workload_status') + def test_assess_status_func_coord_restart( + self, _determine_os_workload_status, services, + assess_cluster_status, status_set, clustered, + get_deferred_restarts, get_deferred_hooks, is_cron_schedule_valid): + self.coordinator.Serial().requested.side_effect = lambda x: { + 'restart': True}[x] + is_cron_schedule_valid.return_value = True + get_deferred_hooks.return_value = [] + get_deferred_restarts.return_value = [] + self.leader_get.return_value = False + services.return_value = 's1' + _determine_os_workload_status.return_value = ('active', '') + clustered.return_value = True + rabbit_utils.assess_status_func('test-config')() + _determine_os_workload_status.assert_called_once_with( + 'test-config', {}, charm_func=assess_cluster_status, services='s1', + ports=None) + status_set.assert_called_once_with( + 'waiting', + 'Unit is ready and clustered. Waiting for restart lock(s)') + def test_pause_unit_helper(self): with mock.patch.object(rabbit_utils, '_pause_resume_helper') as prh: rabbit_utils.pause_unit_helper('random-config') @@ -1808,3 +1841,27 @@ class UtilsTests(CharmTestCase): '-p', 'nagios-rabbitmq-server-0', 'HA') + + def test_coordinated_restart_on_change(self): + self.is_unit_paused_set.return_value = False + test_map = {'/etc/a-file.conf': ['svc1']} + self.path_hash.side_effect = lambda x: str(uuid.uuid4()) + self.restarter_calls = 0 + + def _restarter(coordinate_restart=True): + self.restarter_calls = self.restarter_calls + 1 + return + + @rabbit_utils.coordinated_restart_on_change(test_map, _restarter) + def test_function(): + return + + # Check restarter called when file hashes have changed + test_function() + self.assertEqual(self.restarter_calls, 1) + + # Check restarter not called when file hashes have not changed + self.path_hash.side_effect = lambda x: 'hash' + self.restarter_calls = 0 + test_function() + self.assertEqual(self.restarter_calls, 0)