Merge "Add conntrackd support to HA routers in L3 agent"

This commit is contained in:
Zuul 2024-12-24 04:58:17 +00:00 committed by Gerrit Code Review
commit a45feadd44
17 changed files with 961 additions and 55 deletions

View File

@ -57,6 +57,8 @@ keepalived: CommandFilter, keepalived, root
keepalived_env: EnvFilter, env, root, PROCESS_TAG=, keepalived
keepalived_state_change: CommandFilter, neutron-keepalived-state-change, root
keepalived_state_change_env: EnvFilter, env, root, PROCESS_TAG=, neutron-keepalived-state-change
conntrackd: CommandFilter, conntrackd, root
conntrackd_env: EnvFilter, env, root, PROCESS_TAG=, conntrackd
# OPEN VSWITCH
ovs-ofctl: CommandFilter, ovs-ofctl, root

View File

@ -24,6 +24,7 @@ from oslo_log import log as logging
from neutron.agent.l3 import namespaces
from neutron.agent.l3 import router_info as router
from neutron.agent.linux import conntrackd
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.agent.linux import keepalived
@ -82,6 +83,7 @@ class HaRouter(router.RouterInfo):
self.keepalived_manager = None
self._ha_state = None
self._ha_state_path = None
self.conntrackd_manager = None
def create_router_namespace_object(
self, router_id, agent_conf, iface_driver, use_ipv6):
@ -161,11 +163,31 @@ class HaRouter(router.RouterInfo):
super().initialize(process_monitor)
self.set_ha_port()
self._init_conntrackd_manager(process_monitor)
self._init_keepalived_manager(process_monitor)
self._check_and_set_real_state()
self.ha_network_added()
self.spawn_state_change_monitor(process_monitor)
def _get_ha_port_fixed_ip_with_subnet(self, subnet):
for fixed_ip in self.ha_port.get('fixed_ips', []):
if fixed_ip['subnet_id'] == subnet['id']:
return fixed_ip['ip_address']
return None
def _get_conntrackd_ipv4_interface(self):
return self.ha_port['fixed_ips'][0]['ip_address']
def _init_conntrackd_manager(self, process_monitor):
self.conntrackd_manager = conntrackd.ConntrackdManager(
self.router['id'],
process_monitor,
self.agent_conf,
self._get_conntrackd_ipv4_interface(),
self.ha_vr_id,
self.get_ha_device_name(),
namespace=self.ha_namespace)
def _init_keepalived_manager(self, process_monitor):
self.keepalived_manager = keepalived.KeepalivedManager(
self.router['id'],
@ -185,6 +207,9 @@ class HaRouter(router.RouterInfo):
interface_name = self.get_ha_device_name()
subnets = self.ha_port.get('subnets', [])
ha_port_cidrs = [subnet['cidr'] for subnet in subnets]
notify_script = (self.agent_conf.ha_conntrackd_enabled and
self.conntrackd_manager.get_ha_script_path() or
None)
instance = keepalived.KeepalivedInstance(
'BACKUP',
interface_name,
@ -195,7 +220,9 @@ class HaRouter(router.RouterInfo):
priority=self.ha_priority,
vrrp_health_check_interval=(
self.agent_conf.ha_vrrp_health_check_interval),
ha_conf_dir=self.keepalived_manager.get_conf_dir())
ha_conf_dir=self.keepalived_manager.get_conf_dir(),
notify_script=notify_script,
)
instance.track_interfaces.append(interface_name)
if self.agent_conf.ha_vrrp_auth_password:
@ -206,20 +233,31 @@ class HaRouter(router.RouterInfo):
config.add_instance(instance)
def _disable_manager(self, manager, remove_config):
if not manager:
LOG.debug('Error while disabling manager for %s - no manager',
self.router_id)
return
manager.disable()
if remove_config:
conf_dir = manager.get_conf_dir()
try:
shutil.rmtree(conf_dir)
except FileNotFoundError:
pass
def enable_keepalived(self):
self.keepalived_manager.spawn()
def disable_keepalived(self):
if not self.keepalived_manager:
LOG.debug('Error while disabling keepalived for %s - no manager',
self.router_id)
return
self.keepalived_manager.disable()
conf_dir = self.keepalived_manager.get_conf_dir()
try:
shutil.rmtree(conf_dir)
except FileNotFoundError:
pass
def disable_keepalived(self, remove_config=True):
self._disable_manager(self.keepalived_manager, remove_config)
def enable_conntrackd(self):
self.conntrackd_manager.spawn()
def disable_conntrackd(self, remove_config=True):
self._disable_manager(self.conntrackd_manager, remove_config)
def _get_keepalived_instance(self):
return self.keepalived_manager.config.get_instance(self.ha_vr_id)
@ -519,7 +557,16 @@ class HaRouter(router.RouterInfo):
def delete(self):
if self.process_monitor:
self.destroy_state_change_monitor(self.process_monitor)
self.disable_keepalived()
# Only remove the conf_dir after keepalived and conntrackd have been
# disabled. They share the same configuration directory.
self.disable_keepalived(
remove_config=not self.agent_conf.ha_conntrackd_enabled,
)
if self.agent_conf.ha_conntrackd_enabled:
self.disable_conntrackd(remove_config=True)
self.ha_network_removed()
super().delete()
@ -544,6 +591,11 @@ class HaRouter(router.RouterInfo):
"port": self.ha_port})
if (self.ha_port and
self.ha_port['status'] == n_consts.PORT_STATUS_ACTIVE):
# Conntrackd needs to be enabled first, otherwise the keepalived
# script would try to start it (possibly with the wrong
# configuration).
if self.agent_conf.ha_conntrackd_enabled:
self.enable_conntrackd()
self.enable_keepalived()
@runtime.synchronized('enable_radvd')

