Correctly validate subnet arguments when using a subnetpool

When creating a subnet using a subnetpool, we were
failing to validate all the passed API arguments in
the dictionary, leading to a case where you could
specify an invalid DNS nameserver. For example,
using an IPv4 nameserver on an IPv6 subnet. This
could cause daemons the l3-agent starts, like radvd,
to fail to start correctly, leading to a loss of
connectivity.

Specifying a subnet by cidr without a subnetpool
did already correctly fail with an IP version
mismatch error, this is just an edge case that
was never tested.

Since _validate_subnet() was called in so many places
it was moved to a common location and is only not
called for IPv6 prefix-delegation subnets.

Closes-bug: #2036877
Change-Id: I6302e9a373cf93e706cec10f87c3beaf632a0391
(cherry picked from commit 2f91d330da)
This commit is contained in:
Brian Haley 2023-11-06 15:03:50 -05:00 committed by Sven Kieske
parent a28ec9ed3c
commit f2ec3a6cec
2 changed files with 61 additions and 11 deletions

View File

@ -318,7 +318,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if project_ids - allowed_projects:
raise exc.InvalidSharedSetting(network=network.name)
def _validate_ipv6_attributes(self, subnet, cur_subnet):
def _validate_ipv6_attributes(self, subnet, cur_subnet, has_cidr):
if cur_subnet:
self._validate_ipv6_update_dhcp(subnet, cur_subnet)
return
@ -330,7 +330,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if ra_mode_set and address_mode_set:
self._validate_ipv6_combination(subnet['ipv6_ra_mode'],
subnet['ipv6_address_mode'])
if address_mode_set or ra_mode_set:
if has_cidr and (address_mode_set or ra_mode_set):
self._validate_eui64_applicable(subnet)
def _validate_eui64_applicable(self, subnet):
@ -607,8 +607,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
ip_ver = s['ip_version']
# We could be called without a cidr if a subnet pool is being used,
# so remember that since some checks below require one and should
# be skipped.
has_cidr = False
if validators.is_attr_set(s.get('cidr')):
self._validate_ip_version(ip_ver, s['cidr'], 'cidr')
has_cidr = True
# TODO(watanabe.isao): After we found a way to avoid the re-sync
# from the agent side, this restriction could be removed.
@ -616,7 +621,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
dhcp_was_enabled = cur_subnet.enable_dhcp
else:
dhcp_was_enabled = False
if s.get('enable_dhcp') and not dhcp_was_enabled:
if has_cidr and s.get('enable_dhcp') and not dhcp_was_enabled:
subnet_prefixlen = netaddr.IPNetwork(s['cidr']).prefixlen
error_message = _("Subnet has a prefix length that is "
"incompatible with DHCP service enabled")
@ -640,12 +645,13 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if validators.is_attr_set(s.get('gateway_ip')):
self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip')
is_gateway_not_valid = (
ipam.utils.check_gateway_invalid_in_subnet(
s['cidr'], s['gateway_ip']))
if is_gateway_not_valid:
error_message = _("Gateway is not valid on subnet")
raise exc.InvalidInput(error_message=error_message)
if has_cidr:
is_gateway_not_valid = (
ipam.utils.check_gateway_invalid_in_subnet(
s['cidr'], s['gateway_ip']))
if is_gateway_not_valid:
error_message = _("Gateway is not valid on subnet")
raise exc.InvalidInput(error_message=error_message)
# Ensure the gateway IP is not assigned to any port
# skip this check in case of create (s parameter won't have id)
# NOTE(salv-orlando): There is slight chance of a race, when
@ -698,7 +704,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
error_message=(_("ipv6_address_mode is not valid when "
"ip_version is 4")))
if ip_ver == 6:
self._validate_ipv6_attributes(s, cur_subnet)
self._validate_ipv6_attributes(s, cur_subnet, has_cidr)
def _validate_subnet_for_pd(self, subnet):
"""Validates that subnet parameters are correct for IPv6 PD"""
@ -860,6 +866,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
msg = _('a subnetpool must be specified in the absence of a cidr')
raise exc.BadRequest(resource='subnets', msg=msg)
validate = True
if subnetpool_id:
self.ipam.validate_pools_with_subnetpool(s)
if subnetpool_id == constants.IPV6_PD_POOL_ID:
@ -868,16 +875,18 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# cidr with IPv6 prefix delegation. Set the subnetpool_id
# to None and allow the request to continue as normal.
subnetpool_id = None
self._validate_subnet(context, s)
else:
prefix = constants.PROVISIONAL_IPV6_PD_PREFIX
subnet['subnet']['cidr'] = prefix
self._validate_subnet_for_pd(s)
validate = False
else:
if not has_cidr:
msg = _('A cidr must be specified in the absence of a '
'subnet pool')
raise exc.BadRequest(resource='subnets', msg=msg)
if validate:
self._validate_subnet(context, s)
with db_api.CONTEXT_WRITER.using(context):

View File

@ -6641,6 +6641,47 @@ class TestSubnetPoolsV2(NeutronDbPluginV2TestCase):
res = req.get_response(self.api)
self.assertEqual(400, res.status_int)
def _test_allocate_any_subnet_wrong_dns(self, pool_cidr, ip_version,
nameservers):
with self.network() as network:
sp = self._test_create_subnetpool([pool_cidr],
tenant_id=self._tenant_id,
name=self._POOL_NAME)
# Specify a DNS nameserver of the wrong IP version
data = {'subnet': {'network_id': network['network']['id'],
'subnetpool_id': sp['subnetpool']['id'],
'ip_version': ip_version,
'dns_nameservers': nameservers,
'tenant_id': network['network']['tenant_id']}}
req = self.new_create_request('subnets', data)
res = req.get_response(self.api)
self.assertEqual(400, res.status_int)
def test_allocate_any_v4_subnet_wrong_dns_v6(self):
self._test_allocate_any_subnet_wrong_dns('10.0.0.0/16',
constants.IP_VERSION_4, ['2001:db8:1:2::1'])
def test_allocate_any_v6_subnet_wrong_dns_v4(self):
self._test_allocate_any_subnet_wrong_dns('2001:db8:1:2::/63',
constants.IP_VERSION_6, ['10.0.0.1'])
def test_allocate_any_v4_subnet_wrong_dns_v6_multiple(self):
self._test_allocate_any_subnet_wrong_dns('10.0.0.0/16',
constants.IP_VERSION_4, ['2001:db8:1:2::1', '2001:db8:2:2::1'])
def test_allocate_any_v6_subnet_wrong_dns_v4_multiple(self):
self._test_allocate_any_subnet_wrong_dns('2001:db8:1:2::/63',
constants.IP_VERSION_6, ['11.0.0.11', '12.0.0.12'])
def test_allocate_any_v4_subnet_wrong_dns_mixed(self):
self._test_allocate_any_subnet_wrong_dns('10.0.0.0/16',
constants.IP_VERSION_4, ['11.0.0.11', '2001:db8:1:2::1'])
def test_allocate_any_v6_subnet_wrong_dns_mixed(self):
self._test_allocate_any_subnet_wrong_dns('2001:db8:1:2::/63',
constants.IP_VERSION_6, ['11.0.0.11', '2001:db8:1:2::1'])
class DbModelMixin(object):
"""DB model tests."""