From 23ded198e6f9c433b5edc350fb23abd33ccbd127 Mon Sep 17 00:00:00 2001 From: John Schwarz Date: Mon, 5 Dec 2016 14:15:17 +0200 Subject: [PATCH] Throttle SIGHUPs to keepalived Multiple SIGHUPs in quick succession might cause the master keepalived to forfeit its mastership (which will cause keepalived to remove IPs of its external devices, severing connectivity). This can happen when, for example, associating or disassociating multiple floatingips. The patch makes the agent throttle SIGHUP sent to keepalived: the very first SIGHUP is always sent; as for subsequent signals, they are delayed till agent threshold is reached. (It's 3 seconds by default.) As an example, when three consequent router updates trigger keepalived respawn then: * the very first signal is sent as usual; * the second signal is deferred and sent in up to 3 seconds since the first signal; * the third signal is ignored, though the change that triggered it will be correctly applied by the second signal handler when it is triggered after threshold delay. If the last time a spawn request occurred is older than current-time minus threshold then there is no delay. Co-Authored-By: Jakub Libosvar Co-Authored-By: Cedric Brandily Co-Authored-By: Ihar Hrachyshka Conflicts: neutron/agent/linux/keepalived.py neutron/common/utils.py neutron/tests/fullstack/test_l3_agent.py Closes-Bug: 1647432 Change-Id: I2955e0de835458a2eea4dd088addf33b656f8670 (cherry picked from commit 977d254cc69915819cf4226dc8cfc8c36969735b) --- neutron/agent/l3/ha_router.py | 9 +- neutron/agent/linux/keepalived.py | 10 ++- neutron/common/utils.py | 48 +++++++++++ neutron/tests/common/agents/l3_agent.py | 36 ++++++++ neutron/tests/fullstack/test_l3_agent.py | 105 +++++++++++++++++++---- neutron/tests/unit/common/test_utils.py | 50 +++++++++++ 6 files changed, 240 insertions(+), 18 deletions(-) diff --git a/neutron/agent/l3/ha_router.py b/neutron/agent/l3/ha_router.py index b87929faa57..e31bb027eaf 100644 --- a/neutron/agent/l3/ha_router.py +++ b/neutron/agent/l3/ha_router.py @@ -35,6 +35,11 @@ HA_DEV_PREFIX = 'ha-' IP_MONITOR_PROCESS_SERVICE = 'ip_monitor' SIGTERM_TIMEOUT = 10 +# The multiplier is used to compensate execution time of function sending +# SIGHUP to keepalived process. The constant multiplies ha_vrrp_advert_int +# config option and the result is the throttle delay. +THROTTLER_MULTIPLIER = 1.5 + class HaRouterNamespace(namespaces.RouterNamespace): """Namespace for HA router. @@ -116,7 +121,9 @@ class HaRouter(router.RouterInfo): keepalived.KeepalivedConf(), process_monitor, conf_path=self.agent_conf.ha_confs_path, - namespace=self.ha_namespace) + namespace=self.ha_namespace, + throttle_restart_value=( + self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER)) config = self.keepalived_manager.config diff --git a/neutron/agent/linux/keepalived.py b/neutron/agent/linux/keepalived.py index 35b4a16f16b..222d164253f 100644 --- a/neutron/agent/linux/keepalived.py +++ b/neutron/agent/linux/keepalived.py @@ -347,12 +347,20 @@ class KeepalivedManager(object): """ def __init__(self, resource_id, config, process_monitor, conf_path='/tmp', - namespace=None): + namespace=None, throttle_restart_value=None): self.resource_id = resource_id self.config = config self.namespace = namespace self.process_monitor = process_monitor self.conf_path = conf_path + # configure throttler for spawn to introduce delay between SIGHUPs, + # otherwise keepalived master may unnecessarily flip to slave + if throttle_restart_value is not None: + self._throttle_spawn(throttle_restart_value) + + #pylint: disable=method-hidden + def _throttle_spawn(self, threshold): + self.spawn = common_utils.throttler(threshold)(self.spawn) def get_conf_dir(self): confs_dir = os.path.abspath(os.path.normpath(self.conf_path)) diff --git a/neutron/common/utils.py b/neutron/common/utils.py index 2ce52ca6974..09282f650ad 100644 --- a/neutron/common/utils.py +++ b/neutron/common/utils.py @@ -31,6 +31,7 @@ import signal import socket import sys import tempfile +import threading import time import uuid @@ -55,9 +56,56 @@ TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" LOG = logging.getLogger(__name__) SYNCHRONIZED_PREFIX = 'neutron-' +DEFAULT_THROTTLER_VALUE = 2 + synchronized = lockutils.synchronized_with_prefix(SYNCHRONIZED_PREFIX) +class LockWithTimer(object): + def __init__(self, threshold): + self._threshold = threshold + self.timestamp = 0 + self._lock = threading.Lock() + + def acquire(self): + return self._lock.acquire(False) + + def release(self): + return self._lock.release() + + def time_to_wait(self): + return self.timestamp - time.time() + self._threshold + + +# REVISIT(jlibosva): Some parts of throttler may be similar to what +# neutron.notifiers.batch_notifier.BatchNotifier does. They +# could be refactored and unified. +def throttler(threshold=DEFAULT_THROTTLER_VALUE): + """Throttle number of calls to a function to only once per 'threshold'. + """ + def decorator(f): + lock_with_timer = LockWithTimer(threshold) + + @functools.wraps(f) + def wrapper(*args, **kwargs): + if lock_with_timer.acquire(): + try: + fname = f.__name__ + time_to_wait = lock_with_timer.time_to_wait() + if time_to_wait > 0: + LOG.debug("Call of function %s scheduled, sleeping " + "%.1f seconds", fname, time_to_wait) + # Decorated function has been called recently, wait. + eventlet.sleep(time_to_wait) + lock_with_timer.timestamp = time.time() + finally: + lock_with_timer.release() + LOG.debug("Calling throttled function %s", fname) + return f(*args, **kwargs) + return wrapper + return decorator + + def ensure_dir(dir_path): """Ensure a directory with 755 permissions mode.""" try: diff --git a/neutron/tests/common/agents/l3_agent.py b/neutron/tests/common/agents/l3_agent.py index 3557bdaaf09..7afedf669fa 100755 --- a/neutron/tests/common/agents/l3_agent.py +++ b/neutron/tests/common/agents/l3_agent.py @@ -13,6 +13,7 @@ # under the License. import sys +import types import mock from oslo_config import cfg @@ -21,6 +22,7 @@ from neutron._i18n import _ from neutron.agent.l3 import agent from neutron.agent.l3 import namespaces from neutron.agent import l3_agent +from neutron.agent.linux import iptables_firewall class L3NATAgentForTest(agent.L3NATAgentWithStateReport): @@ -59,6 +61,40 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport): super(L3NATAgentForTest, self).__init__(host, conf) + def _create_router(self, router_id, router): + """Create a router with suffix added to the router namespace name. + + This is needed to be able to run two agents serving the same router + on the same node. + """ + router = ( + super(L3NATAgentForTest, self)._create_router(router_id, router)) + + router.get_internal_device_name = types.MethodType( + get_internal_device_name, router) + router.get_external_device_name = types.MethodType( + get_external_device_name, router) + + return router + + +def _append_suffix(dev_name): + # If dev_name = 'xyz123' and the suffix is 'hostB' then the result + # will be 'xy_stB' + return '%s_%s' % (dev_name[:-4], cfg.CONF.test_namespace_suffix[-3:]) + + +def get_internal_device_name(ri, port_id): + return _append_suffix( + (namespaces.INTERNAL_DEV_PREFIX + port_id) + [:iptables_firewall.LINUX_DEV_LEN]) + + +def get_external_device_name(ri, port_id): + return _append_suffix( + (namespaces.EXTERNAL_DEV_PREFIX + port_id) + [:iptables_firewall.LINUX_DEV_LEN]) + OPTS = [ cfg.StrOpt('test_namespace_suffix', default='testprefix', diff --git a/neutron/tests/fullstack/test_l3_agent.py b/neutron/tests/fullstack/test_l3_agent.py index 0f2acb45f68..33cb684933f 100644 --- a/neutron/tests/fullstack/test_l3_agent.py +++ b/neutron/tests/fullstack/test_l3_agent.py @@ -14,11 +14,14 @@ import functools import netaddr +import os +import time from neutron_lib import constants from oslo_utils import uuidutils from neutron.agent.l3 import agent as l3_agent +from neutron.agent.l3 import ha_router from neutron.agent.l3 import namespaces from neutron.agent.linux import ip_lib from neutron.common import utils as common_utils @@ -51,25 +54,34 @@ class TestL3Agent(base.BaseFullStackTestCase): return port['port']['status'] == 'ACTIVE' common_utils.wait_until_true(lambda: is_port_status_active(), sleep=1) + def _create_and_attach_subnet( + self, tenant_id, subnet_cidr, network_id, router_id): + # For IPv6 subnets, enable_dhcp should be set to true. + enable_dhcp = (netaddr.IPNetwork(subnet_cidr).version == + constants.IP_VERSION_6) + subnet = self.safe_client.create_subnet( + tenant_id, network_id, subnet_cidr, enable_dhcp=enable_dhcp) + + router_interface_info = self.safe_client.add_router_interface( + router_id, subnet['id']) + self.block_until_port_status_active( + router_interface_info['port_id']) + + def _boot_fake_vm_in_network(self, host, tenant_id, network_id, wait=True): + vm = self.useFixture( + machine.FakeFullstackMachine( + host, network_id, tenant_id, self.safe_client)) + if wait: + vm.block_until_boot() + return vm + def _create_net_subnet_and_vm(self, tenant_id, subnet_cidrs, host, router): network = self.safe_client.create_network(tenant_id) for cidr in subnet_cidrs: - # For IPv6 subnets, enable_dhcp should be set to true. - enable_dhcp = (netaddr.IPNetwork(cidr).version == - constants.IP_VERSION_6) - subnet = self.safe_client.create_subnet( - tenant_id, network['id'], cidr, enable_dhcp=enable_dhcp) + self._create_and_attach_subnet( + tenant_id, cidr, network['id'], router['id']) - router_interface_info = self.safe_client.add_router_interface( - router['id'], subnet['id']) - self.block_until_port_status_active( - router_interface_info['port_id']) - - vm = self.useFixture( - machine.FakeFullstackMachine( - host, network['id'], tenant_id, self.safe_client)) - vm.block_until_boot() - return vm + return self._boot_fake_vm_in_network(host, tenant_id, network['id']) class TestLegacyL3Agent(TestL3Agent): @@ -149,7 +161,7 @@ class TestLegacyL3Agent(TestL3Agent): external_vm.block_until_ping(fip['floating_ip_address']) -class TestHAL3Agent(base.BaseFullStackTestCase): +class TestHAL3Agent(TestL3Agent): def setUp(self): host_descriptions = [ @@ -180,3 +192,64 @@ class TestHAL3Agent(base.BaseFullStackTestCase): self._is_ha_router_active_on_one_agent, router['id']), timeout=90) + + def _get_keepalived_state(self, keepalived_state_file): + with open(keepalived_state_file, "r") as fd: + return fd.read() + + def _get_state_file_for_master_agent(self, router_id): + for host in self.environment.hosts: + keepalived_state_file = os.path.join( + host.neutron_config.state_path, "ha_confs", router_id, "state") + + if self._get_keepalived_state(keepalived_state_file) == "master": + return keepalived_state_file + + def test_keepalived_multiple_sighups_does_not_forfeit_mastership(self): + """Setup a complete "Neutron stack" - both an internal and an external + network+subnet, and a router connected to both. + """ + tenant_id = uuidutils.generate_uuid() + ext_net, ext_sub = self._create_external_network_and_subnet(tenant_id) + router = self.safe_client.create_router(tenant_id, ha=True, + external_network=ext_net['id']) + common_utils.wait_until_true( + lambda: + len(self.client.list_l3_agent_hosting_routers( + router['id'])['agents']) == 2, + timeout=90) + common_utils.wait_until_true( + functools.partial( + self._is_ha_router_active_on_one_agent, + router['id']), + timeout=90) + keepalived_state_file = self._get_state_file_for_master_agent( + router['id']) + self.assertIsNotNone(keepalived_state_file) + network = self.safe_client.create_network(tenant_id) + self._create_and_attach_subnet( + tenant_id, '13.37.0.0/24', network['id'], router['id']) + + # Create 10 fake VMs, each with a floating ip. Each floating ip + # association should send a SIGHUP to the keepalived's parent process, + # unless the Throttler works. + host = self.environment.hosts[0] + vms = [self._boot_fake_vm_in_network(host, tenant_id, network['id'], + wait=False) + for i in range(10)] + for vm in vms: + self.safe_client.create_floatingip( + tenant_id, ext_net['id'], vm.ip, vm.neutron_port['id']) + + # Check that the keepalived's state file has not changed and is still + # master. This will indicate that the Throttler works. We want to check + # for ha_vrrp_advert_int (the default is 2 seconds), plus a bit more. + time_to_stop = (time.time() + + (common_utils.DEFAULT_THROTTLER_VALUE * + ha_router.THROTTLER_MULTIPLIER * 1.3)) + while True: + if time.time() > time_to_stop: + break + self.assertEqual( + "master", + self._get_keepalived_state(keepalived_state_file)) diff --git a/neutron/tests/unit/common/test_utils.py b/neutron/tests/unit/common/test_utils.py index a94aa35f291..3d775b2ff1c 100644 --- a/neutron/tests/unit/common/test_utils.py +++ b/neutron/tests/unit/common/test_utils.py @@ -854,3 +854,53 @@ class ImportModulesRecursivelyTestCase(base.BaseTestCase): for module in expected_modules: self.assertIn(module, modules) self.assertIn(module, sys.modules) + + +class TestThrottler(base.BaseTestCase): + def test_throttler(self): + threshold = 1 + orig_function = mock.Mock() + # Add this magic name as it's required by functools + orig_function.__name__ = 'mock_func' + throttled_func = utils.throttler(threshold)(orig_function) + + throttled_func() + + sleep = utils.eventlet.sleep + + def sleep_mock(amount_to_sleep): + sleep(amount_to_sleep) + self.assertTrue(threshold > amount_to_sleep) + + with mock.patch.object(utils.eventlet, "sleep", + side_effect=sleep_mock): + throttled_func() + + self.assertEqual(2, orig_function.call_count) + + lock_with_timer = six.get_function_closure( + throttled_func)[1].cell_contents + timestamp = lock_with_timer.timestamp - threshold + lock_with_timer.timestamp = timestamp + + throttled_func() + + self.assertEqual(3, orig_function.call_count) + self.assertTrue(timestamp < lock_with_timer.timestamp) + + def test_method_docstring_is_preserved(self): + class Klass(object): + @utils.throttler() + def method(self): + """Docstring""" + + self.assertEqual("Docstring", Klass.method.__doc__) + + def test_method_still_callable(self): + class Klass(object): + @utils.throttler() + def method(self): + pass + + obj = Klass() + obj.method()