View File

@ -0,0 +1,384 @@
# Copyright (c) 2015 UnitedStack, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import signal
import stat
import jinja2
from neutron_lib.utils import file as file_utils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils.fileutils import ensure_tree
from neutron._i18n import _
from neutron.agent.linux import external_process
from neutron.agent.linux import utils
from neutron.common import utils as common_utils
CONNTRACKD_SERVICE_NAME = 'conntrackd'
SIGTERM_TIMEOUT = 5
LOG = logging.getLogger(__name__)
CONFIG_TEMPLATE = jinja2.Template(
"""
General {
HashSize {{ hash_size }}
HashLimit {{ hash_limit }}
Syslog on
LockFile {{ lockfile_path }}
UNIX {
Path {{ socket_path }}
Backlog {{ unix_backlog }}
}
SocketBufferSize {{ socket_buffer_size }}
SocketBufferSizeMaxGrown {{ socket_buffer_size_max_grown }}
Filter From Kernelspace {
Protocol Accept {
{%- for proto in protocol_accept %}
{{ proto }}
{%- endfor %}
}
Address Ignore {
{%- for version, addr in address_ignore %}
IPv{{ version }}_address {{ addr }}
{%- endfor %}
}
}
}
Sync {
Mode FTFW {
}
Multicast Default {
IPv4_address {{ ipv4_mcast_addr }}
IPv4_interface {{ ipv4_interface }}
Group {{ mcast_group }}
Interface {{ interface }}
SndSocketBuffer {{ snd_socket_buffer }}
RcvSocketBuffer {{ rcv_socket_buffer }}
Checksum on
}
}
""")
HA_SCRIPT_TEMPLATE = jinja2.Template(
"""#!/usr/bin/env sh
#
# (C) 2006-2011 by Pablo Neira Ayuso <pablo@netfilter.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# Description:
#
# This is the script for primary-backup setups for keepalived
# (http://www.keepalived.org). You may adapt it to make it work with other
# high-availability managers.
#
# Do not forget to include the required modifications to your keepalived.conf
# file to invoke this script during keepalived's state transitions.
#
# Contributions to improve this script are welcome :).
#
CONNTRACKD_BIN=/usr/sbin/conntrackd
CONNTRACKD_LOCK={{ lock }}
CONNTRACKD_CONFIG={{ config }}
case "$1" in
primary)
#
# commit the external cache into the kernel table
#
$CONNTRACKD_BIN -C $CONNTRACKD_CONFIG -c
if [ $? -eq 1 ]
then
logger "ERROR: failed to invoke conntrackd -c"
fi
#
# flush the internal and the external caches
#
$CONNTRACKD_BIN -C $CONNTRACKD_CONFIG -f
if [ $? -eq 1 ]
then
logger "ERROR: failed to invoke conntrackd -f"
fi
#
# resynchronize my internal cache to the kernel table
#
$CONNTRACKD_BIN -C $CONNTRACKD_CONFIG -R
if [ $? -eq 1 ]
then
logger "ERROR: failed to invoke conntrackd -R"
fi
#
# send a bulk update to backups
#
$CONNTRACKD_BIN -C $CONNTRACKD_CONFIG -B
if [ $? -eq 1 ]
then
logger "ERROR: failed to invoke conntrackd -B"
fi
;;
backup)
#
# is conntrackd running? request some statistics to check it
#
$CONNTRACKD_BIN -C $CONNTRACKD_CONFIG -s
if [ $? -eq 1 ]
then
#
# something's wrong, do we have a lock file?
#
if [ -f $CONNTRACKD_LOCK ]
then
logger "WARNING: conntrackd was not cleanly stopped."
logger "If you suspect that it has crashed:"
logger "1) Enable coredumps"
logger "2) Try to reproduce the problem"
logger "3) Post the coredump to netfilter-devel@vger.kernel.org"
rm -f $CONNTRACKD_LOCK
fi
$CONNTRACKD_BIN -C $CONNTRACKD_CONFIG -d
if [ $? -eq 1 ]
then
logger "ERROR: cannot launch conntrackd"
exit 1
fi
fi
#
# shorten kernel conntrack timers to remove the zombie entries.
#
$CONNTRACKD_BIN -C $CONNTRACKD_CONFIG -t
if [ $? -eq 1 ]
then
logger "ERROR: failed to invoke conntrackd -t"
fi
#
# request resynchronization with master firewall replica (if any)
# Note: this does nothing in the alarm approach.
#
$CONNTRACKD_BIN -C $CONNTRACKD_CONFIG -n
if [ $? -eq 1 ]
then
logger "ERROR: failed to invoke conntrackd -n"
fi
;;
fault)
#
# shorten kernel conntrack timers to remove the zombie entries.
#
$CONNTRACKD_BIN -C $CONNTRACKD_CONFIG -t
if [ $? -eq 1 ]
then
logger "ERROR: failed to invoke conntrackd -t"
fi
;;
*)
logger "ERROR: unknown state transition"
echo "Usage: primary-backup.sh {primary|backup|fault}"
exit 1
;;
esac
exit 0
""")
class ConntrackdManager:
"""Wrapper for conntrackd.
This wrapper permits to write conntrackd config file,
to start/restart conntrackd process.
"""
def __init__(self, resource_id, process_monitor, agent_conf,
mcast_iface_addr, ha_vr_id, ha_iface, namespace=None):
self.resource_id = resource_id
self.process_monitor = process_monitor
self.agent_conf = agent_conf
self.mcast_iface_addr = mcast_iface_addr
self.ha_vr_id = ha_vr_id
self.ha_iface = ha_iface
self.namespace = namespace
def build_ha_script(self):
ha_script_content = HA_SCRIPT_TEMPLATE.render(
lock=self.get_lockfile_path(),
config=self.get_conffile_path(),
)
ha_script_path = self.get_ha_script_path()
file_utils.replace_file(ha_script_path, ha_script_content)
os.chmod(ha_script_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
def get_full_config_file_path(self, filename, maxlen=255):
# Maximum PATH lenght for most paths in conntrackd is limited to
# 255 characters.
conf_dir = self.get_conf_dir()
ensure_tree(conf_dir, 0o755)
path = os.path.join(conf_dir, filename)
if len(path) > maxlen:
raise ValueError(_('Configuration file path "%(path)s" exceeds '
'maximum length of %(maxlen)i characters.') %
{'path': path,
'maxlen': maxlen})
return path
def get_conf_dir(self):
confs_dir = os.path.abspath(
os.path.normpath(self.agent_conf.ha_confs_path))
conf_dir = os.path.join(confs_dir, self.resource_id)
return conf_dir
def get_ha_script_path(self):
return self.get_full_config_file_path('primary-backup.sh')
def get_pid_file_path(self):
return self.get_full_config_file_path('conntrackd.pid')
def get_lockfile_path(self):
return self.get_full_config_file_path('conntrackd.lock')
def get_ctlfile_path(self):
# Unix socket path length is limited to 107 characters in the
# conntrackd source code. See UNIX_PATH_MAX constant in include/local.h
return self.get_full_config_file_path('conntrackd.ctl', maxlen=107)
def get_conffile_path(self):
return self.get_full_config_file_path('conntrackd.conf')
def create_pid_file(self):
config_path = self.get_conffile_path()
pid_file = self.get_pid_file_path()
cmd = 'conntrackd -d -C %s' % config_path
pid = utils.find_pid_by_cmd(cmd)
file_utils.replace_file(pid_file, pid)
def spawn(self):
config_path = self.output_config_file()
self.build_ha_script()
def callback(pidfile):
cmd = ['conntrackd', '-d',
'-C', config_path]
return cmd
def pre_cmd_callback():
# conntrackd.lock and conntrackd.ctl must be removed before
# starting a new conntrackd.
utils.delete_if_exists(self.get_lockfile_path(), run_as_root=True)
utils.delete_if_exists(self.get_ctlfile_path(), run_as_root=True)
def post_cmd_callback():
self.create_pid_file()
# Synchronize connection tracking state with peer
cmd = ['conntrackd', '-C', config_path, '-n']
utils.execute(cmd, run_as_root=True, check_exit_code=True)
pm = self.get_process(callback=callback,
pre_cmd_callback=pre_cmd_callback,
post_cmd_callback=post_cmd_callback)
pm.enable(reload_cfg=False)
self.process_monitor.register(uuid=self.resource_id,
service_name=CONNTRACKD_SERVICE_NAME,
monitored_process=pm)
LOG.debug('Conntrackd spawned with config %s', config_path)
def get_process(self, callback=None, pre_cmd_callback=None,
post_cmd_callback=None):
return external_process.ProcessManager(
cfg.CONF,
self.resource_id,
self.namespace,
default_cmd_callback=callback,
default_pre_cmd_callback=pre_cmd_callback,
default_post_cmd_callback=post_cmd_callback,
pid_file=self.get_pid_file_path())
def disable(self):
self.process_monitor.unregister(uuid=self.resource_id,
service_name=CONNTRACKD_SERVICE_NAME)
pm = self.get_process()
if not pm.active:
return
# First try to stop conntrackd by using it's own control command
config_path = self.get_conffile_path()
cmd = ['conntrackd', '-C', config_path, '-k']
utils.execute(cmd, run_as_root=True)
try:
common_utils.wait_until_true(lambda: not pm.active,
timeout=SIGTERM_TIMEOUT)
except common_utils.WaitTimeout:
LOG.warning('Conntrackd process %s did not finish after asking it '
'to shut down in %s seconds, sending SIGKILL signal.',
pm.pid, SIGTERM_TIMEOUT)
pm.disable(sig=str(int(signal.SIGKILL)))
def build_config(self):
return CONFIG_TEMPLATE.render(
hash_size=self.agent_conf.ha_conntrackd_hashsize,
hash_limit=self.agent_conf.ha_conntrackd_hashlimit,
lockfile_path=self.get_lockfile_path(),
socket_path=self.get_ctlfile_path(),
unix_backlog=self.agent_conf.ha_conntrackd_unix_backlog,
socket_buffer_size=self.agent_conf.ha_conntrackd_socketbuffersize,
socket_buffer_size_max_grown=(
self.agent_conf.ha_conntrackd_socketbuffersize_max_grown
),
protocol_accept=[
'TCP', 'SCTP', 'DCCP', 'UDP', 'ICMP', 'IPv6-ICMP'
],
# Ignore loopback and HA sync addresses
address_ignore=[
(4, '127.0.0.1'),
(6, '::1'),
(4, self.mcast_iface_addr),
],
ipv4_mcast_addr=self.agent_conf.ha_conntrackd_ipv4_mcast_addr,
ipv4_interface=self.mcast_iface_addr,
mcast_group=self.agent_conf.ha_conntrackd_group + self.ha_vr_id,
interface=self.ha_iface,
snd_socket_buffer=self.agent_conf.ha_conntrackd_sndsocketbuffer,
rcv_socket_buffer=self.agent_conf.ha_conntrackd_rcvsocketbuffer,
)
def output_config_file(self):
config_path = self.get_conffile_path()
file_utils.replace_file(config_path, self.build_config())
return config_path

View File

@ -57,12 +57,17 @@ class ProcessManager(MonitoredProcess):
def __init__(self, conf, uuid, namespace=None, service=None,
pids_path=None, default_cmd_callback=None,
cmd_addl_env=None, pid_file=None, run_as_root=False,
custom_reload_callback=None):
custom_reload_callback=None,
default_pre_cmd_callback=None,
default_post_cmd_callback=None):
self.conf = conf
self.uuid = uuid
self.namespace = namespace
self.default_cmd_callback = default_cmd_callback
self.default_pre_cmd_callback = default_pre_cmd_callback
self.default_post_cmd_callback = default_post_cmd_callback
self.cmd_addl_env = cmd_addl_env
self.pids_path = pids_path or self.conf.external_pids
self.pid_file = pid_file
self.run_as_root = run_as_root or self.namespace is not None
@ -83,8 +88,15 @@ class ProcessManager(MonitoredProcess):
fileutils.ensure_tree(os.path.dirname(self.get_pid_file_name()),
mode=0o755)
def enable(self, cmd_callback=None, reload_cfg=False, ensure_active=False):
def enable(self, cmd_callback=None, reload_cfg=False, ensure_active=False,
pre_cmd_callback=None, post_cmd_callback=None):
if not self.active:
pre_cmd_callback = (pre_cmd_callback or
self.default_pre_cmd_callback)
if pre_cmd_callback:
pre_cmd_callback()
if not cmd_callback:
cmd_callback = self.default_cmd_callback
# Always try and remove the pid file, as it's existence could
@ -104,6 +116,12 @@ class ProcessManager(MonitoredProcess):
ip_wrapper = ip_lib.IPWrapper(namespace=self.namespace)
ip_wrapper.netns.execute(cmd, addl_env=self.cmd_addl_env,
run_as_root=self.run_as_root)
post_cmd_callback = (post_cmd_callback or
self.default_post_cmd_callback)
if post_cmd_callback:
post_cmd_callback()
elif reload_cfg:
self.reload_cfg()
if ensure_active:

View File

@ -185,11 +185,11 @@ class KeepalivedInstance:
"""Instance section of a keepalived configuration."""
def __init__(self, state, interface, vrouter_id, ha_cidrs,
priority=HA_DEFAULT_PRIORITY, advert_int=None,
mcast_src_ip=None, nopreempt=False,
priority=HA_DEFAULT_PRIORITY,
advert_int=None, mcast_src_ip=None, nopreempt=False,
garp_primary_delay=GARP_PRIMARY_DELAY,
vrrp_health_check_interval=0,
ha_conf_dir=None):
ha_conf_dir=None, notify_script=None):
self.name = 'VR_%s' % vrouter_id
if state not in VALID_STATES:
@ -201,6 +201,7 @@ class KeepalivedInstance:
self.priority = priority
self.nopreempt = nopreempt
self.advert_int = advert_int
self.notify_script = notify_script
self.mcast_src_ip = mcast_src_ip
self.garp_primary_delay = garp_primary_delay
self.track_interfaces = []
@ -317,6 +318,13 @@ class KeepalivedInstance:
' priority %s' % self.priority,
' garp_master_delay %s' % self.garp_primary_delay])
if self.notify_script:
config.extend([
' notify_master "%s primary"' % self.notify_script,
' notify_backup "%s backup"' % self.notify_script,
' notify_fault "%s fault"' % self.notify_script,
])
if self.nopreempt:
config.append(' nopreempt')

