Agent - Conntrack Helper

Implements the L3 agent conntrack helper extension.

Closes: #1823633
Change-Id: I21cc5683839bbb3fb8a649908080919c1557811d
This commit is contained in:
Harald Jensås 2019-03-28 13:54:05 +01:00
parent 16679e9700
commit b8576b7be2
5 changed files with 873 additions and 136 deletions

View File

@ -0,0 +1,280 @@
# Copyright (c) 2019 Red Hat Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron_lib.agent import l3_extension
from neutron_lib import constants
from neutron_lib import rpc as n_rpc
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
LOG = logging.getLogger(__name__)
DEFAULT_CONNTRACK_HELPER_CHAIN = 'cth'
CONNTRACK_HELPER_PREFIX = 'cthelper-'
CONNTRACK_HELPER_CHAIN_PREFIX = DEFAULT_CONNTRACK_HELPER_CHAIN + '-'
class ConntrackHelperMapping(object):
def __init__(self):
self._managed_conntrack_helpers = {}
"""
router_conntrack_helper_mapping = {
router_id_1: set(cth_id_1, cth_id_2),
router_id_2: set(cth_id_3, cth_id_4)
}
"""
self._router_conntrack_helper_mapping = collections.defaultdict(set)
def set_conntrack_helpers(self, conntrack_helpers):
for cth in conntrack_helpers:
self._router_conntrack_helper_mapping[cth.router_id].add(cth.id)
self._managed_conntrack_helpers[cth.id] = cth
def update_conntrack_helpers(self, conntrack_helpers):
for cth in conntrack_helpers:
if (cth.id not in
self._router_conntrack_helper_mapping[cth.router_id]):
self._router_conntrack_helper_mapping[cth.router_id].add(
cth.id)
self._managed_conntrack_helpers[cth.id] = cth
def get_conntack_helper(self, conntrack_helper_id):
return self._managed_conntrack_helpers.get(conntrack_helper_id)
def get_managed_conntrack_helpers(self):
return self._managed_conntrack_helpers
def del_conntrack_helpers(self, conntrack_helpers):
for cth in conntrack_helpers:
if not self.get_conntack_helper(cth.id):
continue
del self._managed_conntrack_helpers[cth.id]
self._router_conntrack_helper_mapping[cth.router_id].remove(
cth.id)
if not self._router_conntrack_helper_mapping[cth.router_id]:
del self._router_conntrack_helper_mapping[cth.router_id]
def clear_by_router_id(self, router_id):
router_cth_ids = self._router_conntrack_helper_mapping.get(router_id)
if not router_cth_ids:
return
for cth_id in router_cth_ids:
del self._managed_conntrack_helpers[cth_id]
del self._router_conntrack_helper_mapping[router_id]
def check_conntrack_helper_changes(self, new_cth):
old_cth = self.get_conntack_helper(new_cth.id)
return old_cth != new_cth
class ConntrackHelperAgentExtension(l3_extension.L3AgentExtension):
SUPPORTED_RESOURCE_TYPES = [resources.CONNTRACKHELPER]
def initialize(self, connection, driver_type):
self.resource_rpc = resources_rpc.ResourcesPullRpcApi()
self._register_rpc_consumers()
self.mapping = ConntrackHelperMapping()
def _register_rpc_consumers(self):
registry.register(self._handle_notification, resources.CONNTRACKHELPER)
self._connection = n_rpc.Connection()
endpoints = [resources_rpc.ResourcesPushRpcCallback()]
topic = resources_rpc.resource_type_versioned_topic(
resources.CONNTRACKHELPER)
self._connection.create_consumer(topic, endpoints, fanout=True)
self._connection.consume_in_threads()
def consume_api(self, agent_api):
self.agent_api = agent_api
@lockutils.synchronized('conntrack-helpers')
def _handle_notification(self, context, resource_type, conntrack_helpers,
event_type):
for conntrack_helper in conntrack_helpers:
router_info = self.agent_api.get_router_info(
conntrack_helper.router_id)
if not router_info:
return
iptables_manager = self._get_iptables_manager(router_info)
if event_type == events.CREATED:
self._process_create([conntrack_helper], iptables_manager)
elif event_type == events.UPDATED:
self._process_update([conntrack_helper], iptables_manager)
elif event_type == events.DELETED:
self._process_delete([conntrack_helper], iptables_manager)
def _get_chain_name(self, id):
return (CONNTRACK_HELPER_CHAIN_PREFIX + id)[
:constants.MAX_IPTABLES_CHAIN_LEN_WRAP]
def _install_default_rules(self, iptables_manager, version):
default_rule = '-j %s-%s' % (iptables_manager.wrap_name,
DEFAULT_CONNTRACK_HELPER_CHAIN)
if version == constants.IPv4:
iptables_manager.ipv4['raw'].add_chain(
DEFAULT_CONNTRACK_HELPER_CHAIN)
iptables_manager.ipv4['raw'].add_rule('PREROUTING', default_rule)
elif version == constants.IPv6:
iptables_manager.ipv6['raw'].add_chain(
DEFAULT_CONNTRACK_HELPER_CHAIN)
iptables_manager.ipv6['raw'].add_rule('PREROUTING', default_rule)
iptables_manager.apply()
def _get_chain_rules_list(self, conntrack_helper, wrap_name):
chain_name = self._get_chain_name(conntrack_helper.id)
chain_rule_list = [(DEFAULT_CONNTRACK_HELPER_CHAIN,
'-j %s-%s' % (wrap_name, chain_name))]
chain_rule_list.append((chain_name,
'-p %(proto)s --dport %(dport)s -j CT '
'--helper %(helper)s' %
{'proto': conntrack_helper.protocol,
'dport': conntrack_helper.port,
'helper': conntrack_helper.helper}))
return chain_rule_list
def _rule_apply(self, iptables_manager, conntrack_helper):
tag = CONNTRACK_HELPER_PREFIX + conntrack_helper.id
iptables_manager.ipv4['raw'].clear_rules_by_tag(tag)
iptables_manager.ipv6['raw'].clear_rules_by_tag(tag)
for chain, rule in self._get_chain_rules_list(
conntrack_helper, iptables_manager.wrap_name):
if chain not in iptables_manager.ipv4['raw'].chains:
iptables_manager.ipv4['raw'].add_chain(chain)
if chain not in iptables_manager.ipv6['raw'].chains:
iptables_manager.ipv6['raw'].add_chain(chain)
iptables_manager.ipv4['raw'].add_rule(chain, rule, tag=tag)
iptables_manager.ipv6['raw'].add_rule(chain, rule, tag=tag)
def _process_create(self, conntrack_helpers, iptables_manager):
if not conntrack_helpers:
return
if (DEFAULT_CONNTRACK_HELPER_CHAIN not in
iptables_manager.ipv4['raw'].chains):
self._install_default_rules(iptables_manager, constants.IPv4)
if (DEFAULT_CONNTRACK_HELPER_CHAIN not in
iptables_manager.ipv6['raw'].chains):
self._install_default_rules(iptables_manager, constants.IPv6)
for conntrack_helper in conntrack_helpers:
self._rule_apply(iptables_manager, conntrack_helper)
iptables_manager.apply()
self.mapping.set_conntrack_helpers(conntrack_helpers)
def _process_update(self, conntrack_helpers, iptables_manager):
if not conntrack_helpers:
return
for conntrack_helper in conntrack_helpers:
if not self.mapping.check_conntrack_helper_changes(
conntrack_helper):
LOG.debug("Skip conntrack helper %s for update, as there is "
"no difference between the memory managed by agent",
conntrack_helper.id)
continue
current_chain = self._get_chain_name(conntrack_helper.id)
iptables_manager.ipv4['raw'].remove_chain(current_chain)
iptables_manager.ipv6['raw'].remove_chain(current_chain)
self._rule_apply(iptables_manager, conntrack_helper)
iptables_manager.apply()
self.mapping.update_conntrack_helpers(conntrack_helpers)
def _process_delete(self, conntrack_helpers, iptables_manager):
if not conntrack_helpers:
return
for conntrack_helper in conntrack_helpers:
chain_name = self._get_chain_name(conntrack_helper.id)
iptables_manager.ipv4['raw'].remove_chain(chain_name)
iptables_manager.ipv6['raw'].remove_chain(chain_name)
iptables_manager.apply()
self.mapping.del_conntrack_helpers(conntrack_helpers)
def _get_iptables_manager(self, router_info):
if router_info.router.get('distributed'):
return router_info.snat_iptables_manager
return router_info.iptables_manager
def check_local_conntrack_helpers(self, context, router_info):
local_ct_helpers = set(self.mapping.get_managed_conntrack_helpers()
.keys())
new_ct_helpers = []
updated_cth_helpers = []
current_ct_helpers = set()
ct_helpers = self.resource_rpc.bulk_pull(
context, resources.CONNTRACKHELPER, filter_kwargs={
'router_id': router_info.router['id']})
for cth in ct_helpers:
# Split request conntrack helpers into update, new and current
if (cth.id in self.mapping.get_managed_conntrack_helpers() and
self.mapping.check_conntrack_helper_changes(cth)):
updated_cth_helpers.append(cth)
elif cth.id not in self.mapping.get_managed_conntrack_helpers():
new_ct_helpers.append(cth)
current_ct_helpers.add(cth.id)
remove_ct_helpers = [
self.mapping.get_managed_conntrack_helpers().get(cth_id) for cth_id
in local_ct_helpers.difference(current_ct_helpers)]
iptables_manager = self._get_iptables_manager(router_info)
self._process_update(updated_cth_helpers, iptables_manager)
self._process_create(new_ct_helpers, iptables_manager)
self._process_delete(remove_ct_helpers, iptables_manager)
def process_conntrack_helper(self, context, data):
router_info = self.agent_api.get_router_info(data['id'])
if not router_info:
LOG.debug("Router %s is not managed by this agent. "
"It was possibly deleted concurrently.", data['id'])
return
self.check_local_conntrack_helpers(context, router_info)
@lockutils.synchronized('conntrack-helpers')
def add_router(self, context, data):
self.process_conntrack_helper(context, data)
@lockutils.synchronized('conntrack-helpers')
def update_router(self, context, data):
self.process_conntrack_helper(context, data)
def delete_router(self, context, data):
self.mapping.clear_by_router_id(data['id'])
def ha_state_change(self, context, data):
pass

