From 977d254cc69915819cf4226dc8cfc8c36969735b Mon Sep 17 00:00:00 2001 From: John Schwarz Date: Mon, 5 Dec 2016 14:15:17 +0200 Subject: [PATCH] Throttle SIGHUPs to keepalived Multiple SIGHUPs in quick succession might cause the master keepalived to forfeit its mastership (which will cause keepalived to remove IPs of its external devices, severing connectivity). This can happen when, for example, associating or disassociating multiple floatingips. The patch makes the agent throttle SIGHUP sent to keepalived: the very first SIGHUP is always sent; as for subsequent signals, they are delayed till agent threshold is reached. (It's 3 seconds by default.) As an example, when three consequent router updates trigger keepalived respawn then: * the very first signal is sent as usual; * the second signal is deferred and sent in up to 3 seconds since the first signal; * the third signal is ignored, though the change that triggered it will be correctly applied by the second signal handler when it is triggered after threshold delay. If the last time a spawn request occurred is older than current-time minus threshold then there is no delay. Co-Authored-By: Jakub Libosvar Co-Authored-By: Cedric Brandily Co-Authored-By: Ihar Hrachyshka Closes-Bug: 1647432 Change-Id: I2955e0de835458a2eea4dd088addf33b656f8670 --- neutron/agent/l3/ha_router.py | 9 +- neutron/agent/linux/keepalived.py | 11 ++- neutron/common/utils.py | 48 +++++++++++ neutron/tests/common/agents/l3_agent.py | 36 ++++++++ neutron/tests/fullstack/test_l3_agent.py | 105 +++++++++++++++++++---- neutron/tests/unit/common/test_utils.py | 50 +++++++++++ 6 files changed, 241 insertions(+), 18 deletions(-) diff --git a/neutron/agent/l3/ha_router.py b/neutron/agent/l3/ha_router.py index b9fba67e10a..fa7fdda172d 100644 --- a/neutron/agent/l3/ha_router.py +++ b/neutron/agent/l3/ha_router.py @@ -34,6 +34,11 @@ HA_DEV_PREFIX = 'ha-' IP_MONITOR_PROCESS_SERVICE = 'ip_monitor' SIGTERM_TIMEOUT = 10 +# The multiplier is used to compensate execution time of function sending +# SIGHUP to keepalived process. The constant multiplies ha_vrrp_advert_int +# config option and the result is the throttle delay. +THROTTLER_MULTIPLIER = 1.5 + class HaRouterNamespace(namespaces.RouterNamespace): """Namespace for HA router. @@ -124,7 +129,9 @@ class HaRouter(router.RouterInfo): keepalived.KeepalivedConf(), process_monitor, conf_path=self.agent_conf.ha_confs_path, - namespace=self.ha_namespace) + namespace=self.ha_namespace, + throttle_restart_value=( + self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER)) config = self.keepalived_manager.config diff --git a/neutron/agent/linux/keepalived.py b/neutron/agent/linux/keepalived.py index c5ba8c83f8c..e4ab6c7b884 100644 --- a/neutron/agent/linux/keepalived.py +++ b/neutron/agent/linux/keepalived.py @@ -27,6 +27,7 @@ from oslo_utils import fileutils from neutron._i18n import _, _LE from neutron.agent.linux import external_process from neutron.common import constants +from neutron.common import utils VALID_STATES = ['MASTER', 'BACKUP'] VALID_AUTH_TYPES = ['AH', 'PASS'] @@ -367,12 +368,20 @@ class KeepalivedManager(object): """ def __init__(self, resource_id, config, process_monitor, conf_path='/tmp', - namespace=None): + namespace=None, throttle_restart_value=None): self.resource_id = resource_id self.config = config self.namespace = namespace self.process_monitor = process_monitor self.conf_path = conf_path + # configure throttler for spawn to introduce delay between SIGHUPs, + # otherwise keepalived master may unnecessarily flip to slave + if throttle_restart_value is not None: + self._throttle_spawn(throttle_restart_value) + + #pylint: disable=method-hidden + def _throttle_spawn(self, threshold): + self.spawn = utils.throttler(threshold)(self.spawn) def get_conf_dir(self): confs_dir = os.path.abspath(os.path.normpath(self.conf_path)) diff --git a/neutron/common/utils.py b/neutron/common/utils.py index e84ae3b63a1..20ae9f7d6cb 100644 --- a/neutron/common/utils.py +++ b/neutron/common/utils.py @@ -25,6 +25,7 @@ import os.path import random import signal import sys +import threading import time import uuid import weakref @@ -54,6 +55,8 @@ TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" LOG = logging.getLogger(__name__) SYNCHRONIZED_PREFIX = 'neutron-' +DEFAULT_THROTTLER_VALUE = 2 + synchronized = lockutils.synchronized_with_prefix(SYNCHRONIZED_PREFIX) @@ -61,6 +64,51 @@ class WaitTimeout(Exception): """Default exception coming from wait_until_true() function.""" +class LockWithTimer(object): + def __init__(self, threshold): + self._threshold = threshold + self.timestamp = 0 + self._lock = threading.Lock() + + def acquire(self): + return self._lock.acquire(False) + + def release(self): + return self._lock.release() + + def time_to_wait(self): + return self.timestamp - time.time() + self._threshold + + +# REVISIT(jlibosva): Some parts of throttler may be similar to what +# neutron.notifiers.batch_notifier.BatchNotifier does. They +# could be refactored and unified. +def throttler(threshold=DEFAULT_THROTTLER_VALUE): + """Throttle number of calls to a function to only once per 'threshold'. + """ + def decorator(f): + lock_with_timer = LockWithTimer(threshold) + + @functools.wraps(f) + def wrapper(*args, **kwargs): + if lock_with_timer.acquire(): + try: + fname = f.__name__ + time_to_wait = lock_with_timer.time_to_wait() + if time_to_wait > 0: + LOG.debug("Call of function %s scheduled, sleeping " + "%.1f seconds", fname, time_to_wait) + # Decorated function has been called recently, wait. + eventlet.sleep(time_to_wait) + lock_with_timer.timestamp = time.time() + finally: + lock_with_timer.release() + LOG.debug("Calling throttled function %s", fname) + return f(*args, **kwargs) + return wrapper + return decorator + + @removals.remove( message="Use ensure_tree(path, 0o755) from oslo_utils.fileutils") def ensure_dir(dir_path): diff --git a/neutron/tests/common/agents/l3_agent.py b/neutron/tests/common/agents/l3_agent.py index 3557bdaaf09..fcc46890f4c 100755 --- a/neutron/tests/common/agents/l3_agent.py +++ b/neutron/tests/common/agents/l3_agent.py @@ -13,6 +13,7 @@ # under the License. import sys +import types import mock from oslo_config import cfg @@ -21,6 +22,7 @@ from neutron._i18n import _ from neutron.agent.l3 import agent from neutron.agent.l3 import namespaces from neutron.agent import l3_agent +from neutron.common import constants class L3NATAgentForTest(agent.L3NATAgentWithStateReport): @@ -59,6 +61,40 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport): super(L3NATAgentForTest, self).__init__(host, conf) + def _create_router(self, router_id, router): + """Create a router with suffix added to the router namespace name. + + This is needed to be able to run two agents serving the same router + on the same node. + """ + router = ( + super(L3NATAgentForTest, self)._create_router(router_id, router)) + + router.get_internal_device_name = types.MethodType( + get_internal_device_name, router) + router.get_external_device_name = types.MethodType( + get_external_device_name, router) + + return router + + +def _append_suffix(dev_name): + # If dev_name = 'xyz123' and the suffix is 'hostB' then the result + # will be 'xy_stB' + return '%s_%s' % (dev_name[:-4], cfg.CONF.test_namespace_suffix[-3:]) + + +def get_internal_device_name(ri, port_id): + return _append_suffix( + (namespaces.INTERNAL_DEV_PREFIX + port_id) + [:constants.LINUX_DEV_LEN]) + + +def get_external_device_name(ri, port_id): + return _append_suffix( + (namespaces.EXTERNAL_DEV_PREFIX + port_id) + [:constants.LINUX_DEV_LEN]) + OPTS = [ cfg.StrOpt('test_namespace_suffix', default='testprefix', diff --git a/neutron/tests/fullstack/test_l3_agent.py b/neutron/tests/fullstack/test_l3_agent.py index 2e78bd5f88c..5c07dc5ef03 100644 --- a/neutron/tests/fullstack/test_l3_agent.py +++ b/neutron/tests/fullstack/test_l3_agent.py @@ -14,10 +14,13 @@ import functools import netaddr +import os +import time from neutron_lib import constants from oslo_utils import uuidutils +from neutron.agent.l3 import ha_router from neutron.agent.l3 import namespaces from neutron.agent.linux import ip_lib from neutron.common import utils as common_utils @@ -50,25 +53,34 @@ class TestL3Agent(base.BaseFullStackTestCase): return port['port']['status'] == 'ACTIVE' common_utils.wait_until_true(lambda: is_port_status_active(), sleep=1) + def _create_and_attach_subnet( + self, tenant_id, subnet_cidr, network_id, router_id): + # For IPv6 subnets, enable_dhcp should be set to true. + enable_dhcp = (netaddr.IPNetwork(subnet_cidr).version == + constants.IP_VERSION_6) + subnet = self.safe_client.create_subnet( + tenant_id, network_id, subnet_cidr, enable_dhcp=enable_dhcp) + + router_interface_info = self.safe_client.add_router_interface( + router_id, subnet['id']) + self.block_until_port_status_active( + router_interface_info['port_id']) + + def _boot_fake_vm_in_network(self, host, tenant_id, network_id, wait=True): + vm = self.useFixture( + machine.FakeFullstackMachine( + host, network_id, tenant_id, self.safe_client)) + if wait: + vm.block_until_boot() + return vm + def _create_net_subnet_and_vm(self, tenant_id, subnet_cidrs, host, router): network = self.safe_client.create_network(tenant_id) for cidr in subnet_cidrs: - # For IPv6 subnets, enable_dhcp should be set to true. - enable_dhcp = (netaddr.IPNetwork(cidr).version == - constants.IP_VERSION_6) - subnet = self.safe_client.create_subnet( - tenant_id, network['id'], cidr, enable_dhcp=enable_dhcp) + self._create_and_attach_subnet( + tenant_id, cidr, network['id'], router['id']) - router_interface_info = self.safe_client.add_router_interface( - router['id'], subnet['id']) - self.block_until_port_status_active( - router_interface_info['port_id']) - - vm = self.useFixture( - machine.FakeFullstackMachine( - host, network['id'], tenant_id, self.safe_client)) - vm.block_until_boot() - return vm + return self._boot_fake_vm_in_network(host, tenant_id, network['id']) class TestLegacyL3Agent(TestL3Agent): @@ -175,7 +187,7 @@ class TestLegacyL3Agent(TestL3Agent): vm.block_until_ping(external_vm.ipv6) -class TestHAL3Agent(base.BaseFullStackTestCase): +class TestHAL3Agent(TestL3Agent): def setUp(self): host_descriptions = [ @@ -206,3 +218,64 @@ class TestHAL3Agent(base.BaseFullStackTestCase): self._is_ha_router_active_on_one_agent, router['id']), timeout=90) + + def _get_keepalived_state(self, keepalived_state_file): + with open(keepalived_state_file, "r") as fd: + return fd.read() + + def _get_state_file_for_master_agent(self, router_id): + for host in self.environment.hosts: + keepalived_state_file = os.path.join( + host.neutron_config.state_path, "ha_confs", router_id, "state") + + if self._get_keepalived_state(keepalived_state_file) == "master": + return keepalived_state_file + + def test_keepalived_multiple_sighups_does_not_forfeit_mastership(self): + """Setup a complete "Neutron stack" - both an internal and an external + network+subnet, and a router connected to both. + """ + tenant_id = uuidutils.generate_uuid() + ext_net, ext_sub = self._create_external_network_and_subnet(tenant_id) + router = self.safe_client.create_router(tenant_id, ha=True, + external_network=ext_net['id']) + common_utils.wait_until_true( + lambda: + len(self.client.list_l3_agent_hosting_routers( + router['id'])['agents']) == 2, + timeout=90) + common_utils.wait_until_true( + functools.partial( + self._is_ha_router_active_on_one_agent, + router['id']), + timeout=90) + keepalived_state_file = self._get_state_file_for_master_agent( + router['id']) + self.assertIsNotNone(keepalived_state_file) + network = self.safe_client.create_network(tenant_id) + self._create_and_attach_subnet( + tenant_id, '13.37.0.0/24', network['id'], router['id']) + + # Create 10 fake VMs, each with a floating ip. Each floating ip + # association should send a SIGHUP to the keepalived's parent process, + # unless the Throttler works. + host = self.environment.hosts[0] + vms = [self._boot_fake_vm_in_network(host, tenant_id, network['id'], + wait=False) + for i in range(10)] + for vm in vms: + self.safe_client.create_floatingip( + tenant_id, ext_net['id'], vm.ip, vm.neutron_port['id']) + + # Check that the keepalived's state file has not changed and is still + # master. This will indicate that the Throttler works. We want to check + # for ha_vrrp_advert_int (the default is 2 seconds), plus a bit more. + time_to_stop = (time.time() + + (common_utils.DEFAULT_THROTTLER_VALUE * + ha_router.THROTTLER_MULTIPLIER * 1.3)) + while True: + if time.time() > time_to_stop: + break + self.assertEqual( + "master", + self._get_keepalived_state(keepalived_state_file)) diff --git a/neutron/tests/unit/common/test_utils.py b/neutron/tests/unit/common/test_utils.py index db9a9423a11..0e3c49aabe9 100644 --- a/neutron/tests/unit/common/test_utils.py +++ b/neutron/tests/unit/common/test_utils.py @@ -703,3 +703,53 @@ class ImportModulesRecursivelyTestCase(base.BaseTestCase): for module in expected_modules: self.assertIn(module, modules) self.assertIn(module, sys.modules) + + +class TestThrottler(base.BaseTestCase): + def test_throttler(self): + threshold = 1 + orig_function = mock.Mock() + # Add this magic name as it's required by functools + orig_function.__name__ = 'mock_func' + throttled_func = utils.throttler(threshold)(orig_function) + + throttled_func() + + sleep = utils.eventlet.sleep + + def sleep_mock(amount_to_sleep): + sleep(amount_to_sleep) + self.assertTrue(threshold > amount_to_sleep) + + with mock.patch.object(utils.eventlet, "sleep", + side_effect=sleep_mock): + throttled_func() + + self.assertEqual(2, orig_function.call_count) + + lock_with_timer = six.get_function_closure( + throttled_func)[1].cell_contents + timestamp = lock_with_timer.timestamp - threshold + lock_with_timer.timestamp = timestamp + + throttled_func() + + self.assertEqual(3, orig_function.call_count) + self.assertTrue(timestamp < lock_with_timer.timestamp) + + def test_method_docstring_is_preserved(self): + class Klass(object): + @utils.throttler() + def method(self): + """Docstring""" + + self.assertEqual("Docstring", Klass.method.__doc__) + + def test_method_still_callable(self): + class Klass(object): + @utils.throttler() + def method(self): + pass + + obj = Klass() + obj.method()