From b17f4ffeba6448e99cbedca5875d758a070e0dab Mon Sep 17 00:00:00 2001 From: Slawek Kaplonski Date: Fri, 31 Jul 2020 12:50:16 +0200 Subject: [PATCH] Fix validation of IPv6 subnets with external RAs In case when such subnet is added directly to the router, there was validation and subnet which is expected to get RA messages from the external router couldn't be added to the Neutron router. But in case when port was first created manually and then plugged to the router, there wasn't such validation. This patch fixes it by adding same validation to adding router interface by port. Change-Id: I054296c790b697198550acbeae29546758b422c2 Closes-Bug: #1889619 (cherry picked from commit 38c7fd7cef42e8c1e6430feb5ac01396c65560c2) --- neutron/db/l3_db.py | 23 ++++++++++++++------ neutron/tests/unit/extensions/test_l3.py | 27 ++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index 39db7db9194..e0e214c5a49 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -116,6 +116,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, if l3plugin and port_check: l3plugin.prevent_l3_port_deletion(context, port_id) + @staticmethod + def _validate_subnet_address_mode(subnet): + if (subnet['ip_version'] == 6 and subnet['ipv6_ra_mode'] is None and + subnet['ipv6_address_mode'] is not None): + msg = (_('IPv6 subnet %s configured to receive RAs from an ' + 'external router cannot be added to Neutron Router.') % + subnet['id']) + raise n_exc.BadRequest(resource='router', msg=msg) + @property def _is_dns_integration_supported(self): if self._dns_integration is None: @@ -703,6 +712,13 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, if not port['fixed_ips']: msg = _('Router port must have at least one fixed IP') raise n_exc.BadRequest(resource='router', msg=msg) + + fixed_ips = [ip for ip in port['fixed_ips']] + for fixed_ip in fixed_ips: + subnet = self._core_plugin.get_subnet( + context, fixed_ip['subnet_id']) + self._validate_subnet_address_mode(subnet) + return port def _validate_port_in_range_or_admin(self, context, subnets, port): @@ -827,12 +843,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, msg = (_('Cannot add interface to router because subnet %s is not ' 'owned by project making the request') % subnet_id) raise n_exc.BadRequest(resource='router', msg=msg) - if (subnet['ip_version'] == 6 and subnet['ipv6_ra_mode'] is None and - subnet['ipv6_address_mode'] is not None): - msg = (_('IPv6 subnet %s configured to receive RAs from an ' - 'external router cannot be added to Neutron Router.') % - subnet['id']) - raise n_exc.BadRequest(resource='router', msg=msg) + self._validate_subnet_address_mode(subnet) self._check_for_dup_router_subnets(context, router, subnet['network_id'], [subnet]) fixed_ip = {'ip_address': subnet['gateway_ip'], diff --git a/neutron/tests/unit/extensions/test_l3.py b/neutron/tests/unit/extensions/test_l3.py index 1d96b83322c..e70154929c8 100644 --- a/neutron/tests/unit/extensions/test_l3.py +++ b/neutron/tests/unit/extensions/test_l3.py @@ -1418,6 +1418,33 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): None, p['port']['id']) + def test_update_router_interface_port_ipv6_subnet_ext_ra(self): + use_cases = [{'msg': 'IPv6 Subnet Modes (none, slaac)', + 'ra_mode': None, + 'address_mode': lib_constants.IPV6_SLAAC}, + {'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateful)', + 'ra_mode': None, + 'address_mode': lib_constants.DHCPV6_STATEFUL}, + {'msg': 'IPv6 Subnet Modes (none, dhcpv6-stateless)', + 'ra_mode': None, + 'address_mode': lib_constants.DHCPV6_STATELESS}] + for uc in use_cases: + with self.network() as network, self.router() as router: + with self.subnet( + network=network, cidr='fd00::/64', + ip_version=lib_constants.IP_VERSION_6, + ipv6_ra_mode=uc['ra_mode'], + ipv6_address_mode=uc['address_mode']) as subnet: + fixed_ips = [{'subnet_id': subnet['subnet']['id']}] + with self.port(subnet=subnet, fixed_ips=fixed_ips) as port: + self._router_interface_action( + 'add', + router['router']['id'], + None, + port['port']['id'], + expected_code=exc.HTTPBadRequest.code, + msg=uc['msg']) + def _assert_body_port_id_and_update_port(self, body, mock_update_port, port_id, device_id): self.assertNotIn('port_id', body)