View File

@ -0,0 +1,136 @@
# Copyright (c) 2019 Red Hat Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import mock
from neutron_lib import constants
from oslo_utils import uuidutils
from neutron.agent.l3 import agent as neutron_l3_agent
from neutron.agent.l3.extensions import conntrack_helper
from neutron.agent.linux import iptables_manager as iptable_mng
from neutron.common import utils as common_utils
from neutron.objects import conntrack_helper as cth_obj
from neutron.tests.functional.agent.l3 import framework
from neutron.tests.functional.agent.l3 import test_dvr_router
class L3AgentConntrackHelperExtensionTestFramework(
framework.L3AgentTestFramework):
def setUp(self):
super(L3AgentConntrackHelperExtensionTestFramework, self).setUp()
self.conf.set_override('extensions', ['conntrack_helper'], 'agent')
self.agent = neutron_l3_agent.L3NATAgentWithStateReport('agent1',
self.conf)
self.cth_ext = conntrack_helper.ConntrackHelperAgentExtension()
self.router_id1 = uuidutils.generate_uuid()
self.router_id2 = uuidutils.generate_uuid()
self.conntrackhelper1 = cth_obj.ConntrackHelper(
context=None, id=uuidutils.generate_uuid(), protocol='udp',
port=69, helper='tftp', router_id=self.router_id1)
self.conntrackhelper2 = cth_obj.ConntrackHelper(
context=None, id=uuidutils.generate_uuid(), protocol='tcp',
port=21, helper='ftp', router_id=self.router_id2)
self.conntrack_helpers = [self.conntrackhelper1, self.conntrackhelper2]
self.managed_cths = {}
self.managed_cths[self.conntrackhelper1.id] = self.conntrackhelper1
self.managed_cths[self.conntrackhelper2.id] = self.conntrackhelper2
self.router_cth_map = collections.defaultdict(set)
self.router_cth_map[self.router_id1].add(self.conntrackhelper1.id)
self.router_cth_map[self.router_id2].add(self.conntrackhelper2.id)
self._set_bulk_poll_mock()
def _set_bulk_poll_mock(self):
def _bulk_pull_mock(context, resource_type, filter_kwargs=None):
if 'router_id' in filter_kwargs:
result = []
for cthobj in self.conntrack_helpers:
if cthobj.router_id in filter_kwargs['router_id']:
result.append(cthobj)
return result
return self.conntrack_helpers
self.bulk_pull = mock.patch('neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPullRpcApi.bulk_pull').start()
self.bulk_pull.side_effect = _bulk_pull_mock
def _assert_conntrack_helper_iptables_is_set(self, router_info, cth):
iptables_manager = self.cth_ext._get_iptables_manager(router_info)
tag = conntrack_helper.CONNTRACK_HELPER_PREFIX + cth.id
chain_name = (conntrack_helper.CONNTRACK_HELPER_CHAIN_PREFIX +
cth.id)[:constants.MAX_IPTABLES_CHAIN_LEN_WRAP]
rule = ('-p %s --dport %s -j CT --helper %s' %
(cth.protocol, cth.port, cth.helper))
rule_obj = iptable_mng.IptablesRule(chain_name, rule, True, False,
iptables_manager.wrap_name, tag,
None)
def check_chain_rules_set():
existing_ipv4_chains = iptables_manager.ipv4['raw'].chains
existing_ipv6_chains = iptables_manager.ipv6['raw'].chains
if (chain_name not in existing_ipv4_chains or
chain_name not in existing_ipv6_chains):
return False
existing_ipv4_rules = iptables_manager.ipv4['raw'].rules
existing_ipv6_rules = iptables_manager.ipv6['raw'].rules
return (rule_obj in existing_ipv4_rules and
rule_obj in existing_ipv6_rules)
common_utils.wait_until_true(check_chain_rules_set)
def _test_centralized_routers(self, router_info):
router_id = router_info['id']
for cthobj in self.conntrack_helpers:
cthobj.router_id = router_id
router_info['managed_conntrack_helpers'] = self.managed_cths
router_info['router_conntrack_helper_mapping'] = self.router_cth_map
ri = self.manage_router(self.agent, router_info)
for cthobj in self.conntrack_helpers:
self._assert_conntrack_helper_iptables_is_set(ri, cthobj)
class TestL3AgentConntrackHelperExtension(
test_dvr_router.DvrRouterTestFramework,
L3AgentConntrackHelperExtensionTestFramework):
def test_legacy_router_conntrack_helper(self):
router_info = self.generate_router_info(enable_ha=False)
self._test_centralized_routers(router_info)
def test_ha_router_conntrack_helper(self):
router_info = self.generate_router_info(enable_ha=True)
self._test_centralized_routers(router_info)
def test_dvr_edge_router(self):
self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT
router_info = self.generate_dvr_router_info(enable_ha=False)
self._test_centralized_routers(router_info)
def test_dvr_ha_router(self):
self.agent.conf.agent_mode = constants.L3_AGENT_MODE_DVR_SNAT
router_info = self.generate_dvr_router_info(enable_ha=True)
self._test_centralized_routers(router_info)

View File

@ -43,7 +43,145 @@ from neutron.tests.functional.agent.l3 import framework
DEVICE_OWNER_COMPUTE = lib_constants.DEVICE_OWNER_COMPUTE_PREFIX + 'fake'
class TestDvrRouter(framework.L3AgentTestFramework):
class DvrRouterTestFramework(framework.L3AgentTestFramework):
def generate_dvr_router_info(self,
enable_ha=False,
enable_snat=False,
enable_gw=True,
snat_bound_fip=False,
agent=None,
extra_routes=False,
enable_floating_ip=True,
enable_centralized_fip=False,
vrrp_id=None,
**kwargs):
if not agent:
agent = self.agent
router = l3_test_common.prepare_router_data(
enable_snat=enable_snat,
enable_floating_ip=enable_floating_ip,
enable_ha=enable_ha,
extra_routes=extra_routes,
num_internal_ports=2,
enable_gw=enable_gw,
snat_bound_fip=snat_bound_fip,
vrrp_id=vrrp_id,
**kwargs)
internal_ports = router.get(lib_constants.INTERFACE_KEY, [])
router['distributed'] = True
router['gw_port_host'] = agent.conf.host
if enable_floating_ip:
for floating_ip in router[lib_constants.FLOATINGIP_KEY]:
floating_ip['host'] = agent.conf.host
if enable_floating_ip and enable_centralized_fip:
# For centralizing the fip, we are emulating the legacy
# router behavior were the fip dict does not contain any
# host information.
router[lib_constants.FLOATINGIP_KEY][0]['host'] = None
# In order to test the mixed dvr_snat and compute scenario, we create
# two floating IPs, one is distributed, another is centralized.
# The distributed floating IP should have the host, which was
# just set to None above, then we set it back. The centralized
# floating IP has host None, and this IP will be used to test
# migration from centralized to distributed.
if snat_bound_fip:
router[lib_constants.FLOATINGIP_KEY][0]['host'] = agent.conf.host
router[lib_constants.FLOATINGIP_KEY][1][
lib_constants.DVR_SNAT_BOUND] = True
router[lib_constants.FLOATINGIP_KEY][1]['host'] = None
if enable_gw:
external_gw_port = router['gw_port']
router['gw_port'][portbindings.HOST_ID] = agent.conf.host
self._add_snat_port_info_to_router(router, internal_ports)
# FIP has a dependency on external gateway. So we need to create
# the snat_port info and fip_agent_gw_port_info irrespective of
# the agent type the dvr supports. The namespace creation is
# dependent on the agent_type.
if enable_floating_ip:
for index, floating_ip in enumerate(router['_floatingips']):
floating_ip['floating_network_id'] = (
external_gw_port['network_id'])
floating_ip['port_id'] = internal_ports[index]['id']
floating_ip['status'] = 'ACTIVE'
self._add_fip_agent_gw_port_info_to_router(router,
external_gw_port)
# Router creation is delegated to router_factory. We have to
# re-register here so that factory can find override agent mode
# normally.
self.agent._register_router_cls(self.agent.router_factory)
return router
def _add_fip_agent_gw_port_info_to_router(self, router, external_gw_port):
# Add fip agent gateway port information to the router_info
fip_gw_port_list = router.get(
lib_constants.FLOATINGIP_AGENT_INTF_KEY, [])
if not fip_gw_port_list and external_gw_port:
# Get values from external gateway port
fixed_ip = external_gw_port['fixed_ips'][0]
float_subnet = external_gw_port['subnets'][0]
port_ip = fixed_ip['ip_address']
# Pick an ip address which is not the same as port_ip
fip_gw_port_ip = str(netaddr.IPAddress(port_ip) + 5)
# Add floatingip agent gateway port info to router
prefixlen = netaddr.IPNetwork(float_subnet['cidr']).prefixlen
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = [
{'subnets': [
{'cidr': float_subnet['cidr'],
'gateway_ip': float_subnet['gateway_ip'],
'id': fixed_ip['subnet_id']}],
'extra_subnets': external_gw_port['extra_subnets'],
'network_id': external_gw_port['network_id'],
'device_owner': lib_constants.DEVICE_OWNER_AGENT_GW,
'mac_address': 'fa:16:3e:80:8d:89',
portbindings.HOST_ID: self.agent.conf.host,
'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'],
'ip_address': fip_gw_port_ip,
'prefixlen': prefixlen}],
'id': framework._uuid(),
'device_id': framework._uuid()}
]
def _add_snat_port_info_to_router(self, router, internal_ports):
# Add snat port information to the router
snat_port_list = router.get(lib_constants.SNAT_ROUTER_INTF_KEY, [])
if not snat_port_list and internal_ports:
router[lib_constants.SNAT_ROUTER_INTF_KEY] = []
for port in internal_ports:
# Get values from internal port
fixed_ip = port['fixed_ips'][0]
snat_subnet = port['subnets'][0]
port_ip = fixed_ip['ip_address']
# Pick an ip address which is not the same as port_ip
snat_ip = str(netaddr.IPAddress(port_ip) + 5)
# Add the info to router as the first snat port
# in the list of snat ports
prefixlen = netaddr.IPNetwork(snat_subnet['cidr']).prefixlen
snat_router_port = {
'subnets': [
{'cidr': snat_subnet['cidr'],
'gateway_ip': snat_subnet['gateway_ip'],
'id': fixed_ip['subnet_id']}],
'network_id': port['network_id'],
'device_owner': lib_constants.DEVICE_OWNER_ROUTER_SNAT,
'mac_address': 'fa:16:3e:80:8d:89',
'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'],
'ip_address': snat_ip,
'prefixlen': prefixlen}],
'id': framework._uuid(),
'device_id': framework._uuid()}
# Get the address scope if there is any
if 'address_scopes' in port:
snat_router_port['address_scopes'] = port['address_scopes']
router[lib_constants.SNAT_ROUTER_INTF_KEY].append(
snat_router_port)
class TestDvrRouter(DvrRouterTestFramework, framework.L3AgentTestFramework):
def manage_router(self, agent, router):
def _safe_fipnamespace_delete_on_ext_net(ext_net_id):
try:
@ -513,77 +651,6 @@ class TestDvrRouter(framework.L3AgentTestFramework):
self._assert_router_does_not_exist(router)
self._assert_snat_namespace_does_not_exist(router)
def generate_dvr_router_info(self,
enable_ha=False,
enable_snat=False,
enable_gw=True,
snat_bound_fip=False,
agent=None,
extra_routes=False,
enable_floating_ip=True,
enable_centralized_fip=False,
vrrp_id=None,
**kwargs):
if not agent:
agent = self.agent
router = l3_test_common.prepare_router_data(
enable_snat=enable_snat,
enable_floating_ip=enable_floating_ip,
enable_ha=enable_ha,
extra_routes=extra_routes,
num_internal_ports=2,
enable_gw=enable_gw,
snat_bound_fip=snat_bound_fip,
vrrp_id=vrrp_id,
**kwargs)
internal_ports = router.get(lib_constants.INTERFACE_KEY, [])
router['distributed'] = True
router['gw_port_host'] = agent.conf.host
if enable_floating_ip:
for floating_ip in router[lib_constants.FLOATINGIP_KEY]:
floating_ip['host'] = agent.conf.host
if enable_floating_ip and enable_centralized_fip:
# For centralizing the fip, we are emulating the legacy
# router behavior were the fip dict does not contain any
# host information.
router[lib_constants.FLOATINGIP_KEY][0]['host'] = None
# In order to test the mixed dvr_snat and compute scenario, we create
# two floating IPs, one is distributed, another is centralized.
# The distributed floating IP should have the host, which was
# just set to None above, then we set it back. The centralized
# floating IP has host None, and this IP will be used to test
# migration from centralized to distributed.
if snat_bound_fip:
router[lib_constants.FLOATINGIP_KEY][0]['host'] = agent.conf.host
router[lib_constants.FLOATINGIP_KEY][1][
lib_constants.DVR_SNAT_BOUND] = True
router[lib_constants.FLOATINGIP_KEY][1]['host'] = None
if enable_gw:
external_gw_port = router['gw_port']
router['gw_port'][portbindings.HOST_ID] = agent.conf.host
self._add_snat_port_info_to_router(router, internal_ports)
# FIP has a dependency on external gateway. So we need to create
# the snat_port info and fip_agent_gw_port_info irrespective of
# the agent type the dvr supports. The namespace creation is
# dependent on the agent_type.
if enable_floating_ip:
for index, floating_ip in enumerate(router['_floatingips']):
floating_ip['floating_network_id'] = (
external_gw_port['network_id'])
floating_ip['port_id'] = internal_ports[index]['id']
floating_ip['status'] = 'ACTIVE'
self._add_fip_agent_gw_port_info_to_router(router,
external_gw_port)
# Router creation is delegated to router_factory. We have to
# re-register here so that factory can find override agent mode
# normally.
self.agent._register_router_cls(self.agent.router_factory)
return router
def _get_fip_agent_gw_port_for_router(self, external_gw_port):
# Add fip agent gateway port information to the router_info
if external_gw_port:
@ -613,70 +680,6 @@ class TestDvrRouter(framework.L3AgentTestFramework):
}
return fip_agent_gw_port_info
def _add_fip_agent_gw_port_info_to_router(self, router, external_gw_port):
# Add fip agent gateway port information to the router_info
fip_gw_port_list = router.get(
lib_constants.FLOATINGIP_AGENT_INTF_KEY, [])
if not fip_gw_port_list and external_gw_port:
# Get values from external gateway port
fixed_ip = external_gw_port['fixed_ips'][0]
float_subnet = external_gw_port['subnets'][0]
port_ip = fixed_ip['ip_address']
# Pick an ip address which is not the same as port_ip
fip_gw_port_ip = str(netaddr.IPAddress(port_ip) + 5)
# Add floatingip agent gateway port info to router
prefixlen = netaddr.IPNetwork(float_subnet['cidr']).prefixlen
router[lib_constants.FLOATINGIP_AGENT_INTF_KEY] = [
{'subnets': [
{'cidr': float_subnet['cidr'],
'gateway_ip': float_subnet['gateway_ip'],
'id': fixed_ip['subnet_id']}],
'extra_subnets': external_gw_port['extra_subnets'],
'network_id': external_gw_port['network_id'],
'device_owner': lib_constants.DEVICE_OWNER_AGENT_GW,
'mac_address': 'fa:16:3e:80:8d:89',
portbindings.HOST_ID: self.agent.conf.host,
'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'],
'ip_address': fip_gw_port_ip,
'prefixlen': prefixlen}],
'id': framework._uuid(),
'device_id': framework._uuid()}
]
def _add_snat_port_info_to_router(self, router, internal_ports):
# Add snat port information to the router
snat_port_list = router.get(lib_constants.SNAT_ROUTER_INTF_KEY, [])
if not snat_port_list and internal_ports:
router[lib_constants.SNAT_ROUTER_INTF_KEY] = []
for port in internal_ports:
# Get values from internal port
fixed_ip = port['fixed_ips'][0]
snat_subnet = port['subnets'][0]
port_ip = fixed_ip['ip_address']
# Pick an ip address which is not the same as port_ip
snat_ip = str(netaddr.IPAddress(port_ip) + 5)
# Add the info to router as the first snat port
# in the list of snat ports
prefixlen = netaddr.IPNetwork(snat_subnet['cidr']).prefixlen
snat_router_port = {
'subnets': [
{'cidr': snat_subnet['cidr'],
'gateway_ip': snat_subnet['gateway_ip'],
'id': fixed_ip['subnet_id']}],
'network_id': port['network_id'],
'device_owner': lib_constants.DEVICE_OWNER_ROUTER_SNAT,
'mac_address': 'fa:16:3e:80:8d:89',
'fixed_ips': [{'subnet_id': fixed_ip['subnet_id'],
'ip_address': snat_ip,
'prefixlen': prefixlen}],
'id': framework._uuid(),
'device_id': framework._uuid()}
# Get the address scope if there is any
if 'address_scopes' in port:
snat_router_port['address_scopes'] = port['address_scopes']
router[lib_constants.SNAT_ROUTER_INTF_KEY].append(
snat_router_port)
def _assert_dvr_external_device(self, router):
external_port = router.get_ex_gw_port()
snat_ns_name = dvr_snat_ns.SnatNamespace.get_snat_ns_name(

View File

@ -0,0 +1,317 @@
# Copyright (c) 2019 Red Hat Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from neutron_lib import constants
from neutron_lib import context
from oslo_utils import uuidutils
from neutron.agent.l3 import agent as l3_agent
from neutron.agent.l3.extensions import conntrack_helper as cth
from neutron.agent.l3 import l3_agent_extension_api as l3_ext_api
from neutron.agent.l3 import router_info as l3router
from neutron.agent.linux import iptables_manager
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron.objects import conntrack_helper as cth_obj
from neutron.tests import base
from neutron.tests.unit.agent.l3 import test_agent
BINARY_NAME = iptables_manager.get_binary_name()
DEFAULT_RULE = ('PREROUTING', '-j %s-' % BINARY_NAME +
cth.DEFAULT_CONNTRACK_HELPER_CHAIN)
HOSTNAME = 'testhost'
class ConntrackHelperExtensionBaseTestCase(
test_agent.BasicRouterOperationsFramework):
def setUp(self):
super(ConntrackHelperExtensionBaseTestCase, self).setUp()
self.cth_ext = cth.ConntrackHelperAgentExtension()
self.context = context.get_admin_context()
self.connection = mock.Mock()
self.router_id = uuidutils.generate_uuid()
self.conntrack_helper1 = cth_obj.ConntrackHelper(
context=None, id=uuidutils.generate_uuid(), protocol='udp',
port=69, helper='tftp', router_id=self.router_id)
self.agent = l3_agent.L3NATAgent(HOSTNAME, self.conf)
self.router = {'id': self.router_id,
'ha': False,
'distributed': False}
self.router_info = l3router.RouterInfo(self.agent, self.router_id,
self.router, **self.ri_kwargs)
self.agent.router_info[self.router['id']] = self.router_info
self.get_router_info = mock.patch(
'neutron.agent.l3.l3_agent_extension_api.'
'L3AgentExtensionAPI.get_router_info').start()
self.get_router_info.return_value = self.router_info
self.agent_api = l3_ext_api.L3AgentExtensionAPI(None, None)
self.cth_ext.consume_api(self.agent_api)
self.conntrack_helpers = [self.conntrack_helper1]
class ConntrackHelperExtensionInitializeTestCase(
ConntrackHelperExtensionBaseTestCase):
@mock.patch.object(registry, 'register')
@mock.patch.object(resources_rpc, 'ResourcesPushRpcCallback')
def test_initialize_subscribed_to_rpc(self, rpc_mock, subscribe_mock):
call_to_patch = 'neutron_lib.rpc.Connection'
with mock.patch(call_to_patch,
return_value=self.connection) as create_connection:
self.cth_ext.initialize(
self.connection, constants.L3_AGENT_MODE)
create_connection.assert_has_calls([mock.call()])
self.connection.create_consumer.assert_has_calls(
[mock.call(
resources_rpc.resource_type_versioned_topic(
resources.CONNTRACKHELPER),
[rpc_mock()],
fanout=True)]
)
subscribe_mock.assert_called_with(
mock.ANY, resources.CONNTRACKHELPER)
class ConntrackHelperExtensionTestCase(ConntrackHelperExtensionBaseTestCase):
def setUp(self):
super(ConntrackHelperExtensionTestCase, self).setUp()
self.cth_ext.initialize(
self.connection, constants.L3_AGENT_MODE)
self._set_bulk_pull_mock()
def _set_bulk_pull_mock(self):
def _bulk_pull_mock(context, resource_type, filter_kwargs=None):
if 'router_id' in filter_kwargs:
result = []
for cthobj in self.conntrack_helpers:
if cthobj.router_id in filter_kwargs['router_id']:
result.append(cthobj)
return result
return self.conntrack_helpers
self.bulk_pull = mock.patch(
'neutron.api.rpc.handlers.resources_rpc.'
'ResourcesPullRpcApi.bulk_pull').start()
self.bulk_pull.side_effect = _bulk_pull_mock
@mock.patch.object(iptables_manager.IptablesTable, 'add_rule')
@mock.patch.object(iptables_manager.IptablesTable, 'add_chain')
def test_create_router(self, mock_add_chain, mock_add_rule):
self.cth_ext.add_router(self.context, self.router)
chain_name = (cth.CONNTRACK_HELPER_CHAIN_PREFIX +
self.conntrack_helper1.id)[
:constants.MAX_IPTABLES_CHAIN_LEN_WRAP]
chain_rule = ('-p %(protocol)s --dport %(dport)s -j CT --helper '
'%(helper)s' %
{'protocol': self.conntrack_helper1.protocol,
'dport': self.conntrack_helper1.port,
'helper': self.conntrack_helper1.helper})
tag = cth.CONNTRACK_HELPER_PREFIX + self.conntrack_helper1.id
self.assertEqual(mock_add_chain.call_count, 6)
self.assertEqual(mock_add_rule.call_count, 6)
mock_add_chain.assert_has_calls([
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN),
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN),
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN),
mock.call(chain_name),
mock.call(chain_name)
])
mock_add_rule.assert_has_calls([
mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]),
mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]),
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN, '-j %s-' %
BINARY_NAME + chain_name, tag=tag),
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN, '-j %s-' %
BINARY_NAME + chain_name, tag=tag),
mock.call(chain_name, chain_rule, tag=tag),
mock.call(chain_name, chain_rule, tag=tag)
])
@mock.patch.object(iptables_manager.IptablesTable, 'add_rule')
@mock.patch.object(iptables_manager.IptablesTable, 'add_chain')
def test_update_roter(self, mock_add_chain, mock_add_rule):
self.cth_ext.add_router(self.context, self.router)
mock_add_chain.reset_mock()
mock_add_rule.reset_mock()
self.cth_ext.update_router(self.context, self.router)
mock_add_chain.assert_not_called()
mock_add_rule.assert_not_called()
@mock.patch.object(iptables_manager.IptablesTable, 'add_rule')
@mock.patch.object(iptables_manager.IptablesTable, 'add_chain')
def test_add_conntrack_helper_update_router(self, mock_add_chain,
mock_add_rule):
self.cth_ext.add_router(self.context, self.router)
# Create another conntrack helper with the same router_id
mock_add_chain.reset_mock()
mock_add_rule.reset_mock()
test_conntrackhelper = cth_obj.ConntrackHelper(
context=None,
id=uuidutils.generate_uuid(),
protocol='tcp',
port=21,
helper='ftp',
router_id=self.conntrack_helper1.router_id)
self.conntrack_helpers.append(test_conntrackhelper)
self.cth_ext.update_router(self.context, self.router)
chain_name = (cth.CONNTRACK_HELPER_CHAIN_PREFIX +
test_conntrackhelper.id)[
:constants.MAX_IPTABLES_CHAIN_LEN_WRAP]
chain_rule = ('-p %(protocol)s --dport %(dport)s -j CT --helper '
'%(helper)s' %
{'protocol': test_conntrackhelper.protocol,
'dport': test_conntrackhelper.port,
'helper': test_conntrackhelper.helper})
tag = cth.CONNTRACK_HELPER_PREFIX + test_conntrackhelper.id
self.assertEqual(mock_add_chain.call_count, 6)
self.assertEqual(mock_add_rule.call_count, 6)
mock_add_chain.assert_has_calls([
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN),
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN),
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN),
mock.call(chain_name),
mock.call(chain_name)
])
mock_add_rule.assert_has_calls([
mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]),
mock.call(DEFAULT_RULE[0], DEFAULT_RULE[1]),
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN, '-j %s-' %
BINARY_NAME + chain_name, tag=tag),
mock.call(cth.DEFAULT_CONNTRACK_HELPER_CHAIN, '-j %s-' %
BINARY_NAME + chain_name, tag=tag),
mock.call(chain_name, chain_rule, tag=tag),
mock.call(chain_name, chain_rule, tag=tag)
])
@mock.patch.object(cth.ConntrackHelperMapping, 'clear_by_router_id')
def test_delete_router(self, mock_clear_by_router_id):
router_data = {'id': self.router_id,
'ha': False,
'distributed': False}
self.cth_ext.delete_router(self.context, router_data)
mock_clear_by_router_id.assert_called_with(self.router_id)
class ConntrackHelperMappingTestCase(base.BaseTestCase):
def setUp(self):
super(ConntrackHelperMappingTestCase, self).setUp()
self.mapping = cth.ConntrackHelperMapping()
self.router1 = uuidutils.generate_uuid()
self.router2 = uuidutils.generate_uuid()
self.conntrack_helper1 = cth_obj.ConntrackHelper(
context=None, id=uuidutils.generate_uuid(), protocol='udp',
port=69, helper='tftp', router_id=self.router1)
self.conntrack_helper2 = cth_obj.ConntrackHelper(
context=None, id=uuidutils.generate_uuid(), protocol='udp',
port=69, helper='tftp', router_id=self.router2)
self.conntrack_helper3 = cth_obj.ConntrackHelper(
context=None, id=uuidutils.generate_uuid(), protocol='udp',
port=21, helper='ftp', router_id=self.router1)
self.conntrack_helper4 = cth_obj.ConntrackHelper(
context=None, id=uuidutils.generate_uuid(), protocol='udp',
port=21, helper='ftp', router_id=self.router2)
self.conntrack_helper_dict = {
self.conntrack_helper1.id: self.conntrack_helper1,
self.conntrack_helper2.id: self.conntrack_helper2,
self.conntrack_helper3.id: self.conntrack_helper3,
self.conntrack_helper4.id: self.conntrack_helper4}
def _set_cth(self):
self.mapping.set_conntrack_helpers(
self.conntrack_helper_dict.values())
def test_set_conntrack_helpers(self):
self._set_cth()
cth_ids = self.conntrack_helper_dict.keys()
managed_cths = self.mapping.get_managed_conntrack_helpers()
for cth_id, obj in managed_cths.items():
self.assertIn(cth_id, cth_ids)
self.assertEqual(obj, self.conntrack_helper_dict[cth_id])
self.assertEqual(
len(cth_ids), len(managed_cths.keys()))
def test_update_conntrack_helper(self):
self._set_cth()
new_conntrack_helper1 = cth_obj.ConntrackHelper(
context=None, id=self.conntrack_helper1.id, protocol='udp',
port=6969, helper='tftp', router_id=self.router1)
self.mapping.update_conntrack_helpers([new_conntrack_helper1])
managed_cths = self.mapping.get_managed_conntrack_helpers()
self.assertEqual(
new_conntrack_helper1,
managed_cths[self.conntrack_helper1.id])
for router_id in self.mapping._router_conntrack_helper_mapping.keys():
self.assertIn(router_id, [self.router1, self.router2])
self.assertEqual(
len([self.router1, self.router2]),
len(self.mapping._router_conntrack_helper_mapping.keys()))
def test_del_conntrack_helper(self):
self._set_cth()
self.mapping.del_conntrack_helpers([self.conntrack_helper3,
self.conntrack_helper2,
self.conntrack_helper4])
managed_cths = self.mapping.get_managed_conntrack_helpers()
self.assertEqual([self.conntrack_helper1.id],
list(managed_cths.keys()))
self.assertNotIn(self.conntrack_helper3.id,
self.mapping._router_conntrack_helper_mapping[
self.conntrack_helper3.router_id])
self.assertNotIn(self.router2,
self.mapping._router_conntrack_helper_mapping.keys())
def test_clear_by_router_id(self):
self._set_cth()
self.mapping.clear_by_router_id(self.router2)
managed_cths = self.mapping.get_managed_conntrack_helpers()
self.assertNotIn(self.conntrack_helper2, managed_cths.keys())
self.assertNotIn(self.conntrack_helper4, managed_cths.keys())
def test_check_conntrack_helper_changes(self):
self._set_cth()
new_cth = cth_obj.ConntrackHelper(
context=None, id=self.conntrack_helper1.id, protocol='udp',
port=6969, helper='tftp', router_id=self.router1)
self.assertTrue(self.mapping.check_conntrack_helper_changes(new_cth))
def test_check_conntrack_helper_changes_no_change(self):
self._set_cth()
new_cth = cth_obj.ConntrackHelper(
context=None, id=self.conntrack_helper1.id, protocol='udp',
port=69, helper='tftp', router_id=self.router1)
self.assertFalse(self.mapping.check_conntrack_helper_changes(new_cth))

View File

@ -118,6 +118,7 @@ neutron.agent.l3.extensions =
gateway_ip_qos = neutron.agent.l3.extensions.qos.gateway_ip:RouterGatewayIPQosAgentExtension
port_forwarding = neutron.agent.l3.extensions.port_forwarding:PortForwardingAgentExtension
snat_log = neutron.agent.l3.extensions.snat_log:SNATLoggingExtension
conntrack_helper = neutron.agent.l3.extensions.conntrack_helper:ConntrackHelperAgentExtension
neutron.services.logapi.drivers =
ovs = neutron.services.logapi.drivers.openvswitch.ovs_firewall_log:OVSFirewallLoggingDriver
neutron.qos.agent_drivers =