Merge "Throttle SIGHUPs to keepalived"
This commit is contained in:
commit
fd041d6471
@ -34,6 +34,11 @@ HA_DEV_PREFIX = 'ha-'
|
||||
IP_MONITOR_PROCESS_SERVICE = 'ip_monitor'
|
||||
SIGTERM_TIMEOUT = 10
|
||||
|
||||
# The multiplier is used to compensate execution time of function sending
|
||||
# SIGHUP to keepalived process. The constant multiplies ha_vrrp_advert_int
|
||||
# config option and the result is the throttle delay.
|
||||
THROTTLER_MULTIPLIER = 1.5
|
||||
|
||||
|
||||
class HaRouterNamespace(namespaces.RouterNamespace):
|
||||
"""Namespace for HA router.
|
||||
@ -124,7 +129,9 @@ class HaRouter(router.RouterInfo):
|
||||
keepalived.KeepalivedConf(),
|
||||
process_monitor,
|
||||
conf_path=self.agent_conf.ha_confs_path,
|
||||
namespace=self.ha_namespace)
|
||||
namespace=self.ha_namespace,
|
||||
throttle_restart_value=(
|
||||
self.agent_conf.ha_vrrp_advert_int * THROTTLER_MULTIPLIER))
|
||||
|
||||
config = self.keepalived_manager.config
|
||||
|
||||
|
@ -27,6 +27,7 @@ from oslo_utils import fileutils
|
||||
from neutron._i18n import _, _LE
|
||||
from neutron.agent.linux import external_process
|
||||
from neutron.common import constants
|
||||
from neutron.common import utils
|
||||
|
||||
VALID_STATES = ['MASTER', 'BACKUP']
|
||||
VALID_AUTH_TYPES = ['AH', 'PASS']
|
||||
@ -367,12 +368,20 @@ class KeepalivedManager(object):
|
||||
"""
|
||||
|
||||
def __init__(self, resource_id, config, process_monitor, conf_path='/tmp',
|
||||
namespace=None):
|
||||
namespace=None, throttle_restart_value=None):
|
||||
self.resource_id = resource_id
|
||||
self.config = config
|
||||
self.namespace = namespace
|
||||
self.process_monitor = process_monitor
|
||||
self.conf_path = conf_path
|
||||
# configure throttler for spawn to introduce delay between SIGHUPs,
|
||||
# otherwise keepalived master may unnecessarily flip to slave
|
||||
if throttle_restart_value is not None:
|
||||
self._throttle_spawn(throttle_restart_value)
|
||||
|
||||
#pylint: disable=method-hidden
|
||||
def _throttle_spawn(self, threshold):
|
||||
self.spawn = utils.throttler(threshold)(self.spawn)
|
||||
|
||||
def get_conf_dir(self):
|
||||
confs_dir = os.path.abspath(os.path.normpath(self.conf_path))
|
||||
|
@ -25,6 +25,7 @@ import os.path
|
||||
import random
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
import weakref
|
||||
@ -54,6 +55,8 @@ TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
|
||||
LOG = logging.getLogger(__name__)
|
||||
SYNCHRONIZED_PREFIX = 'neutron-'
|
||||
|
||||
DEFAULT_THROTTLER_VALUE = 2
|
||||
|
||||
synchronized = lockutils.synchronized_with_prefix(SYNCHRONIZED_PREFIX)
|
||||
|
||||
|
||||
@ -61,6 +64,51 @@ class WaitTimeout(Exception):
|
||||
"""Default exception coming from wait_until_true() function."""
|
||||
|
||||
|
||||
class LockWithTimer(object):
|
||||
def __init__(self, threshold):
|
||||
self._threshold = threshold
|
||||
self.timestamp = 0
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def acquire(self):
|
||||
return self._lock.acquire(False)
|
||||
|
||||
def release(self):
|
||||
return self._lock.release()
|
||||
|
||||
def time_to_wait(self):
|
||||
return self.timestamp - time.time() + self._threshold
|
||||
|
||||
|
||||
# REVISIT(jlibosva): Some parts of throttler may be similar to what
|
||||
# neutron.notifiers.batch_notifier.BatchNotifier does. They
|
||||
# could be refactored and unified.
|
||||
def throttler(threshold=DEFAULT_THROTTLER_VALUE):
|
||||
"""Throttle number of calls to a function to only once per 'threshold'.
|
||||
"""
|
||||
def decorator(f):
|
||||
lock_with_timer = LockWithTimer(threshold)
|
||||
|
||||
@functools.wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
if lock_with_timer.acquire():
|
||||
try:
|
||||
fname = f.__name__
|
||||
time_to_wait = lock_with_timer.time_to_wait()
|
||||
if time_to_wait > 0:
|
||||
LOG.debug("Call of function %s scheduled, sleeping "
|
||||
"%.1f seconds", fname, time_to_wait)
|
||||
# Decorated function has been called recently, wait.
|
||||
eventlet.sleep(time_to_wait)
|
||||
lock_with_timer.timestamp = time.time()
|
||||
finally:
|
||||
lock_with_timer.release()
|
||||
LOG.debug("Calling throttled function %s", fname)
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
@removals.remove(
|
||||
message="Use ensure_tree(path, 0o755) from oslo_utils.fileutils")
|
||||
def ensure_dir(dir_path):
|
||||
|
@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import sys
|
||||
import types
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
@ -21,6 +22,7 @@ from neutron._i18n import _
|
||||
from neutron.agent.l3 import agent
|
||||
from neutron.agent.l3 import namespaces
|
||||
from neutron.agent import l3_agent
|
||||
from neutron.common import constants
|
||||
|
||||
|
||||
class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
|
||||
@ -59,6 +61,40 @@ class L3NATAgentForTest(agent.L3NATAgentWithStateReport):
|
||||
|
||||
super(L3NATAgentForTest, self).__init__(host, conf)
|
||||
|
||||
def _create_router(self, router_id, router):
|
||||
"""Create a router with suffix added to the router namespace name.
|
||||
|
||||
This is needed to be able to run two agents serving the same router
|
||||
on the same node.
|
||||
"""
|
||||
router = (
|
||||
super(L3NATAgentForTest, self)._create_router(router_id, router))
|
||||
|
||||
router.get_internal_device_name = types.MethodType(
|
||||
get_internal_device_name, router)
|
||||
router.get_external_device_name = types.MethodType(
|
||||
get_external_device_name, router)
|
||||
|
||||
return router
|
||||
|
||||
|
||||
def _append_suffix(dev_name):
|
||||
# If dev_name = 'xyz123' and the suffix is 'hostB' then the result
|
||||
# will be 'xy_stB'
|
||||
return '%s_%s' % (dev_name[:-4], cfg.CONF.test_namespace_suffix[-3:])
|
||||
|
||||
|
||||
def get_internal_device_name(ri, port_id):
|
||||
return _append_suffix(
|
||||
(namespaces.INTERNAL_DEV_PREFIX + port_id)
|
||||
[:constants.LINUX_DEV_LEN])
|
||||
|
||||
|
||||
def get_external_device_name(ri, port_id):
|
||||
return _append_suffix(
|
||||
(namespaces.EXTERNAL_DEV_PREFIX + port_id)
|
||||
[:constants.LINUX_DEV_LEN])
|
||||
|
||||
|
||||
OPTS = [
|
||||
cfg.StrOpt('test_namespace_suffix', default='testprefix',
|
||||
|
@ -14,10 +14,13 @@
|
||||
|
||||
import functools
|
||||
import netaddr
|
||||
import os
|
||||
import time
|
||||
|
||||
from neutron_lib import constants
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron.agent.l3 import ha_router
|
||||
from neutron.agent.l3 import namespaces
|
||||
from neutron.agent.linux import ip_lib
|
||||
from neutron.common import utils as common_utils
|
||||
@ -50,25 +53,34 @@ class TestL3Agent(base.BaseFullStackTestCase):
|
||||
return port['port']['status'] == 'ACTIVE'
|
||||
common_utils.wait_until_true(lambda: is_port_status_active(), sleep=1)
|
||||
|
||||
def _create_and_attach_subnet(
|
||||
self, tenant_id, subnet_cidr, network_id, router_id):
|
||||
# For IPv6 subnets, enable_dhcp should be set to true.
|
||||
enable_dhcp = (netaddr.IPNetwork(subnet_cidr).version ==
|
||||
constants.IP_VERSION_6)
|
||||
subnet = self.safe_client.create_subnet(
|
||||
tenant_id, network_id, subnet_cidr, enable_dhcp=enable_dhcp)
|
||||
|
||||
router_interface_info = self.safe_client.add_router_interface(
|
||||
router_id, subnet['id'])
|
||||
self.block_until_port_status_active(
|
||||
router_interface_info['port_id'])
|
||||
|
||||
def _boot_fake_vm_in_network(self, host, tenant_id, network_id, wait=True):
|
||||
vm = self.useFixture(
|
||||
machine.FakeFullstackMachine(
|
||||
host, network_id, tenant_id, self.safe_client))
|
||||
if wait:
|
||||
vm.block_until_boot()
|
||||
return vm
|
||||
|
||||
def _create_net_subnet_and_vm(self, tenant_id, subnet_cidrs, host, router):
|
||||
network = self.safe_client.create_network(tenant_id)
|
||||
for cidr in subnet_cidrs:
|
||||
# For IPv6 subnets, enable_dhcp should be set to true.
|
||||
enable_dhcp = (netaddr.IPNetwork(cidr).version ==
|
||||
constants.IP_VERSION_6)
|
||||
subnet = self.safe_client.create_subnet(
|
||||
tenant_id, network['id'], cidr, enable_dhcp=enable_dhcp)
|
||||
self._create_and_attach_subnet(
|
||||
tenant_id, cidr, network['id'], router['id'])
|
||||
|
||||
router_interface_info = self.safe_client.add_router_interface(
|
||||
router['id'], subnet['id'])
|
||||
self.block_until_port_status_active(
|
||||
router_interface_info['port_id'])
|
||||
|
||||
vm = self.useFixture(
|
||||
machine.FakeFullstackMachine(
|
||||
host, network['id'], tenant_id, self.safe_client))
|
||||
vm.block_until_boot()
|
||||
return vm
|
||||
return self._boot_fake_vm_in_network(host, tenant_id, network['id'])
|
||||
|
||||
|
||||
class TestLegacyL3Agent(TestL3Agent):
|
||||
@ -175,7 +187,7 @@ class TestLegacyL3Agent(TestL3Agent):
|
||||
vm.block_until_ping(external_vm.ipv6)
|
||||
|
||||
|
||||
class TestHAL3Agent(base.BaseFullStackTestCase):
|
||||
class TestHAL3Agent(TestL3Agent):
|
||||
|
||||
def setUp(self):
|
||||
host_descriptions = [
|
||||
@ -206,3 +218,64 @@ class TestHAL3Agent(base.BaseFullStackTestCase):
|
||||
self._is_ha_router_active_on_one_agent,
|
||||
router['id']),
|
||||
timeout=90)
|
||||
|
||||
def _get_keepalived_state(self, keepalived_state_file):
|
||||
with open(keepalived_state_file, "r") as fd:
|
||||
return fd.read()
|
||||
|
||||
def _get_state_file_for_master_agent(self, router_id):
|
||||
for host in self.environment.hosts:
|
||||
keepalived_state_file = os.path.join(
|
||||
host.neutron_config.state_path, "ha_confs", router_id, "state")
|
||||
|
||||
if self._get_keepalived_state(keepalived_state_file) == "master":
|
||||
return keepalived_state_file
|
||||
|
||||
def test_keepalived_multiple_sighups_does_not_forfeit_mastership(self):
|
||||
"""Setup a complete "Neutron stack" - both an internal and an external
|
||||
network+subnet, and a router connected to both.
|
||||
"""
|
||||
tenant_id = uuidutils.generate_uuid()
|
||||
ext_net, ext_sub = self._create_external_network_and_subnet(tenant_id)
|
||||
router = self.safe_client.create_router(tenant_id, ha=True,
|
||||
external_network=ext_net['id'])
|
||||
common_utils.wait_until_true(
|
||||
lambda:
|
||||
len(self.client.list_l3_agent_hosting_routers(
|
||||
router['id'])['agents']) == 2,
|
||||
timeout=90)
|
||||
common_utils.wait_until_true(
|
||||
functools.partial(
|
||||
self._is_ha_router_active_on_one_agent,
|
||||
router['id']),
|
||||
timeout=90)
|
||||
keepalived_state_file = self._get_state_file_for_master_agent(
|
||||
router['id'])
|
||||
self.assertIsNotNone(keepalived_state_file)
|
||||
network = self.safe_client.create_network(tenant_id)
|
||||
self._create_and_attach_subnet(
|
||||
tenant_id, '13.37.0.0/24', network['id'], router['id'])
|
||||
|
||||
# Create 10 fake VMs, each with a floating ip. Each floating ip
|
||||
# association should send a SIGHUP to the keepalived's parent process,
|
||||
# unless the Throttler works.
|
||||
host = self.environment.hosts[0]
|
||||
vms = [self._boot_fake_vm_in_network(host, tenant_id, network['id'],
|
||||
wait=False)
|
||||
for i in range(10)]
|
||||
for vm in vms:
|
||||
self.safe_client.create_floatingip(
|
||||
tenant_id, ext_net['id'], vm.ip, vm.neutron_port['id'])
|
||||
|
||||
# Check that the keepalived's state file has not changed and is still
|
||||
# master. This will indicate that the Throttler works. We want to check
|
||||
# for ha_vrrp_advert_int (the default is 2 seconds), plus a bit more.
|
||||
time_to_stop = (time.time() +
|
||||
(common_utils.DEFAULT_THROTTLER_VALUE *
|
||||
ha_router.THROTTLER_MULTIPLIER * 1.3))
|
||||
while True:
|
||||
if time.time() > time_to_stop:
|
||||
break
|
||||
self.assertEqual(
|
||||
"master",
|
||||
self._get_keepalived_state(keepalived_state_file))
|
||||
|
@ -703,3 +703,53 @@ class ImportModulesRecursivelyTestCase(base.BaseTestCase):
|
||||
for module in expected_modules:
|
||||
self.assertIn(module, modules)
|
||||
self.assertIn(module, sys.modules)
|
||||
|
||||
|
||||
class TestThrottler(base.BaseTestCase):
|
||||
def test_throttler(self):
|
||||
threshold = 1
|
||||
orig_function = mock.Mock()
|
||||
# Add this magic name as it's required by functools
|
||||
orig_function.__name__ = 'mock_func'
|
||||
throttled_func = utils.throttler(threshold)(orig_function)
|
||||
|
||||
throttled_func()
|
||||
|
||||
sleep = utils.eventlet.sleep
|
||||
|
||||
def sleep_mock(amount_to_sleep):
|
||||
sleep(amount_to_sleep)
|
||||
self.assertTrue(threshold > amount_to_sleep)
|
||||
|
||||
with mock.patch.object(utils.eventlet, "sleep",
|
||||
side_effect=sleep_mock):
|
||||
throttled_func()
|
||||
|
||||
self.assertEqual(2, orig_function.call_count)
|
||||
|
||||
lock_with_timer = six.get_function_closure(
|
||||
throttled_func)[1].cell_contents
|
||||
timestamp = lock_with_timer.timestamp - threshold
|
||||
lock_with_timer.timestamp = timestamp
|
||||
|
||||
throttled_func()
|
||||
|
||||
self.assertEqual(3, orig_function.call_count)
|
||||
self.assertTrue(timestamp < lock_with_timer.timestamp)
|
||||
|
||||
def test_method_docstring_is_preserved(self):
|
||||
class Klass(object):
|
||||
@utils.throttler()
|
||||
def method(self):
|
||||
"""Docstring"""
|
||||
|
||||
self.assertEqual("Docstring", Klass.method.__doc__)
|
||||
|
||||
def test_method_still_callable(self):
|
||||
class Klass(object):
|
||||
@utils.throttler()
|
||||
def method(self):
|
||||
pass
|
||||
|
||||
obj = Klass()
|
||||
obj.method()
|
||||
|
Loading…
Reference in New Issue
Block a user