diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index 105454df366..f63ee33f09a 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -430,10 +430,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, # raise the underlying exception raise e.errors[0].error - for subnet in subnets: - self._check_for_dup_router_subnet(context, router, - new_network_id, subnet['id'], - subnet['cidr']) + self._check_for_dup_router_subnets(context, router, + new_network_id, subnets) self._create_router_gw_port(context, router, new_network_id, ext_ips) registry.notify(resources.ROUTER_GATEWAY, @@ -532,38 +530,42 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, return self._get_collection_count(context, Router, filters=filters) - def _check_for_dup_router_subnet(self, context, router, - network_id, subnet_id, subnet_cidr): - try: - # It's possible these ports are on the same network, but - # different subnets. - new_ipnet = netaddr.IPNetwork(subnet_cidr) - for p in (rp.port for rp in router.attached_ports): - for ip in p['fixed_ips']: - if ip['subnet_id'] == subnet_id: - msg = (_("Router already has a port on subnet %s") - % subnet_id) - raise n_exc.BadRequest(resource='router', msg=msg) - # Ignore temporary Prefix Delegation CIDRs - if subnet_cidr == l3_constants.PROVISIONAL_IPV6_PD_PREFIX: - continue - sub_id = ip['subnet_id'] - cidr = self._core_plugin.get_subnet(context.elevated(), - sub_id)['cidr'] - ipnet = netaddr.IPNetwork(cidr) - match1 = netaddr.all_matching_cidrs(new_ipnet, [cidr]) - match2 = netaddr.all_matching_cidrs(ipnet, [subnet_cidr]) - if match1 or match2: - data = {'subnet_cidr': subnet_cidr, - 'subnet_id': subnet_id, - 'cidr': cidr, - 'sub_id': sub_id} - msg = (_("Cidr %(subnet_cidr)s of subnet " - "%(subnet_id)s overlaps with cidr %(cidr)s " - "of subnet %(sub_id)s") % data) - raise n_exc.BadRequest(resource='router', msg=msg) - except exc.NoResultFound: - pass + def _check_for_dup_router_subnets(self, context, router, + network_id, new_subnets): + # It's possible these ports are on the same network, but + # different subnets. + new_subnet_ids = {s['id'] for s in new_subnets} + router_subnets = [] + for p in (rp.port for rp in router.attached_ports): + for ip in p['fixed_ips']: + if ip['subnet_id'] in new_subnet_ids: + msg = (_("Router already has a port on subnet %s") + % ip['subnet_id']) + raise n_exc.BadRequest(resource='router', msg=msg) + router_subnets.append(ip['subnet_id']) + # Ignore temporary Prefix Delegation CIDRs + new_subnets = [s for s in new_subnets + if s['cidr'] != l3_constants.PROVISIONAL_IPV6_PD_PREFIX] + id_filter = {'id': router_subnets} + subnets = self._core_plugin.get_subnets(context.elevated(), + filters=id_filter) + for sub in subnets: + cidr = sub['cidr'] + ipnet = netaddr.IPNetwork(cidr) + for s in new_subnets: + new_cidr = s['cidr'] + new_ipnet = netaddr.IPNetwork(new_cidr) + match1 = netaddr.all_matching_cidrs(new_ipnet, [cidr]) + match2 = netaddr.all_matching_cidrs(ipnet, [new_cidr]) + if match1 or match2: + data = {'subnet_cidr': new_cidr, + 'subnet_id': s['id'], + 'cidr': cidr, + 'sub_id': sub['id']} + msg = (_("Cidr %(subnet_cidr)s of subnet " + "%(subnet_id)s overlaps with cidr %(cidr)s " + "of subnet %(sub_id)s") % data) + raise n_exc.BadRequest(resource='router', msg=msg) def _get_device_owner(self, context, router=None): """Get device_owner for the specified router.""" @@ -631,10 +633,11 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, subnet = self._core_plugin.get_subnet(context, fixed_ip['subnet_id']) subnets.append(subnet) - self._check_for_dup_router_subnet(context, router, - port['network_id'], - subnet['id'], - subnet['cidr']) + + if subnets: + self._check_for_dup_router_subnets(context, router, + port['network_id'], + subnets) # Keep the restriction against multiple IPv4 subnets if len([s for s in subnets if s['ip_version'] == 4]) > 1: @@ -665,10 +668,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase, 'external router cannot be added to Neutron Router.') % subnet['id']) raise n_exc.BadRequest(resource='router', msg=msg) - self._check_for_dup_router_subnet(context, router, - subnet['network_id'], - subnet_id, - subnet['cidr']) + self._check_for_dup_router_subnets(context, router, + subnet['network_id'], [subnet]) fixed_ip = {'ip_address': subnet['gateway_ip'], 'subnet_id': subnet['id']} diff --git a/neutron/tests/unit/extensions/test_l3.py b/neutron/tests/unit/extensions/test_l3.py index 8bd843be1c6..137c80e129e 100644 --- a/neutron/tests/unit/extensions/test_l3.py +++ b/neutron/tests/unit/extensions/test_l3.py @@ -1365,40 +1365,49 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): def test_router_add_interface_dup_subnet2_returns_400(self): with self.router() as r: - with self.subnet() as s: - with self.port(subnet=s) as p1: - with self.port(subnet=s) as p2: + with self.subnet() as s1, self.subnet(cidr='1.0.0.0/24') as s2: + with self.port(subnet=s1) as p1, self.port(subnet=s2) as p2: + with self.port(subnet=s1) as p3: + for p in [p1, p2]: + self._router_interface_action('add', + r['router']['id'], + None, + p['port']['id']) self._router_interface_action('add', r['router']['id'], None, - p1['port']['id']) - self._router_interface_action('add', - r['router']['id'], - None, - p2['port']['id'], + p3['port']['id'], expected_code=exc. HTTPBadRequest.code) def test_router_add_interface_overlapped_cidr_returns_400(self): with self.router() as r: - with self.subnet(cidr='10.0.1.0/24') as s1: + with self.subnet(cidr='10.0.1.0/24') as s1, self.subnet( + cidr='10.0.2.0/24') as s2: self._router_interface_action('add', r['router']['id'], s1['subnet']['id'], None) + self._router_interface_action('add', + r['router']['id'], + s2['subnet']['id'], + None) def try_overlapped_cidr(cidr): - with self.subnet(cidr=cidr) as s2: + with self.subnet(cidr=cidr) as s3: self._router_interface_action('add', r['router']['id'], - s2['subnet']['id'], + s3['subnet']['id'], None, expected_code=exc. HTTPBadRequest.code) # another subnet with same cidr try_overlapped_cidr('10.0.1.0/24') + try_overlapped_cidr('10.0.2.0/24') # another subnet with overlapped cidr including s1 try_overlapped_cidr('10.0.0.0/16') + # another subnet with overlapped cidr including s2 + try_overlapped_cidr('10.0.2.128/28') def test_router_add_interface_no_data_returns_400(self): with self.router() as r: