Merge "Add information to network ip availabilities"

This commit is contained in:
Zuul
2026-02-03 23:35:33 +00:00
committed by Gerrit Code Review
5 changed files with 645 additions and 3 deletions

View File

@@ -0,0 +1,227 @@
# Copyright 2025 Samsung SDS. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import ipaddress
import netaddr
import neutron.db.models_v2 as mod
from neutron.db import network_ip_availability_db
from neutron_lib import constants
from neutron_lib.db import api as db_api
TOTAL_IPS_IN_SUBNET = 'total_ips_in_subnet'
TOTAL_IPS_IN_ALLOCATION_POOL = 'total_ips_in_allocation_pool'
USED_IPS_IN_SUBNET = 'used_ips_in_subnet'
USED_IPS_IN_ALLOCATION_POOL = 'used_ips_in_allocation_pool'
FIRST_IP = 'first_ip'
LAST_IP = 'last_ip'
IP_AVAILABILITY_DETAILS = 'ip_availability_details'
class IpAvailabilityDetailsDbMixin(
network_ip_availability_db.IpAvailabilityMixin):
"""Mixin class to provide detailed IP availability information."""
@classmethod
@db_api.CONTEXT_READER
def get_network_ip_availabilities(cls, context, filters=None):
res = super().get_network_ip_availabilities(context, filters)
subnet_total_ips = cls._generate_subnet_total_ips(context,
filters)
subnet_used_ips = cls._generat_subnet_used_ips(context,
filters)
return cls._add_details_to_result(res, subnet_total_ips,
subnet_used_ips)
@classmethod
def _generate_subnet_total_ips(cls, context, filters):
"""Generates a dict whose key=subnet_id, value=total_ips_dict.
total_ips_dict has key-value:
- 'total_ips_in_subnet': subnet cidr
- 'total_ips_in_allocation_pool': the sum of IPs in each allocation
pools, 0 if there are no allocation pools in the subnet
"""
subnet_cidr_alloc_query = cls._build_subnet_cidr_alloc_query(context,
filters)
subnet_totals_dict = {}
for row in subnet_cidr_alloc_query:
# Skip networks without subnets
if not row.subnet_id:
continue
# CIDR data
total_ips_in_subnet = netaddr.IPNetwork(
row.cidr, version=row.ip_version).size
if row.ip_version == constants.IP_VERSION_4:
# Exclude network and broadcast addresses.
total_ips_in_subnet -= 2
# IPAllocationPool data
total_ips_in_allocation_pool = 0
if row.first_ip:
pool_total = netaddr.IPRange(
netaddr.IPAddress(row.first_ip),
netaddr.IPAddress(row.last_ip)).size
cur_total = (
subnet_totals_dict.get(row.subnet_id)
.get(TOTAL_IPS_IN_ALLOCATION_POOL)
if subnet_totals_dict.get(row.subnet_id) else 0
)
total_ips_in_allocation_pool = cur_total + pool_total
subnet_totals_dict[row.subnet_id] = {
TOTAL_IPS_IN_SUBNET: total_ips_in_subnet,
TOTAL_IPS_IN_ALLOCATION_POOL: total_ips_in_allocation_pool
}
return subnet_totals_dict
@classmethod
@db_api.CONTEXT_READER
def _build_subnet_cidr_alloc_query(cls, context, filters):
query = context.session.query(
*cls.common_columns,
mod.IPAllocationPool.first_ip,
mod.IPAllocationPool.last_ip,
)
query = query.outerjoin(
mod.Subnet,
mod.Network.id == mod.Subnet.network_id
)
query = query.outerjoin(
mod.IPAllocationPool,
mod.Subnet.id == mod.IPAllocationPool.subnet_id)
return cls._adjust_query_for_filters(query, filters)
@classmethod
def _generat_subnet_used_ips(cls, context, filters):
"""Generates a dict whose key=subnet_id, value=used_ips_dict.
used_ips_dict has key-value:
- 'used_ips_in_subnet': the sum of used IPs in the subnet (does not
consider allocation pools)
- 'used_ips_in_allocation_pool': the sum of used IPs in each
allocation pool, 0 in case of no allocation pools
"""
allocation_pools_query = (
cls._build_allocation_pools_query(context, filters))
pools_dict = {}
used_ips_dict = {}
for row in allocation_pools_query:
if pools_dict.get(row.subnet_id) is None:
pools_dict[row.subnet_id] = []
pools_dict[row.subnet_id].append({
FIRST_IP: (int(ipaddress.ip_address(row.first_ip))
if row.first_ip else None),
LAST_IP: (int(ipaddress.ip_address(row.last_ip))
if row.last_ip else None)
})
if used_ips_dict.get(row.subnet_id) is None:
used_ips_dict[row.subnet_id] = {
USED_IPS_IN_SUBNET: 0,
USED_IPS_IN_ALLOCATION_POOL: 0
}
allocations_query = cls._build_allocations_query(context, filters)
for row in allocations_query:
if pools_dict.get(row.subnet_id) is None:
continue
used_ips_dict[row.subnet_id][USED_IPS_IN_SUBNET] += 1
ip_address = int(ipaddress.ip_address(row.ip_address))
for pool in pools_dict[row.subnet_id]:
if pool[FIRST_IP] is None:
continue
if pool[FIRST_IP] <= ip_address <= pool[LAST_IP]:
used_ips_dict[row.subnet_id][USED_IPS_IN_ALLOCATION_POOL]\
+= 1
return used_ips_dict
@classmethod
@db_api.CONTEXT_READER
def _build_allocation_pools_query(cls, context, filters):
query = context.session.query(
*cls.common_columns,
mod.IPAllocationPool.first_ip,
mod.IPAllocationPool.last_ip
)
query = query.outerjoin(
mod.Subnet,
mod.Network.id == mod.Subnet.network_id
)
query = query.outerjoin(
mod.IPAllocationPool,
mod.Subnet.id == mod.IPAllocationPool.subnet_id)
return cls._adjust_query_for_filters(query, filters)
@classmethod
@db_api.CONTEXT_READER
def _build_allocations_query(cls, context, filters):
query = context.session.query(
*cls.common_columns,
mod.IPAllocation.ip_address
)
query = query.outerjoin(
mod.Subnet,
mod.Network.id == mod.Subnet.network_id
)
query = query.join(
mod.IPAllocation,
mod.Subnet.id == mod.IPAllocation.subnet_id
)
return cls._adjust_query_for_filters(query, filters)
@classmethod
def _add_details_to_result(cls, res, subnet_total_ips, subnet_used_ips):
for i, net in enumerate(res):
net_details = {
TOTAL_IPS_IN_SUBNET: 0,
TOTAL_IPS_IN_ALLOCATION_POOL: 0,
USED_IPS_IN_SUBNET: 0,
USED_IPS_IN_ALLOCATION_POOL: 0
}
res_sub = net['subnet_ip_availability']
for j, sub in enumerate(res_sub):
sub_id = sub['subnet_id']
sub_details = {
TOTAL_IPS_IN_SUBNET:
subnet_total_ips[sub_id][TOTAL_IPS_IN_SUBNET],
TOTAL_IPS_IN_ALLOCATION_POOL:
subnet_total_ips[sub_id][TOTAL_IPS_IN_ALLOCATION_POOL],
USED_IPS_IN_SUBNET:
subnet_used_ips[sub_id][USED_IPS_IN_SUBNET],
USED_IPS_IN_ALLOCATION_POOL:
subnet_used_ips[sub_id][USED_IPS_IN_ALLOCATION_POOL]
}
res_sub[j][IP_AVAILABILITY_DETAILS] = sub_details
net_details[TOTAL_IPS_IN_SUBNET]\
+= sub_details[TOTAL_IPS_IN_SUBNET]
net_details[TOTAL_IPS_IN_ALLOCATION_POOL]\
+= sub_details[TOTAL_IPS_IN_ALLOCATION_POOL]
net_details[USED_IPS_IN_SUBNET]\
+= sub_details[USED_IPS_IN_SUBNET]
net_details[USED_IPS_IN_ALLOCATION_POOL]\
+= sub_details[USED_IPS_IN_ALLOCATION_POOL]
res[i][IP_AVAILABILITY_DETAILS] = net_details
return res