View File

@ -35,6 +35,7 @@ from oslo_utils import excutils
from oslo_utils import fileutils
import psutil
from neutron._i18n import _
from neutron.api import wsgi
from neutron.common import utils
from neutron.conf.agent import common as config
@ -181,6 +182,20 @@ def find_child_pids(pid, recursive=False):
return child_pids
def find_pid_by_cmd(cmd):
"""Retrieve a list of the pids by their cmd."""
pids = execute(['pgrep', '-f', cmd], log_fail_as_error=False).split()
if len(pids) > 1:
raise RuntimeError(
_('%i processes for "%s" found.' % (len(pids), cmd))
)
if pids == []:
raise RuntimeError(_('No process for "%s" found.' % cmd))
return pids[0]
def find_parent_pid(pid):
"""Retrieve the pid of the parent process of the given pid.

View File

@ -54,6 +54,41 @@ OPTS = [
'as primary, and a primary election will be repeated '
'in a round-robin fashion, until one of the routers '
'restores the gateway connection.')),
cfg.BoolOpt('ha_conntrackd_enabled',
default=False,
help=_("Enable conntrackd to synchronize connection "
"tracking states between HA routers.")),
cfg.IntOpt('ha_conntrackd_hashsize',
default=32768,
help=_('Number of buckets in the cache hashtable')),
cfg.IntOpt('ha_conntrackd_hashlimit',
default=131072,
help=_('Maximum number of conntracks')),
cfg.IntOpt('ha_conntrackd_unix_backlog',
default=20,
help=_('Unix socket backlog')),
cfg.IntOpt('ha_conntrackd_socketbuffersize',
default=262142,
help=_('Socket buffer size for events')),
cfg.IntOpt('ha_conntrackd_socketbuffersize_max_grown',
default=655355,
help=_('Maximum size of socket buffer')),
cfg.StrOpt('ha_conntrackd_ipv4_mcast_addr',
default='225.0.0.50',
help=_('Multicast address: The address that you use as '
'destination in the synchronization messages')),
cfg.IntOpt('ha_conntrackd_group',
default=3780,
help=_('The multicast base port number. The generated virtual '
'router ID added to this value.')),
cfg.IntOpt('ha_conntrackd_sndsocketbuffer',
default=24985600,
help=_('Buffer used to enqueue the packets that are going '
'to be transmitted')),
cfg.IntOpt('ha_conntrackd_rcvsocketbuffer',
default=24985600,
help=_('Buffer used to enqueue the packets that the socket '
'is pending to handle')),
]

View File

@ -69,7 +69,12 @@ class NeutronConfigFixture(ConfigFixture):
self.config.update({
'DEFAULT': {
'host': self._generate_host(),
'state_path': self._generate_state_path(self.temp_dir),
# Enable conntrackd for tests to get full test coverage
'ha_conntrackd_enabled': 'True',
# Conntrackd only supports 107 characters for it's control
# socket path. Thus the "state_path" should not be nested in
# a temporary directory to avoid the final path being too long.
'state_path': self.temp_dir,
'core_plugin': 'ml2',
'service_plugins': env_desc.service_plugins,
'auth_strategy': 'noauth',
@ -155,11 +160,6 @@ class NeutronConfigFixture(ConfigFixture):
def _generate_host(self):
return utils.get_rand_name(prefix='host-')
def _generate_state_path(self, temp_dir):
# Assume that temp_dir will be removed by the caller
self.state_path = tempfile.mkdtemp(prefix='state_path', dir=temp_dir)
return self.state_path
def _generate_api_paste(self):
return c_helpers.find_sample_file('api-paste.ini')

View File

@ -475,10 +475,27 @@ class TestHAL3Agent(TestL3Agent):
# Test external connectivity, failover, test again
pinger = net_helpers.Pinger(vm.namespace, external.ip, interval=0.1)
netcat_tcp = net_helpers.NetcatTester(
vm.namespace,
external.namespace,
external.ip,
3333,
net_helpers.NetcatTester.TCP,
)
netcat_udp = net_helpers.NetcatTester(
vm.namespace,
external.namespace,
external.ip,
3334,
net_helpers.NetcatTester.UDP,
)
pinger.start()
# Ensure connectivity before disconnect
vm.block_until_ping(external.ip)
netcat_tcp.establish_connection()
netcat_udp.establish_connection()
get_active_hosts = functools.partial(
self._get_hosts_with_ha_state,
@ -520,6 +537,11 @@ class TestHAL3Agent(TestL3Agent):
vm.block_until_ping(external.ip)
LOG.debug(f'Connectivity restored after {datetime.now() - start}')
# Ensure connection tracking states are synced to now active router
netcat_tcp.test_connectivity()
netcat_udp.test_connectivity()
LOG.debug(f'Connections restored after {datetime.now() - start}')
# Assert the backup host got active
timeout = self.environment.env_desc.agent_down_time * 1.2
common_utils.wait_until_true(
@ -540,6 +562,8 @@ class TestHAL3Agent(TestL3Agent):
# Stop probing processes
pinger.stop()
netcat_tcp.stop_processes()
netcat_udp.stop_processes()
# With the default advert_int of 2s the keepalived master timeout is
# about 6s. Assert less than 90 lost packets (9 seconds)
@ -567,7 +591,8 @@ class TestHAL3Agent(TestL3Agent):
def _get_state_file_for_primary_agent(self, router_id):
for host in self.environment.hosts:
keepalived_state_file = os.path.join(
host.neutron_config.state_path, "ha_confs", router_id, "state")
host.neutron_config.config.DEFAULT.state_path,
"ha_confs", router_id, "state")
if self._get_keepalived_state(keepalived_state_file) == "primary":
return keepalived_state_file

View File

@ -62,6 +62,9 @@ vrrp_instance VR_1 {
virtual_router_id 1
priority 50
garp_master_delay 60
notify_master "%(conf_dir)s/primary-backup.sh primary"
notify_backup "%(conf_dir)s/primary-backup.sh backup"
notify_fault "%(conf_dir)s/primary-backup.sh fault"
nopreempt
advert_int 2
track_interface {
@ -142,6 +145,9 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
# directly in the cfg.CONF module too
cfg.CONF.set_override('debug_iptables_rules', True, group='AGENT')
# Enable conntrackd to get full test coverage
conf.set_override('ha_conntrackd_enabled', True)
return conf
def _get_agent_ovs_integration_bridge(self, agent):
@ -474,6 +480,10 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
def get_expected_keepalive_configuration(self, router):
ha_device_name = router.get_ha_device_name()
conf_dir = os.path.join(
self.agent.conf.ha_confs_path,
router.router_id,
)
external_port = router.get_ex_gw_port()
ex_port_ipv6 = ip_lib.get_ipv6_lladdr(external_port['mac_address'])
ex_device_name = router.get_external_device_name(
@ -492,6 +502,7 @@ class L3AgentTestFramework(base.BaseSudoTestCase):
'email_from': keepalived.KEEPALIVED_EMAIL_FROM,
'router_id': keepalived.KEEPALIVED_ROUTER_ID,
'ha_device_name': ha_device_name,
'conf_dir': conf_dir,
'ex_device_name': ex_device_name,
'external_device_cidr': external_device_cidr,
'internal_device_name': internal_device_name,

View File

@ -206,6 +206,22 @@ class L3HATestCase(framework.L3AgentTestFramework):
(new_external_device_ip, external_device_name),
new_config)
def _is_conntrackd_running(self, router):
return router.conntrackd_manager.get_process().active
def test_conntrackd_running(self):
router_info = self.generate_router_info(enable_ha=True)
router = self.manage_router(self.agent, router_info)
self.assertTrue(self._is_conntrackd_running(router))
def test_conntrackd_not_enabled(self):
# Disable conntrackd support
self.agent.conf.set_override('ha_conntrackd_enabled', False)
router_info = self.generate_router_info(enable_ha=True)
router = self.manage_router(self.agent, router_info)
self.assertFalse(self._is_conntrackd_running(router))
def test_ha_router_conf_on_restarted_agent(self):
router_info = self.generate_router_info(enable_ha=True)
router1 = self.manage_router(self.agent, router_info)

View File

@ -0,0 +1,132 @@
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
from oslo_config import cfg
from neutron._i18n import _
from neutron.agent.linux import conntrackd
from neutron.agent.linux import external_process
from neutron.agent.linux import ip_lib
from neutron.common import utils as common_utils
from neutron.conf.agent.l3 import config as l3_config
from neutron.conf.agent.l3 import ha as ha_config
from neutron.tests.common import net_helpers
from neutron.tests.functional import base
from neutron.tests.unit.agent.l3.test_agent import FAKE_ID
from neutron.tests.unit.agent.linux.test_conntrackd import \
ConntrackdConfigTestCase
from neutron_lib.exceptions import ProcessExecutionError
class ConntrackdManagerTestCase(base.BaseSudoTestCase):
def setUp(self):
super(ConntrackdManagerTestCase, self).setUp()
l3_config.register_l3_agent_config_opts(l3_config.OPTS, cfg.CONF)
ha_config.register_l3_agent_ha_opts()
self.config(check_child_processes_interval=1, group='AGENT')
self.process_monitor = external_process.ProcessMonitor(cfg.CONF,
'router')
self.namespace = self.useFixture(net_helpers.NamespaceFixture()).name
self.ip_wrapper = ip_lib.IPWrapper(namespace=self.namespace)
self._prepare_device()
self.manager = conntrackd.ConntrackdManager(
FAKE_ID,
self.process_monitor,
cfg.CONF,
'192.168.0.5',
3,
'eth0',
namespace=self.namespace)
self.addCleanup(self._stop_conntrackd_manager)
def _stop_conntrackd_manager(self):
try:
self.manager.disable()
except ProcessExecutionError as process_err:
# self.manager.disable() will perform SIGTERM->wait->SIGKILL
# (if needed) on the process. However, it is sometimes possible
# that SIGKILL gets called on a process that just exited due to
# SIGTERM. Ignore this condition so the test is not marked as
# failed.
if not (len(process_err.args) > 0 and
"No such process" in process_err.args[0]):
raise
def _prepare_device(self):
# NOTE(gaudenz): this is the device used in the conntrackd config
# file
ip_device = self.ip_wrapper.add_dummy('eth0')
ip_device.link.set_up()
ip_device.addr.add('192.168.0.5/24')
def _spawn_conntrackd(self, conntrackd_manager):
conntrackd_manager.spawn()
process = conntrackd_manager.get_process()
common_utils.wait_until_true(
lambda: process.active,
timeout=5,
sleep=0.01,
exception=RuntimeError(_("Conntrackd didn't spawn")))
return process
def _get_conf_on_disk(self):
config_path = self.manager.get_conffile_path()
try:
with open(config_path) as conf:
return conf.read()
except (OSError, IOError) as e:
if e.errno != errno.ENOENT:
raise
return ''
def test_conntrackd_config(self):
self._spawn_conntrackd(self.manager)
expected_config = ConntrackdConfigTestCase.get_expected(
cfg.CONF.ha_confs_path,
)
self.assertEqual(expected_config,
self._get_conf_on_disk())
def test_conntrackd_spawn(self):
process = self._spawn_conntrackd(self.manager)
self.assertTrue(process.active)
def _test_conntrackd_respawns(self, normal_exit=True):
process = self._spawn_conntrackd(self.manager)
pid = process.pid
exit_code = '-15' if normal_exit else '-9'
# Exit the process, and see that when it comes back
# It's indeed a different process
self.ip_wrapper.netns.execute(['kill', exit_code, pid],
privsep_exec=True)
common_utils.wait_until_true(
lambda: process.active and pid != process.pid,
timeout=5,
sleep=0.01,
exception=RuntimeError(_("Conntrackd didn't respawn")))
def test_conntrackd_respawns(self):
self._test_conntrackd_respawns()
def test_conntrackd_respawn_with_unexpected_exit(self):
self._test_conntrackd_respawns(False)

View File

@ -22,6 +22,7 @@ from neutron.agent.linux import ip_lib
from neutron.agent.linux import keepalived
from neutron.common import utils as common_utils
from neutron.conf.agent.l3 import config as l3_config
from neutron.conf.agent.l3 import ha as ha_config
from neutron.tests.common import net_helpers
from neutron.tests.functional.agent.linux import helpers
from neutron.tests.functional import base
@ -35,7 +36,8 @@ class KeepalivedManagerTestCase(base.BaseSudoTestCase,
def setUp(self):
super().setUp()
l3_config.register_l3_agent_config_opts(l3_config.OPTS, cfg.CONF)
cfg.CONF.set_override('check_child_processes_interval', 1, 'AGENT')
ha_config.register_l3_agent_ha_opts()
self.config(check_child_processes_interval=1, group='AGENT')
self.expected_config = self._get_config()
self.process_monitor = external_process.ProcessMonitor(cfg.CONF,

View File

@ -98,6 +98,9 @@ class BasicRouterOperationsFramework(base.BaseTestCase):
self.conf.set_override('state_path', cfg.CONF.state_path)
self.conf.set_override('pd_dhcp_driver', '')
# Enable conntrackd support for tests for it to get full test coverage
self.conf.set_override('ha_conntrackd_enabled', True)
self.device_exists_p = mock.patch(
'neutron.agent.linux.ip_lib.device_exists')
self.device_exists = self.device_exists_p.start()

View File

@ -0,0 +1,137 @@
# Copyright (c) 2015 UnitedStack, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from textwrap import dedent
from unittest import mock
from neutron.agent.linux import conntrackd
from neutron.tests.unit.agent.l3.test_agent import \
BasicRouterOperationsFramework
from neutron.tests.unit.agent.l3.test_agent import FAKE_ID
class ConntrackdConfigTestCase(BasicRouterOperationsFramework):
@staticmethod
def get_expected(ha_confs_path):
return dedent(
"""
General {
HashSize 32768
HashLimit 131072
Syslog on
LockFile %(conf_path)s/%(uuid)s/conntrackd.lock
UNIX {
Path %(conf_path)s/%(uuid)s/conntrackd.ctl
Backlog 20
}
SocketBufferSize 262142
SocketBufferSizeMaxGrown 655355
Filter From Kernelspace {
Protocol Accept {
TCP
SCTP
DCCP
UDP
ICMP
IPv6-ICMP
}
Address Ignore {
IPv4_address 127.0.0.1
IPv6_address ::1
IPv4_address 192.168.0.5
}
}
}
Sync {
Mode FTFW {
}
Multicast Default {
IPv4_address 225.0.0.50
IPv4_interface 192.168.0.5
Group 3783
Interface eth0
SndSocketBuffer 24985600
RcvSocketBuffer 24985600
Checksum on
}
}""" % {'conf_path': ha_confs_path,
'uuid': FAKE_ID,
})
def get_manager(self):
return conntrackd.ConntrackdManager(
FAKE_ID,
self.process_monitor,
self.conf,
'192.168.0.5',
3,
'eth0',
)
def test_build_config(self):
conntrackd = self.get_manager()
with mock.patch('os.makedirs'):
config = conntrackd.build_config()
self.assertMultiLineEqual(
ConntrackdConfigTestCase.get_expected(self.conf.ha_confs_path),
config
)
def test_max_file_path_len(self):
"""The shortest file path affected by this will be the LockFile path.
There "/<uuid>/conntrackd.lock" is appended and should in total not
exceed 255 characters. So the maximum length for ha_confs_path is 202
characters.
"""
with mock.patch('os.makedirs'):
self.conf.set_override('ha_confs_path', '/' + 'a' * 202)
conntrackd = self.get_manager()
self.assertRaisesRegex(
ValueError,
'maximum length of 255 characters.',
conntrackd.build_config,
)
# If the path is below the file path limit, the UNIX socket path
# limit is hit.
self.conf.set_override('ha_confs_path', '/' + 'a' * 201)
conntrackd = self.get_manager()
self.assertRaisesRegex(
ValueError,
'maximum length of 107 characters.',
conntrackd.build_config,
)
def test_max_socket_path_len(self):
"""The UNIX socket path has a shorter maximum length of 107
characters. With "/<uuid>/conntrackd.ctl" appended this means the
maximum length for ha_confs_path is 55 characters.
"""
with mock.patch('os.makedirs'):
self.conf.set_override('ha_confs_path', '/' + 'a' * 55)
conntrackd = self.get_manager()
self.assertRaisesRegex(
ValueError,
'maximum length of 107 characters.',
conntrackd.build_config,
)
self.conf.set_override('ha_confs_path', '/' + 'a' * 54)
conntrackd = self.get_manager()
conntrackd.build_config()

View File

@ -26,6 +26,7 @@ import testtools
from neutron.agent.linux import external_process
from neutron.agent.linux import keepalived
from neutron.conf.agent.l3 import config as l3_config
from neutron.conf.agent.l3 import ha as ha_config
from neutron.tests import base
# Keepalived user guide:
@ -47,9 +48,13 @@ class KeepalivedBaseTestCase(base.BaseTestCase):
def setUp(self):
super().setUp()
l3_config.register_l3_agent_config_opts(l3_config.OPTS, cfg.CONF)
ha_config.register_l3_agent_ha_opts()
self._mock_no_track_supported = mock.patch.object(
keepalived, '_is_keepalived_use_no_track_supported')
# Enable conntrackd support for tests for it to get full test coverage
self.config(ha_conntrackd_enabled=True)
class KeepalivedGetFreeRangeTestCase(KeepalivedBaseTestCase):
def test_get_free_range(self):
@ -87,12 +92,23 @@ class KeepalivedGetFreeRangeTestCase(KeepalivedBaseTestCase):
class KeepalivedConfBaseMixin:
def _get_conntrackd_manager(self):
conntrackd_manager = mock.Mock()
conntrackd_manager.get_ha_script_path.return_value = '/tmp/ha.sh'
return conntrackd_manager
def _get_config(self, track=True):
config = keepalived.KeepalivedConf()
conntrackd_manager = self._get_conntrackd_manager()
instance1 = keepalived.KeepalivedInstance('MASTER', 'eth0', 1,
['169.254.192.0/18'],
advert_int=5)
notify_script = (cfg.CONF.ha_conntrackd_enabled and
conntrackd_manager.get_ha_script_path() or
None)
instance1 = keepalived.KeepalivedInstance(
'MASTER', 'eth0', 1, ['169.254.192.0/18'],
notify_script=notify_script,
advert_int=5)
instance1.set_authentication('AH', 'pass123')
instance1.track_interfaces.append("eth0")
@ -118,9 +134,10 @@ class KeepalivedConfBaseMixin:
"eth1", track=track)
instance1.virtual_routes.gateway_routes = [virtual_route]
instance2 = keepalived.KeepalivedInstance('MASTER', 'eth4', 2,
['169.254.192.0/18'],
mcast_src_ip='224.0.0.1')
instance2 = keepalived.KeepalivedInstance(
'MASTER', 'eth4', 2, ['169.254.192.0/18'],
notify_script=notify_script,
mcast_src_ip='224.0.0.1')
instance2.track_interfaces.append("eth4")
vip_address1 = keepalived.KeepalivedVipAddress('192.168.3.0/24',
@ -146,6 +163,9 @@ class KeepalivedConfTestCase(KeepalivedBaseTestCase,
virtual_router_id 1
priority 50
garp_master_delay 60
notify_master "/tmp/ha.sh primary"
notify_backup "/tmp/ha.sh backup"
notify_fault "/tmp/ha.sh fault"
advert_int 5
authentication {
auth_type AH
@ -173,6 +193,9 @@ class KeepalivedConfTestCase(KeepalivedBaseTestCase,
virtual_router_id 2
priority 50
garp_master_delay 60
notify_master "/tmp/ha.sh primary"
notify_backup "/tmp/ha.sh backup"
notify_fault "/tmp/ha.sh fault"
mcast_src_ip 224.0.0.1
track_interface {
eth4
@ -220,6 +243,17 @@ class KeepalivedConfTestCase(KeepalivedBaseTestCase,
current_vips = sorted(instance.get_existing_vip_ip_addresses('eth2'))
self.assertEqual(['192.168.2.0/24', '192.168.3.0/24'], current_vips)
def test_config_generation_no_conntrackd(self):
# Disable conntrackd support
self.config(ha_conntrackd_enabled=False)
config = self._get_config()
# Assert no notification scripts are configured
self.assertNotIn('notify_master', config.get_config_str())
self.assertNotIn('notify_backup', config.get_config_str())
self.assertNotIn('notify_fault', config.get_config_str())
class KeepalivedStateExceptionTestCase(KeepalivedBaseTestCase):
def test_state_exception(self):
@ -227,11 +261,11 @@ class KeepalivedStateExceptionTestCase(KeepalivedBaseTestCase):
self.assertRaises(keepalived.InvalidInstanceStateException,
keepalived.KeepalivedInstance,
invalid_vrrp_state, 'eth0', 33,
['169.254.192.0/18'])
['169.254.192.0/18'], None)
invalid_auth_type = 'into a club'
instance = keepalived.KeepalivedInstance('MASTER', 'eth0', 1,
['169.254.192.0/18'])
['169.254.192.0/18'], None)
self.assertRaises(keepalived.InvalidAuthenticationTypeException,
instance.set_authentication,
invalid_auth_type, 'some_password')
@ -316,7 +350,8 @@ class KeepalivedInstanceTestCase(KeepalivedBaseTestCase,
KeepalivedConfBaseMixin):
def test_get_primary_vip(self):
instance = keepalived.KeepalivedInstance('MASTER', 'ha0', 42,
['169.254.192.0/18'])
['169.254.192.0/18'],
None)
self.assertEqual('169.254.0.42/24', instance.get_primary_vip())
def _test_remove_addresses_by_interface(self, track=True):
@ -334,6 +369,9 @@ class KeepalivedInstanceTestCase(KeepalivedBaseTestCase,
virtual_router_id 1
priority 50
garp_master_delay 60
notify_master "/tmp/ha.sh primary"
notify_backup "/tmp/ha.sh backup"
notify_fault "/tmp/ha.sh fault"
advert_int 5
authentication {{
auth_type AH
@ -358,6 +396,9 @@ class KeepalivedInstanceTestCase(KeepalivedBaseTestCase,
virtual_router_id 2
priority 50
garp_master_delay 60
notify_master "/tmp/ha.sh primary"
notify_backup "/tmp/ha.sh backup"
notify_fault "/tmp/ha.sh fault"
mcast_src_ip 224.0.0.1
track_interface {{
eth4
@ -400,35 +441,50 @@ class KeepalivedInstanceTestCase(KeepalivedBaseTestCase,
virtual_router_id 1
priority 50
garp_master_delay 60
notify_master "/tmp/ha.sh primary"
notify_backup "/tmp/ha.sh backup"
notify_fault "/tmp/ha.sh fault"
virtual_ipaddress {
169.254.0.1/24 dev eth0
}
}""")
conntrackd_manager = self._get_conntrackd_manager()
instance = keepalived.KeepalivedInstance(
'MASTER', 'eth0', VRRP_ID, ['169.254.192.0/18'])
'MASTER', 'eth0', VRRP_ID, ['169.254.192.0/18'],
notify_script=conntrackd_manager.get_ha_script_path(),
)
self.assertEqual(expected, os.linesep.join(instance.build_config()))
def test_build_config_no_vips_track_script(self):
expected = """
vrrp_script ha_health_check_1 {
script "/etc/ha_confs/qrouter-x/ha_check_script_1.sh"
interval 5
fall 2
rise 2
}
expected = textwrap.dedent("""\
vrrp_instance VR_1 {
state MASTER
interface eth0
virtual_router_id 1
priority 50
garp_master_delay 60
virtual_ipaddress {
169.254.0.1/24 dev eth0
}
}"""
vrrp_script ha_health_check_1 {