From 898faa82db5cf6aff6fb53a93fbf4dd5e6c6aee2 Mon Sep 17 00:00:00 2001 From: jimin3-shin Date: Wed, 26 Nov 2025 21:21:43 +0900 Subject: [PATCH] Add information to network ip availabilities This change provides full information on the number of ips in the subnet from the response of GET ``/v2.0/network-ip-availabilites`` - the number of ips in the subnet cidr, in the allocation pools, used ips in the subnet cidr, and used ips in the allocation pools. [0] https://review.opendev.org/c/openstack/neutron-specs/+/947826 Related-Bug: #2107316 Change-Id: I226416a511730d9cda916cc44cc78658f4bc987e Signed-off-by: Jimin Shin --- .../db/network_ip_availability_details_db.py | 227 +++++++++++ .../network_ip_availability_details.py | 48 +++ .../network_ip_availability/plugin.py | 12 +- .../test_network_ip_availability_details.py | 354 ++++++++++++++++++ ...-availability-detail-39802b4b6be3e997.yaml | 7 + 5 files changed, 645 insertions(+), 3 deletions(-) create mode 100644 neutron/db/network_ip_availability_details_db.py create mode 100644 neutron/extensions/network_ip_availability_details.py create mode 100644 neutron/tests/unit/extensions/test_network_ip_availability_details.py create mode 100644 releasenotes/notes/add-extension-network-ip-availability-detail-39802b4b6be3e997.yaml diff --git a/neutron/db/network_ip_availability_details_db.py b/neutron/db/network_ip_availability_details_db.py new file mode 100644 index 00000000000..8e537241cb6 --- /dev/null +++ b/neutron/db/network_ip_availability_details_db.py @@ -0,0 +1,227 @@ +# Copyright 2025 Samsung SDS. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import ipaddress + +import netaddr + +import neutron.db.models_v2 as mod +from neutron.db import network_ip_availability_db +from neutron_lib import constants +from neutron_lib.db import api as db_api + +TOTAL_IPS_IN_SUBNET = 'total_ips_in_subnet' +TOTAL_IPS_IN_ALLOCATION_POOL = 'total_ips_in_allocation_pool' +USED_IPS_IN_SUBNET = 'used_ips_in_subnet' +USED_IPS_IN_ALLOCATION_POOL = 'used_ips_in_allocation_pool' +FIRST_IP = 'first_ip' +LAST_IP = 'last_ip' +IP_AVAILABILITY_DETAILS = 'ip_availability_details' + + +class IpAvailabilityDetailsDbMixin( + network_ip_availability_db.IpAvailabilityMixin): + """Mixin class to provide detailed IP availability information.""" + + @classmethod + @db_api.CONTEXT_READER + def get_network_ip_availabilities(cls, context, filters=None): + res = super().get_network_ip_availabilities(context, filters) + subnet_total_ips = cls._generate_subnet_total_ips(context, + filters) + subnet_used_ips = cls._generat_subnet_used_ips(context, + filters) + return cls._add_details_to_result(res, subnet_total_ips, + subnet_used_ips) + + @classmethod + def _generate_subnet_total_ips(cls, context, filters): + """Generates a dict whose key=subnet_id, value=total_ips_dict. + total_ips_dict has key-value: + - 'total_ips_in_subnet': subnet cidr + - 'total_ips_in_allocation_pool': the sum of IPs in each allocation + pools, 0 if there are no allocation pools in the subnet + """ + subnet_cidr_alloc_query = cls._build_subnet_cidr_alloc_query(context, + filters) + + subnet_totals_dict = {} + for row in subnet_cidr_alloc_query: + # Skip networks without subnets + if not row.subnet_id: + continue + + # CIDR data + total_ips_in_subnet = netaddr.IPNetwork( + row.cidr, version=row.ip_version).size + if row.ip_version == constants.IP_VERSION_4: + # Exclude network and broadcast addresses. + total_ips_in_subnet -= 2 + + # IPAllocationPool data + total_ips_in_allocation_pool = 0 + if row.first_ip: + pool_total = netaddr.IPRange( + netaddr.IPAddress(row.first_ip), + netaddr.IPAddress(row.last_ip)).size + cur_total = ( + subnet_totals_dict.get(row.subnet_id) + .get(TOTAL_IPS_IN_ALLOCATION_POOL) + if subnet_totals_dict.get(row.subnet_id) else 0 + ) + total_ips_in_allocation_pool = cur_total + pool_total + + subnet_totals_dict[row.subnet_id] = { + TOTAL_IPS_IN_SUBNET: total_ips_in_subnet, + TOTAL_IPS_IN_ALLOCATION_POOL: total_ips_in_allocation_pool + } + return subnet_totals_dict + + @classmethod + @db_api.CONTEXT_READER + def _build_subnet_cidr_alloc_query(cls, context, filters): + query = context.session.query( + *cls.common_columns, + mod.IPAllocationPool.first_ip, + mod.IPAllocationPool.last_ip, + ) + query = query.outerjoin( + mod.Subnet, + mod.Network.id == mod.Subnet.network_id + ) + query = query.outerjoin( + mod.IPAllocationPool, + mod.Subnet.id == mod.IPAllocationPool.subnet_id) + return cls._adjust_query_for_filters(query, filters) + + @classmethod + def _generat_subnet_used_ips(cls, context, filters): + """Generates a dict whose key=subnet_id, value=used_ips_dict. + used_ips_dict has key-value: + - 'used_ips_in_subnet': the sum of used IPs in the subnet (does not + consider allocation pools) + - 'used_ips_in_allocation_pool': the sum of used IPs in each + allocation pool, 0 in case of no allocation pools + """ + allocation_pools_query = ( + cls._build_allocation_pools_query(context, filters)) + + pools_dict = {} + used_ips_dict = {} + + for row in allocation_pools_query: + if pools_dict.get(row.subnet_id) is None: + pools_dict[row.subnet_id] = [] + pools_dict[row.subnet_id].append({ + FIRST_IP: (int(ipaddress.ip_address(row.first_ip)) + if row.first_ip else None), + LAST_IP: (int(ipaddress.ip_address(row.last_ip)) + if row.last_ip else None) + }) + if used_ips_dict.get(row.subnet_id) is None: + used_ips_dict[row.subnet_id] = { + USED_IPS_IN_SUBNET: 0, + USED_IPS_IN_ALLOCATION_POOL: 0 + } + + allocations_query = cls._build_allocations_query(context, filters) + + for row in allocations_query: + if pools_dict.get(row.subnet_id) is None: + continue + + used_ips_dict[row.subnet_id][USED_IPS_IN_SUBNET] += 1 + + ip_address = int(ipaddress.ip_address(row.ip_address)) + for pool in pools_dict[row.subnet_id]: + if pool[FIRST_IP] is None: + continue + if pool[FIRST_IP] <= ip_address <= pool[LAST_IP]: + used_ips_dict[row.subnet_id][USED_IPS_IN_ALLOCATION_POOL]\ + += 1 + + return used_ips_dict + + @classmethod + @db_api.CONTEXT_READER + def _build_allocation_pools_query(cls, context, filters): + query = context.session.query( + *cls.common_columns, + mod.IPAllocationPool.first_ip, + mod.IPAllocationPool.last_ip + ) + query = query.outerjoin( + mod.Subnet, + mod.Network.id == mod.Subnet.network_id + ) + query = query.outerjoin( + mod.IPAllocationPool, + mod.Subnet.id == mod.IPAllocationPool.subnet_id) + return cls._adjust_query_for_filters(query, filters) + + @classmethod + @db_api.CONTEXT_READER + def _build_allocations_query(cls, context, filters): + query = context.session.query( + *cls.common_columns, + mod.IPAllocation.ip_address + ) + query = query.outerjoin( + mod.Subnet, + mod.Network.id == mod.Subnet.network_id + ) + query = query.join( + mod.IPAllocation, + mod.Subnet.id == mod.IPAllocation.subnet_id + ) + return cls._adjust_query_for_filters(query, filters) + + @classmethod + def _add_details_to_result(cls, res, subnet_total_ips, subnet_used_ips): + for i, net in enumerate(res): + net_details = { + TOTAL_IPS_IN_SUBNET: 0, + TOTAL_IPS_IN_ALLOCATION_POOL: 0, + USED_IPS_IN_SUBNET: 0, + USED_IPS_IN_ALLOCATION_POOL: 0 + } + + res_sub = net['subnet_ip_availability'] + for j, sub in enumerate(res_sub): + sub_id = sub['subnet_id'] + sub_details = { + TOTAL_IPS_IN_SUBNET: + subnet_total_ips[sub_id][TOTAL_IPS_IN_SUBNET], + TOTAL_IPS_IN_ALLOCATION_POOL: + subnet_total_ips[sub_id][TOTAL_IPS_IN_ALLOCATION_POOL], + USED_IPS_IN_SUBNET: + subnet_used_ips[sub_id][USED_IPS_IN_SUBNET], + USED_IPS_IN_ALLOCATION_POOL: + subnet_used_ips[sub_id][USED_IPS_IN_ALLOCATION_POOL] + } + res_sub[j][IP_AVAILABILITY_DETAILS] = sub_details + + net_details[TOTAL_IPS_IN_SUBNET]\ + += sub_details[TOTAL_IPS_IN_SUBNET] + net_details[TOTAL_IPS_IN_ALLOCATION_POOL]\ + += sub_details[TOTAL_IPS_IN_ALLOCATION_POOL] + net_details[USED_IPS_IN_SUBNET]\ + += sub_details[USED_IPS_IN_SUBNET] + net_details[USED_IPS_IN_ALLOCATION_POOL]\ + += sub_details[USED_IPS_IN_ALLOCATION_POOL] + + res[i][IP_AVAILABILITY_DETAILS] = net_details + + return res diff --git a/neutron/extensions/network_ip_availability_details.py b/neutron/extensions/network_ip_availability_details.py new file mode 100644 index 00000000000..503f04d3d97 --- /dev/null +++ b/neutron/extensions/network_ip_availability_details.py @@ -0,0 +1,48 @@ +# Copyright 2025 Samsung SDS. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from neutron_lib.api.definitions import network_ip_availability as base_apidef +from neutron_lib.api.definitions import network_ip_availability_details as \ + details_apidef +from neutron_lib.api import extensions as api_extensions + +from neutron.api import extensions +from neutron.api.v2 import base +from neutron.services.network_ip_availability import plugin + + +class Network_ip_availability_details(api_extensions.APIExtensionDescriptor): + """Extension class supporting + detailed network ip availability information. + """ + api_definition = details_apidef + + @classmethod + def get_resources(cls): + """Returns Extended Resource for service type management.""" + resource_attributes = ( + base_apidef.RESOURCE_ATTRIBUTE_MAP)[base_apidef.RESOURCE_PLURAL] + resource_attributes.update(details_apidef.RESOURCE_ATTRIBUTE_MAP) + controller = base.create_resource( + base_apidef.RESOURCE_PLURAL, + base_apidef.RESOURCE_NAME, + plugin.NetworkIPAvailabilityPlugin.get_instance(), + resource_attributes, + allow_pagination=True, + allow_sorting=True, + ) + return [extensions.ResourceExtension(base_apidef.COLLECTION_NAME, + controller, + attr_map=resource_attributes)] diff --git a/neutron/services/network_ip_availability/plugin.py b/neutron/services/network_ip_availability/plugin.py index 5ed30d8ab1d..c56c5b50368 100644 --- a/neutron/services/network_ip_availability/plugin.py +++ b/neutron/services/network_ip_availability/plugin.py @@ -14,19 +14,25 @@ # limitations under the License. from neutron_lib.api.definitions import network_ip_availability +from neutron_lib.api.definitions import network_ip_availability_details from neutron_lib.db import utils as db_utils from neutron_lib import exceptions from neutron.db import db_base_plugin_v2 from neutron.db import network_ip_availability_db as ip_availability_db +from neutron.db \ + import network_ip_availability_details_db as ip_availability_details_db -class NetworkIPAvailabilityPlugin(ip_availability_db.IpAvailabilityMixin, - db_base_plugin_v2.NeutronDbPluginV2): +class NetworkIPAvailabilityPlugin( + ip_availability_details_db.IpAvailabilityDetailsDbMixin, + ip_availability_db.IpAvailabilityMixin, + db_base_plugin_v2.NeutronDbPluginV2): """This plugin exposes IP availability data for networks and subnets.""" _instance = None - supported_extension_aliases = [network_ip_availability.ALIAS] + supported_extension_aliases = [network_ip_availability.ALIAS, + network_ip_availability_details.ALIAS] __filter_validation_support = True diff --git a/neutron/tests/unit/extensions/test_network_ip_availability_details.py b/neutron/tests/unit/extensions/test_network_ip_availability_details.py new file mode 100644 index 00000000000..70f7390dbe8 --- /dev/null +++ b/neutron/tests/unit/extensions/test_network_ip_availability_details.py @@ -0,0 +1,354 @@ +# Copyright 2025 Samsung SDS. All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import netaddr +from neutron_lib import constants + +from neutron.api import extensions as api_ext +from neutron.common import config +from neutron import extensions +from neutron.services.network_ip_availability import plugin as plugin_module +from neutron.tests.common import test_db_base_plugin_v2 + +API_RESOURCE = 'network-ip-availabilities' +IP_AVAIL_KEY = 'network_ip_availability' +IP_AVAILS_KEY = 'network_ip_availabilities' +IP_AVAIL_DETAILS_KEY = 'ip_availability_details' +TOTAL_IPS_IN_SUBNET = 'total_ips_in_subnet' +TOTAL_IPS_IN_ALLOCATION_POOL = 'total_ips_in_allocation_pool' +USED_IPS_IN_SUBNET = 'used_ips_in_subnet' +USED_IPS_IN_ALLOCATION_POOL = 'used_ips_in_allocation_pool' +EXTENSIONS_PATH = ':'.join(extensions.__path__) +PLUGIN_NAME = '{}.{}'.format( + plugin_module.NetworkIPAvailabilityPlugin.__module__, + plugin_module.NetworkIPAvailabilityPlugin.__name__) + + +class TestNetworkIPAvailabilityDetails( + test_db_base_plugin_v2.NeutronDbPluginV2TestCase): + def setUp(self): + svc_plugins = {'plugin_name': PLUGIN_NAME} + super().setUp( + service_plugins=svc_plugins) + self.plugin = plugin_module.NetworkIPAvailabilityPlugin() + ext_mgr = api_ext.PluginAwareExtensionManager( + EXTENSIONS_PATH, {"network-ip-availability": self.plugin} + ) + app = config.load_paste_app('extensions_test_app') + self.ext_api = api_ext.ExtensionMiddleware(app, ext_mgr=ext_mgr) + + def _validate_availability_details(self, availability_details, + expected_total_in_subnet, + expected_total_in_allocation_pools, + expected_used_in_subnet, + expected_used_in_allocation_pools): + self.assertEqual(expected_total_in_subnet, + availability_details[TOTAL_IPS_IN_SUBNET]) + self.assertEqual(expected_total_in_allocation_pools, + availability_details[TOTAL_IPS_IN_ALLOCATION_POOL]) + self.assertEqual(expected_used_in_subnet, + availability_details[USED_IPS_IN_SUBNET]) + self.assertEqual(expected_used_in_allocation_pools, + availability_details[USED_IPS_IN_ALLOCATION_POOL]) + + def _validate_from_availabilities(self, availabilities, wrapped_network, + expected_total_in_subnet, + expected_total_in_allocation_pools, + expected_used_in_subnet, + expected_used_in_allocation_pools): + network = wrapped_network['network'] + availability = self._find_availability(availabilities, network['id']) + self.assertIsNotNone(availability) + self.assertIsNotNone(availability[IP_AVAIL_DETAILS_KEY]) + self._validate_availability_details(availability[IP_AVAIL_DETAILS_KEY], + expected_total_in_subnet, + expected_total_in_allocation_pools, + expected_used_in_subnet, + expected_used_in_allocation_pools) + + def test_usages_query_list_with_fields_ip_availability_details(self): + with self.network() as net: + with self.subnet(network=net): + # list by query fields: total_ips + params = 'fields=ip_availability_details' + request = self.new_list_request(API_RESOURCE, + params=params, + as_admin=True) + response = self.deserialize(self.fmt, + request.get_response(self.ext_api)) + availability = response[IP_AVAILS_KEY][0] + self.assertIn('ip_availability_details', availability) + self.assertNotIn('network_id', availability) + + def test_usages_query_show_with_fields_total_ips(self): + with self.network() as net: + with self.subnet(network=net): + network = net['network'] + params = ['ip_availability_details'] + request = self.new_show_request(API_RESOURCE, + network['id'], + fields=params, + as_admin=True) + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + availability = response[IP_AVAIL_KEY] + self.assertIn('ip_availability_details', availability) + self.assertNotIn('network_id', availability) + + @staticmethod + def _find_availability(availabilities, net_id): + for ip_availability in availabilities: + if net_id == ip_availability['network_id']: + return ip_availability + + def test_basic(self): + with self.network() as net: + with self.subnet(network=net): + network = net['network'] + # Get ALL + request = self.new_list_request(API_RESOURCE, + self.fmt, + as_admin=True) + response = self.deserialize(self.fmt, + request.get_response(self.ext_api)) + self.assertIn(IP_AVAILS_KEY, response) + self.assertEqual(1, len(response[IP_AVAILS_KEY])) + self._validate_from_availabilities(response[IP_AVAILS_KEY], + net, 254, 253, 0, 0) + + # Get single via id + request = self.new_show_request(API_RESOURCE, network['id'], + as_admin=True) + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + self.assertIn(IP_AVAIL_KEY, response) + usage = response[IP_AVAIL_KEY] + self.assertIn(IP_AVAIL_DETAILS_KEY, usage) + self._validate_availability_details( + usage[IP_AVAIL_DETAILS_KEY], 254, 253, 0, 0) + + def test_usages_multi_nets_subnets(self): + with self.network(name='net1') as n1,\ + self.network(name='net2') as n2,\ + self.network(name='net3') as n3: + # n1 should have 2 subnets, n2 should have none, n3 has 1 + with self.subnet(network=n1) as subnet1_1, \ + self.subnet(cidr='40.0.0.0/24', network=n3) as subnet3_1: + # Consume 3 ports n1, none n2, 2 ports on n3 + with self.port(subnet=subnet1_1),\ + self.port(subnet=subnet1_1),\ + self.port(subnet=subnet1_1),\ + self.port(subnet=subnet3_1),\ + self.port(subnet=subnet3_1): + + # Test get ALL + request = self.new_list_request(API_RESOURCE, + as_admin=True) + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + self.assertIn(IP_AVAILS_KEY, response) + self.assertEqual(3, len(response[IP_AVAILS_KEY])) + + data = response[IP_AVAILS_KEY] + self._validate_from_availabilities(data, n1, + 254, 253, 3, 3) + self._validate_from_availabilities(data, n2, + 0, 0, 0, 0) + self._validate_from_availabilities(data, n3, + 254, 253, 2, 2) + + # Test get single via network id + network = n1['network'] + request = self.new_show_request(API_RESOURCE, + network['id'], + as_admin=True) + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + self.assertIn(IP_AVAIL_KEY, response) + data = response[IP_AVAIL_KEY] + self.assertIn(IP_AVAIL_DETAILS_KEY, data) + self._validate_availability_details( + data[IP_AVAIL_DETAILS_KEY], 254, 253, 3, 3) + + def test_usages_multi_nets_subnets_sums(self): + with self.network(name='net1') as n1: + # n1 has 2 subnets + with self.subnet(network=n1) as subnet1_1, \ + self.subnet(cidr='40.0.0.0/24', network=n1) as subnet1_2: + # Consume 3 ports n1: 1 on subnet 1 and 2 on subnet 2 + with self.port(subnet=subnet1_1),\ + self.port(subnet=subnet1_2),\ + self.port(subnet=subnet1_2): + # Get ALL + request = self.new_list_request(API_RESOURCE, + as_admin=True) + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + self.assertIn(IP_AVAILS_KEY, response) + self.assertEqual(1, len(response[IP_AVAILS_KEY])) + self._validate_from_availabilities(response[IP_AVAILS_KEY], + n1, 508, 506, 3, 3) + + # Get single via network id + network = n1['network'] + request = self.new_show_request(API_RESOURCE, + network['id'], + as_admin=True) + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + self.assertIn(IP_AVAIL_KEY, response) + data = response[IP_AVAIL_KEY] + self.assertIn(IP_AVAIL_DETAILS_KEY, data) + self._validate_availability_details( + data[IP_AVAIL_DETAILS_KEY], 508, 506, 3, 3) + + def test_usages_query_ip_version_v4(self): + with self.network() as net: + with self.subnet(network=net): + # Get IPv4 + params = 'ip_version=%s' % constants.IP_VERSION_4 + request = self.new_list_request(API_RESOURCE, params=params, + as_admin=True) + response = self.deserialize(self.fmt, + request.get_response(self.ext_api)) + self.assertIn(IP_AVAILS_KEY, response) + self.assertEqual(1, len(response[IP_AVAILS_KEY])) + self._validate_from_availabilities(response[IP_AVAILS_KEY], + net, 254, 253, 0, 0) + + # Get IPv6 should return empty array + params = 'ip_version=%s' % constants.IP_VERSION_6 + request = self.new_list_request(API_RESOURCE, params=params, + as_admin=True) + response = self.deserialize(self.fmt, + request.get_response(self.ext_api)) + self.assertEqual(0, len(response[IP_AVAILS_KEY])) + + def test_usages_query_ip_version_v6(self): + cidr_ipv6 = '2001:db8:1002:51::/64' + cidr_ipv6_net = netaddr.IPNetwork(cidr_ipv6) + with self.network() as net: + with self.subnet( + network=net, cidr=cidr_ipv6, + ip_version=constants.IP_VERSION_6, + ipv6_address_mode=constants.DHCPV6_STATELESS): + # Get IPv6 + params = 'ip_version=%s' % constants.IP_VERSION_6 + request = self.new_list_request(API_RESOURCE, params=params, + as_admin=True) + response = self.deserialize(self.fmt, + request.get_response(self.ext_api)) + self.assertEqual(1, len(response[IP_AVAILS_KEY])) + self._validate_from_availabilities( + response[IP_AVAILS_KEY], net, + cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 0, 0) + + # Get IPv4 should return empty array + params = 'ip_version=%s' % constants.IP_VERSION_4 + request = self.new_list_request(API_RESOURCE, params=params, + as_admin=True) + response = self.deserialize(self.fmt, + request.get_response(self.ext_api)) + self.assertEqual(0, len(response[IP_AVAILS_KEY])) + + def test_usages_ports_consumed_v6(self): + cidr_ipv6 = '2001:db8:1002:51::/64' + cidr_ipv6_net = netaddr.IPNetwork(cidr_ipv6) + with self.network() as net: + with self.subnet( + network=net, cidr=cidr_ipv6, + ip_version=constants.IP_VERSION_6, + ipv6_address_mode=constants.DHCPV6_STATELESS) as subnet: + request = self.new_list_request(API_RESOURCE, + as_admin=True) + # Consume 3 ports + with self.port(subnet=subnet),\ + self.port(subnet=subnet), \ + self.port(subnet=subnet): + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + + self._validate_from_availabilities( + response[IP_AVAILS_KEY], net, + cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 3, 3) + + def test_usages_multi_net_multi_subnet_46(self): + # Setup mixed v4/v6 networks with IPs consumed on each + cidr_ipv6 = '2001:db8:1003:52::/64' + cidr_ipv6_net = netaddr.IPNetwork(cidr_ipv6) + with self.network(name='net-v6-1') as net_v6_1, \ + self.network(name='net-v6-2') as net_v6_2, \ + self.network(name='net-v4-1') as net_v4_1, \ + self.network(name='net-v4-2') as net_v4_2: + with self.subnet(network=net_v6_1, cidr='2001:db8:1002:51::/64', + ip_version=constants.IP_VERSION_6) as s61, \ + self.subnet(network=net_v6_2, + cidr=cidr_ipv6, + ip_version=constants.IP_VERSION_6) as s62, \ + self.subnet(network=net_v4_1, cidr='10.0.0.0/24') as s41, \ + self.subnet(network=net_v4_2, cidr='10.0.1.0/24') as s42: + with self.port(subnet=s61),\ + self.port(subnet=s62), self.port(subnet=s62), \ + self.port(subnet=s41), \ + self.port(subnet=s42), self.port(subnet=s42): + + # Verify consumption across all + request = self.new_list_request(API_RESOURCE, + as_admin=True) + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + avails_list = response[IP_AVAILS_KEY] + self._validate_from_availabilities( + avails_list, net_v6_1, + cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 1, 1) + self._validate_from_availabilities( + avails_list, net_v6_2, + cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 2, 2) + self._validate_from_availabilities( + avails_list, net_v4_1, + 254, 253, 1, 1) + self._validate_from_availabilities( + avails_list, net_v4_2, + 254, 253, 2, 2) + + # Query by IP versions. Ensure subnet versions match + for ip_ver in [constants.IP_VERSION_4, + constants.IP_VERSION_6]: + params = 'ip_version=%i' % ip_ver + request = self.new_list_request(API_RESOURCE, + params=params, + as_admin=True) + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + for net_avail in response[IP_AVAILS_KEY]: + for sub in net_avail['subnet_ip_availability']: + self.assertEqual(ip_ver, sub['ip_version']) + + # Verify consumption querying 2 network ids (IN clause) + request = self.new_list_request( + API_RESOURCE, + params='network_id=%s&network_id=%s' + % (net_v4_2['network']['id'], + net_v6_2['network']['id']), + as_admin=True) + response = self.deserialize( + self.fmt, request.get_response(self.ext_api)) + avails_list = response[IP_AVAILS_KEY] + self._validate_from_availabilities( + avails_list, net_v6_2, + cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 2, 2) + self._validate_from_availabilities( + avails_list, net_v4_2, + 254, 253, 2, 2) diff --git a/releasenotes/notes/add-extension-network-ip-availability-detail-39802b4b6be3e997.yaml b/releasenotes/notes/add-extension-network-ip-availability-detail-39802b4b6be3e997.yaml new file mode 100644 index 00000000000..a5c2c3c9d62 --- /dev/null +++ b/releasenotes/notes/add-extension-network-ip-availability-detail-39802b4b6be3e997.yaml @@ -0,0 +1,7 @@ +--- +features: + - | + Add a new API extension, ``network-ip-availability-details``, that adds + the ``ip_availability_details`` attribute to network IP availabilities. + The value of this attribute contains detailed information about the + network IP usage statistics.