View File

@@ -0,0 +1,48 @@
# Copyright 2025 Samsung SDS. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from neutron_lib.api.definitions import network_ip_availability as base_apidef
from neutron_lib.api.definitions import network_ip_availability_details as \
details_apidef
from neutron_lib.api import extensions as api_extensions
from neutron.api import extensions
from neutron.api.v2 import base
from neutron.services.network_ip_availability import plugin
class Network_ip_availability_details(api_extensions.APIExtensionDescriptor):
"""Extension class supporting
detailed network ip availability information.
"""
api_definition = details_apidef
@classmethod
def get_resources(cls):
"""Returns Extended Resource for service type management."""
resource_attributes = (
base_apidef.RESOURCE_ATTRIBUTE_MAP)[base_apidef.RESOURCE_PLURAL]
resource_attributes.update(details_apidef.RESOURCE_ATTRIBUTE_MAP)
controller = base.create_resource(
base_apidef.RESOURCE_PLURAL,
base_apidef.RESOURCE_NAME,
plugin.NetworkIPAvailabilityPlugin.get_instance(),
resource_attributes,
allow_pagination=True,
allow_sorting=True,
)
return [extensions.ResourceExtension(base_apidef.COLLECTION_NAME,
controller,
attr_map=resource_attributes)]

View File

@@ -14,19 +14,25 @@
# limitations under the License.
from neutron_lib.api.definitions import network_ip_availability
from neutron_lib.api.definitions import network_ip_availability_details
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions
from neutron.db import db_base_plugin_v2
from neutron.db import network_ip_availability_db as ip_availability_db
from neutron.db \
import network_ip_availability_details_db as ip_availability_details_db
class NetworkIPAvailabilityPlugin(ip_availability_db.IpAvailabilityMixin,
db_base_plugin_v2.NeutronDbPluginV2):
class NetworkIPAvailabilityPlugin(
ip_availability_details_db.IpAvailabilityDetailsDbMixin,
ip_availability_db.IpAvailabilityMixin,
db_base_plugin_v2.NeutronDbPluginV2):
"""This plugin exposes IP availability data for networks and subnets."""
_instance = None
supported_extension_aliases = [network_ip_availability.ALIAS]
supported_extension_aliases = [network_ip_availability.ALIAS,
network_ip_availability_details.ALIAS]
__filter_validation_support = True

View File

@@ -0,0 +1,354 @@
# Copyright 2025 Samsung SDS. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import netaddr
from neutron_lib import constants
from neutron.api import extensions as api_ext
from neutron.common import config
from neutron import extensions
from neutron.services.network_ip_availability import plugin as plugin_module
from neutron.tests.common import test_db_base_plugin_v2
API_RESOURCE = 'network-ip-availabilities'
IP_AVAIL_KEY = 'network_ip_availability'
IP_AVAILS_KEY = 'network_ip_availabilities'
IP_AVAIL_DETAILS_KEY = 'ip_availability_details'
TOTAL_IPS_IN_SUBNET = 'total_ips_in_subnet'
TOTAL_IPS_IN_ALLOCATION_POOL = 'total_ips_in_allocation_pool'
USED_IPS_IN_SUBNET = 'used_ips_in_subnet'
USED_IPS_IN_ALLOCATION_POOL = 'used_ips_in_allocation_pool'
EXTENSIONS_PATH = ':'.join(extensions.__path__)
PLUGIN_NAME = '{}.{}'.format(
plugin_module.NetworkIPAvailabilityPlugin.__module__,
plugin_module.NetworkIPAvailabilityPlugin.__name__)
class TestNetworkIPAvailabilityDetails(
test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def setUp(self):
svc_plugins = {'plugin_name': PLUGIN_NAME}
super().setUp(
service_plugins=svc_plugins)
self.plugin = plugin_module.NetworkIPAvailabilityPlugin()
ext_mgr = api_ext.PluginAwareExtensionManager(
EXTENSIONS_PATH, {"network-ip-availability": self.plugin}
)
app = config.load_paste_app('extensions_test_app')
self.ext_api = api_ext.ExtensionMiddleware(app, ext_mgr=ext_mgr)
def _validate_availability_details(self, availability_details,
expected_total_in_subnet,
expected_total_in_allocation_pools,
expected_used_in_subnet,
expected_used_in_allocation_pools):
self.assertEqual(expected_total_in_subnet,
availability_details[TOTAL_IPS_IN_SUBNET])
self.assertEqual(expected_total_in_allocation_pools,
availability_details[TOTAL_IPS_IN_ALLOCATION_POOL])
self.assertEqual(expected_used_in_subnet,
availability_details[USED_IPS_IN_SUBNET])
self.assertEqual(expected_used_in_allocation_pools,
availability_details[USED_IPS_IN_ALLOCATION_POOL])
def _validate_from_availabilities(self, availabilities, wrapped_network,
expected_total_in_subnet,
expected_total_in_allocation_pools,
expected_used_in_subnet,
expected_used_in_allocation_pools):
network = wrapped_network['network']
availability = self._find_availability(availabilities, network['id'])
self.assertIsNotNone(availability)
self.assertIsNotNone(availability[IP_AVAIL_DETAILS_KEY])
self._validate_availability_details(availability[IP_AVAIL_DETAILS_KEY],
expected_total_in_subnet,
expected_total_in_allocation_pools,
expected_used_in_subnet,
expected_used_in_allocation_pools)
def test_usages_query_list_with_fields_ip_availability_details(self):
with self.network() as net:
with self.subnet(network=net):
# list by query fields: total_ips
params = 'fields=ip_availability_details'
request = self.new_list_request(API_RESOURCE,
params=params,
as_admin=True)
response = self.deserialize(self.fmt,
request.get_response(self.ext_api))
availability = response[IP_AVAILS_KEY][0]
self.assertIn('ip_availability_details', availability)
self.assertNotIn('network_id', availability)
def test_usages_query_show_with_fields_total_ips(self):
with self.network() as net:
with self.subnet(network=net):
network = net['network']
params = ['ip_availability_details']
request = self.new_show_request(API_RESOURCE,
network['id'],
fields=params,
as_admin=True)
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
availability = response[IP_AVAIL_KEY]
self.assertIn('ip_availability_details', availability)
self.assertNotIn('network_id', availability)
@staticmethod
def _find_availability(availabilities, net_id):
for ip_availability in availabilities:
if net_id == ip_availability['network_id']:
return ip_availability
def test_basic(self):
with self.network() as net:
with self.subnet(network=net):
network = net['network']
# Get ALL
request = self.new_list_request(API_RESOURCE,
self.fmt,
as_admin=True)
response = self.deserialize(self.fmt,
request.get_response(self.ext_api))
self.assertIn(IP_AVAILS_KEY, response)
self.assertEqual(1, len(response[IP_AVAILS_KEY]))
self._validate_from_availabilities(response[IP_AVAILS_KEY],
net, 254, 253, 0, 0)
# Get single via id
request = self.new_show_request(API_RESOURCE, network['id'],
as_admin=True)
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
self.assertIn(IP_AVAIL_KEY, response)
usage = response[IP_AVAIL_KEY]
self.assertIn(IP_AVAIL_DETAILS_KEY, usage)
self._validate_availability_details(
usage[IP_AVAIL_DETAILS_KEY], 254, 253, 0, 0)
def test_usages_multi_nets_subnets(self):
with self.network(name='net1') as n1,\
self.network(name='net2') as n2,\
self.network(name='net3') as n3:
# n1 should have 2 subnets, n2 should have none, n3 has 1
with self.subnet(network=n1) as subnet1_1, \
self.subnet(cidr='40.0.0.0/24', network=n3) as subnet3_1:
# Consume 3 ports n1, none n2, 2 ports on n3
with self.port(subnet=subnet1_1),\
self.port(subnet=subnet1_1),\
self.port(subnet=subnet1_1),\
self.port(subnet=subnet3_1),\
self.port(subnet=subnet3_1):
# Test get ALL
request = self.new_list_request(API_RESOURCE,
as_admin=True)
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
self.assertIn(IP_AVAILS_KEY, response)
self.assertEqual(3, len(response[IP_AVAILS_KEY]))
data = response[IP_AVAILS_KEY]
self._validate_from_availabilities(data, n1,
254, 253, 3, 3)
self._validate_from_availabilities(data, n2,
0, 0, 0, 0)
self._validate_from_availabilities(data, n3,
254, 253, 2, 2)
# Test get single via network id
network = n1['network']
request = self.new_show_request(API_RESOURCE,
network['id'],
as_admin=True)
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
self.assertIn(IP_AVAIL_KEY, response)
data = response[IP_AVAIL_KEY]
self.assertIn(IP_AVAIL_DETAILS_KEY, data)
self._validate_availability_details(
data[IP_AVAIL_DETAILS_KEY], 254, 253, 3, 3)
def test_usages_multi_nets_subnets_sums(self):
with self.network(name='net1') as n1:
# n1 has 2 subnets
with self.subnet(network=n1) as subnet1_1, \
self.subnet(cidr='40.0.0.0/24', network=n1) as subnet1_2:
# Consume 3 ports n1: 1 on subnet 1 and 2 on subnet 2
with self.port(subnet=subnet1_1),\
self.port(subnet=subnet1_2),\
self.port(subnet=subnet1_2):
# Get ALL
request = self.new_list_request(API_RESOURCE,
as_admin=True)
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
self.assertIn(IP_AVAILS_KEY, response)
self.assertEqual(1, len(response[IP_AVAILS_KEY]))
self._validate_from_availabilities(response[IP_AVAILS_KEY],
n1, 508, 506, 3, 3)
# Get single via network id
network = n1['network']
request = self.new_show_request(API_RESOURCE,
network['id'],
as_admin=True)
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
self.assertIn(IP_AVAIL_KEY, response)
data = response[IP_AVAIL_KEY]
self.assertIn(IP_AVAIL_DETAILS_KEY, data)
self._validate_availability_details(
data[IP_AVAIL_DETAILS_KEY], 508, 506, 3, 3)
def test_usages_query_ip_version_v4(self):
with self.network() as net:
with self.subnet(network=net):
# Get IPv4
params = 'ip_version=%s' % constants.IP_VERSION_4
request = self.new_list_request(API_RESOURCE, params=params,
as_admin=True)
response = self.deserialize(self.fmt,
request.get_response(self.ext_api))
self.assertIn(IP_AVAILS_KEY, response)
self.assertEqual(1, len(response[IP_AVAILS_KEY]))
self._validate_from_availabilities(response[IP_AVAILS_KEY],
net, 254, 253, 0, 0)
# Get IPv6 should return empty array
params = 'ip_version=%s' % constants.IP_VERSION_6
request = self.new_list_request(API_RESOURCE, params=params,
as_admin=True)
response = self.deserialize(self.fmt,
request.get_response(self.ext_api))
self.assertEqual(0, len(response[IP_AVAILS_KEY]))
def test_usages_query_ip_version_v6(self):
cidr_ipv6 = '2001:db8:1002:51::/64'
cidr_ipv6_net = netaddr.IPNetwork(cidr_ipv6)
with self.network() as net:
with self.subnet(
network=net, cidr=cidr_ipv6,
ip_version=constants.IP_VERSION_6,
ipv6_address_mode=constants.DHCPV6_STATELESS):
# Get IPv6
params = 'ip_version=%s' % constants.IP_VERSION_6
request = self.new_list_request(API_RESOURCE, params=params,
as_admin=True)
response = self.deserialize(self.fmt,
request.get_response(self.ext_api))
self.assertEqual(1, len(response[IP_AVAILS_KEY]))
self._validate_from_availabilities(
response[IP_AVAILS_KEY], net,
cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 0, 0)
# Get IPv4 should return empty array
params = 'ip_version=%s' % constants.IP_VERSION_4
request = self.new_list_request(API_RESOURCE, params=params,
as_admin=True)
response = self.deserialize(self.fmt,
request.get_response(self.ext_api))
self.assertEqual(0, len(response[IP_AVAILS_KEY]))
def test_usages_ports_consumed_v6(self):
cidr_ipv6 = '2001:db8:1002:51::/64'
cidr_ipv6_net = netaddr.IPNetwork(cidr_ipv6)
with self.network() as net:
with self.subnet(
network=net, cidr=cidr_ipv6,
ip_version=constants.IP_VERSION_6,
ipv6_address_mode=constants.DHCPV6_STATELESS) as subnet:
request = self.new_list_request(API_RESOURCE,
as_admin=True)
# Consume 3 ports
with self.port(subnet=subnet),\
self.port(subnet=subnet), \
self.port(subnet=subnet):
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
self._validate_from_availabilities(
response[IP_AVAILS_KEY], net,
cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 3, 3)
def test_usages_multi_net_multi_subnet_46(self):
# Setup mixed v4/v6 networks with IPs consumed on each
cidr_ipv6 = '2001:db8:1003:52::/64'
cidr_ipv6_net = netaddr.IPNetwork(cidr_ipv6)
with self.network(name='net-v6-1') as net_v6_1, \
self.network(name='net-v6-2') as net_v6_2, \
self.network(name='net-v4-1') as net_v4_1, \
self.network(name='net-v4-2') as net_v4_2:
with self.subnet(network=net_v6_1, cidr='2001:db8:1002:51::/64',
ip_version=constants.IP_VERSION_6) as s61, \
self.subnet(network=net_v6_2,
cidr=cidr_ipv6,
ip_version=constants.IP_VERSION_6) as s62, \
self.subnet(network=net_v4_1, cidr='10.0.0.0/24') as s41, \
self.subnet(network=net_v4_2, cidr='10.0.1.0/24') as s42:
with self.port(subnet=s61),\
self.port(subnet=s62), self.port(subnet=s62), \
self.port(subnet=s41), \
self.port(subnet=s42), self.port(subnet=s42):
# Verify consumption across all
request = self.new_list_request(API_RESOURCE,
as_admin=True)
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
avails_list = response[IP_AVAILS_KEY]
self._validate_from_availabilities(
avails_list, net_v6_1,
cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 1, 1)
self._validate_from_availabilities(
avails_list, net_v6_2,
cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 2, 2)
self._validate_from_availabilities(
avails_list, net_v4_1,
254, 253, 1, 1)
self._validate_from_availabilities(
avails_list, net_v4_2,
254, 253, 2, 2)
# Query by IP versions. Ensure subnet versions match
for ip_ver in [constants.IP_VERSION_4,
constants.IP_VERSION_6]:
params = 'ip_version=%i' % ip_ver
request = self.new_list_request(API_RESOURCE,
params=params,
as_admin=True)
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
for net_avail in response[IP_AVAILS_KEY]:
for sub in net_avail['subnet_ip_availability']:
self.assertEqual(ip_ver, sub['ip_version'])
# Verify consumption querying 2 network ids (IN clause)
request = self.new_list_request(
API_RESOURCE,
params='network_id=%s&network_id=%s'
% (net_v4_2['network']['id'],
net_v6_2['network']['id']),
as_admin=True)
response = self.deserialize(
self.fmt, request.get_response(self.ext_api))
avails_list = response[IP_AVAILS_KEY]
self._validate_from_availabilities(
avails_list, net_v6_2,
cidr_ipv6_net.size, cidr_ipv6_net.size - 1, 2, 2)
self._validate_from_availabilities(
avails_list, net_v4_2,
254, 253, 2, 2)

View File

@@ -0,0 +1,7 @@
---
features:
- |
Add a new API extension, ``network-ip-availability-details``, that adds
the ``ip_availability_details`` attribute to network IP availabilities.
The value of this attribute contains detailed information about the
network IP usage statistics.