Merge "Admin network address pool robustness"

This commit is contained in:
Zuul 2023-03-15 14:13:59 +00:00 committed by Gerrit Code Review
commit bad5dee5ed
7 changed files with 403 additions and 8 deletions

View File

@ -338,6 +338,12 @@ class AddressPoolController(rest.RestController):
self._check_valid_range(network, start, end, ipset)
ipset.update(netaddr.IPRange(start, end))
def _check_valid_address(self, addrpool_dict, address):
subnet = netaddr.IPNetwork(
addrpool_dict['network'] + "/" + str(addrpool_dict['prefix']))
addr = netaddr.IPAddress(address)
utils.is_valid_address_within_subnet(addr, subnet)
def _check_pool_readonly(self, addrpool):
# The admin and system controller address pools which exist on the
# subcloud are expected for re-home a subcloud to new system controllers.
@ -487,18 +493,22 @@ class AddressPoolController(rest.RestController):
# Create addresses if specified
if floating_address:
self._check_valid_address(addrpool_dict, floating_address)
f_addr = self._address_create(addrpool_dict, floating_address)
addrpool_dict[ADDRPOOL_FLOATING_ADDRESS_ID] = f_addr.id
if controller0_address:
self._check_valid_address(addrpool_dict, controller0_address)
c0_addr = self._address_create(addrpool_dict, controller0_address)
addrpool_dict[ADDRPOOL_CONTROLLER0_ADDRESS_ID] = c0_addr.id
if controller1_address:
self._check_valid_address(addrpool_dict, controller1_address)
c1_addr = self._address_create(addrpool_dict, controller1_address)
addrpool_dict[ADDRPOOL_CONTROLLER1_ADDRESS_ID] = c1_addr.id
if gateway_address:
self._check_valid_address(addrpool_dict, gateway_address)
g_addr = self._address_create(addrpool_dict, gateway_address)
addrpool_dict[ADDRPOOL_GATEWAY_ADDRESS_ID] = g_addr.id

View File

@ -163,7 +163,8 @@ class InterfaceNetworkController(rest.RestController):
# Update address mode based on network type
if network_type in [constants.NETWORK_TYPE_MGMT,
constants.NETWORK_TYPE_OAM,
constants.NETWORK_TYPE_CLUSTER_HOST]:
constants.NETWORK_TYPE_CLUSTER_HOST,
constants.NETWORK_TYPE_ADMIN]:
pool_uuid = pecan.request.dbapi.network_get_by_type(network_type).pool_uuid
pool = pecan.request.dbapi.address_pool_get(pool_uuid)
if pool.family == constants.IPV4_FAMILY:

View File

@ -221,10 +221,24 @@ class NetworkController(rest.RestController):
return addresses
def _create_admin_network_address(self, pool):
addresses = collections.OrderedDict()
addresses[constants.CONTROLLER_HOSTNAME] = None
addresses[constants.CONTROLLER_0_HOSTNAME] = None
addresses[constants.CONTROLLER_1_HOSTNAME] = None
addresses = {}
if pool.floating_address:
addresses.update(
{constants.CONTROLLER_HOSTNAME: pool.floating_address})
else:
addresses.update({constants.CONTROLLER_HOSTNAME: None})
if pool.controller0_address:
addresses.update(
{constants.CONTROLLER_0_HOSTNAME: pool.controller0_address})
else:
addresses.update({constants.CONTROLLER_0_HOSTNAME: None})
if pool.controller1_address:
addresses.update(
{constants.CONTROLLER_1_HOSTNAME: pool.controller1_address})
else:
addresses.update({constants.CONTROLLER_1_HOSTNAME: None})
if pool.gateway_address is not None:
if utils.get_distributed_cloud_role() == \

View File

@ -771,7 +771,7 @@ class AddressesStillExist(Conflict):
class AddressPoolAlreadyExists(Conflict):
message = _("Address pool %(uuid)s already exists")
message = _("Address pool %(name)s already exists")
class AddressPoolFamilyMismatch(Conflict):

View File

@ -5688,7 +5688,7 @@ class Connection(api.Connection):
session.add(address_pool)
session.flush()
except db_exc.DBDuplicateEntry:
raise exception.AddressPoolAlreadyExists(uuid=values['uuid'])
raise exception.AddressPoolAlreadyExists(name=values['name'])
return self._address_pool_get(values['uuid'])
def _address_pool_range_update(self, session, address_pool, ranges):

View File

@ -0,0 +1,366 @@
#
# Copyright (c) 2023 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Tests for the API / address pool / methods.
"""
import netaddr
from six.moves import http_client
from oslo_utils import uuidutils
from sysinv.tests.api import base
from sysinv.tests.db import base as dbbase
from sysinv.tests.db import utils as dbutils
class AddressPoolTestCase(base.FunctionalTest, dbbase.BaseHostTestCase):
# API_HEADERS are a generic header passed to most API calls
API_HEADERS = {'User-Agent': 'sysinv-test'}
# API_PREFIX is the prefix for the URL
API_PREFIX = '/addrpools'
# RESULT_KEY is the python table key for the list of results
RESULT_KEY = 'addrpools'
# COMMON_FIELD is a field that is known to exist for inputs and outputs
COMMON_FIELD = 'network'
# expected_api_fields are attributes that should be populated by
# an API query
expected_api_fields = ['id',
'uuid',
'name',
'network',
'prefix',
'order',
'controller0_address',
'controller0_address_id',
'controller1_address',
'controller1_address_id',
'floating_address',
'floating_address_id',
'gateway_address',
'gateway_address_id',
'ranges',
]
def setUp(self):
super(AddressPoolTestCase, self).setUp()
def get_single_url(self, uuid):
return '%s/%s' % (self.API_PREFIX, uuid)
def assert_fields(self, api_object):
# check the uuid is a uuid
assert(uuidutils.is_uuid_like(api_object['uuid']))
# Verify that expected attributes are returned
for field in self.expected_api_fields:
self.assertIn(field, api_object)
def get_post_object(self, name, network, prefix):
pool_db = dbutils.get_test_address_pool(
name=name,
network=network,
prefix=prefix
)
del pool_db['family']
return pool_db
def _create_db_object(self, name='testpool'):
return self._create_test_address_pool(
name=name,
subnet=self.mgmt_subnet
)
class TestPostMixin(AddressPoolTestCase):
def setUp(self):
super(TestPostMixin, self).setUp()
def _test_create_address_pool_success(self, name, network, prefix):
# Test creation of object
ndict = self.get_post_object(name, network, prefix)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
# Check that an expected field matches.
self.assertEqual(response.json[self.COMMON_FIELD],
ndict[self.COMMON_FIELD])
uuid = response.json['uuid']
# Verify that the object was created and some basic attribute matches
response = self.get_json(self.get_single_url(uuid))
self.assertEqual(response[self.COMMON_FIELD],
ndict[self.COMMON_FIELD])
def _test_create_address_pool_fail_duplicate(self, name, network, prefix):
ndict = self.get_post_object(name, network, prefix)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS)
# Check HTTP response is successful
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.OK)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is failed
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.CONFLICT)
self.assertIn("Address pool %s already exists" % name,
response.json['error_message'])
def _test_create_address_pool_address_not_in_subnet(self, addr_type):
address = str(self.oam_subnet[1])
network = str(self.mgmt_subnet.network)
prefix = self.mgmt_subnet.prefixlen
ndict = self.get_post_object('test', network, prefix)
ndict['%s_address' % addr_type] = address
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is failed
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
self.assertIn("IP Address %s is not in subnet: %s/%d" % (address, network, prefix),
response.json['error_message'])
def _test_create_address_pool_invalid_address_family(self, addr_type):
if self.mgmt_subnet.version == 6:
address = netaddr.IPAddress('1.1.1.1')
else:
address = netaddr.IPAddress('1111::1')
network = str(self.mgmt_subnet.network)
prefix = self.mgmt_subnet.prefixlen
ndict = self.get_post_object('test', network, prefix)
ndict['%s_address' % addr_type] = str(address)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is failed
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
self.assertIn("Invalid IP version %s %s" % (address.version, network),
response.json['error_message'])
def _test_create_address_pool_invalid_address_network(self, addr_type):
# network address ie. 192.168.101.0
address = str(self.mgmt_subnet[0])
network = str(self.mgmt_subnet.network)
prefix = self.mgmt_subnet.prefixlen
ndict = self.get_post_object('test', network, prefix)
ndict['%s_address' % addr_type] = str(address)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is failed
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
self.assertIn("Invalid IP address: %s. Cannot use network address: %s." % (address, network),
response.json['error_message'])
def _test_create_address_pool_invalid_address_broadcast(self, addr_type):
# broadcast address ie. 192.168.101.255
address = str(self.mgmt_subnet[-1])
network = str(self.mgmt_subnet.network)
prefix = self.mgmt_subnet.prefixlen
ndict = self.get_post_object('test', network, prefix)
ndict['%s_address' % addr_type] = str(address)
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is failed
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.BAD_REQUEST)
self.assertIn("Cannot use broadcast address: %s." % address,
response.json['error_message'])
def test_address_pool_create_success(self):
self._test_create_address_pool_success(
'test', str(self.mgmt_subnet.network), self.mgmt_subnet.prefixlen)
def test_address_pool_create_fail_duplicate(self):
self._test_create_address_pool_fail_duplicate(
'test', str(self.mgmt_subnet.network), self.mgmt_subnet.prefixlen)
def test_address_pool_create_reversed_ranges(self):
start = str(self.mgmt_subnet[-2])
end = str(self.mgmt_subnet[1])
network = str(self.mgmt_subnet.network)
prefix = self.mgmt_subnet.prefixlen
ndict = self.get_post_object('test', network, prefix)
ndict['ranges'] = [[start, end]]
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is failed
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.CONFLICT)
self.assertIn("start address must be less than end address",
response.json['error_message'])
def test_address_pool_create_invalid_ranges(self):
start = str(self.mgmt_subnet[1])
end = str(self.oam_subnet[-2])
network = str(self.mgmt_subnet.network)
prefix = self.mgmt_subnet.prefixlen
ndict = self.get_post_object('test', network, prefix)
ndict['ranges'] = [[start, end]]
response = self.post_json(self.API_PREFIX,
ndict,
headers=self.API_HEADERS,
expect_errors=True)
# Check HTTP response is failed
self.assertEqual('application/json', response.content_type)
self.assertEqual(response.status_code, http_client.CONFLICT)
self.assertIn("Address %s is not within network %s/%d" % (end, network, prefix),
response.json['error_message'])
def test_address_pool_create_floating_ip_not_in_subnet(self):
self._test_create_address_pool_address_not_in_subnet('floating')
def test_address_pool_create_floating_ip_has_invalid_family(self):
self._test_create_address_pool_invalid_address_family('floating')
def test_address_pool_create_floating_ip_is_network_address(self):
self._test_create_address_pool_invalid_address_network('floating')
def test_address_pool_create_floating_ip_is_broadcast(self):
self._test_create_address_pool_invalid_address_broadcast('floating')
def test_address_pool_create_controller0_ip_not_in_subnet(self):
self._test_create_address_pool_address_not_in_subnet('controller0')
def test_address_pool_create_controller0_ip_has_invalid_family(self):
self._test_create_address_pool_invalid_address_family('controller0')
def test_address_pool_create_controller0_ip_is_network_address(self):
self._test_create_address_pool_invalid_address_network('controller0')
def test_address_pool_create_controller0_ip_is_broadcast(self):
self._test_create_address_pool_invalid_address_broadcast('controller0')
def test_address_pool_create_controller1_ip_not_in_subnet(self):
self._test_create_address_pool_address_not_in_subnet('controller1')
def test_address_pool_create_controller1_ip_has_invalid_family(self):
self._test_create_address_pool_invalid_address_family('controller1')
def test_address_pool_create_controller1_ip_is_network_address(self):
self._test_create_address_pool_invalid_address_network('controller1')
def test_address_pool_create_controller1_ip_is_broadcast(self):
self._test_create_address_pool_invalid_address_broadcast('controller1')
def test_address_pool_create_gateway_ip_not_in_subnet(self):
self._test_create_address_pool_address_not_in_subnet('gateway')
def test_address_pool_create_gateway_ip_has_invalid_family(self):
self._test_create_address_pool_invalid_address_family('gateway')
def test_address_pool_create_gateway_ip_is_network_address(self):
self._test_create_address_pool_invalid_address_network('gateway')
def test_address_pool_create_gateway_ip_is_broadcast(self):
self._test_create_address_pool_invalid_address_broadcast('gateway')
class TestDelete(AddressPoolTestCase):
""" Tests deletion.
Typically delete APIs return NO CONTENT.
python2 and python3 libraries may return different
content_type (None, or empty json) when NO_CONTENT returned.
"""
def setUp(self):
super(TestDelete, self).setUp()
def test_address_pool_delete(self):
# Delete the API object
self.delete_object = self._create_db_object()
uuid = self.delete_object.uuid
response = self.delete(self.get_single_url(uuid),
headers=self.API_HEADERS)
# Verify the expected API response for the delete
self.assertEqual(response.status_code, http_client.NO_CONTENT)
class TestList(AddressPoolTestCase):
""" Address Pool list operations
"""
def setUp(self):
super(TestList, self).setUp()
def test_address_pool_list(self):
current_pools = self.get_json(self.API_PREFIX)
num = len(current_pools[self.RESULT_KEY]) + 1
# create a single object
self.single_object = self._create_db_object()
response = self.get_json(self.API_PREFIX)
self.assertEqual(
num,
len(response[self.RESULT_KEY]))
class IPv4TestPost(TestPostMixin,
AddressPoolTestCase):
pass
class IPv6TestPost(TestPostMixin,
dbbase.BaseIPv6Mixin,
AddressPoolTestCase):
pass

View File

@ -668,7 +668,11 @@ def get_test_address_pool(**kw):
'ranges': kw.get('ranges'),
'prefix': kw.get('prefix'),
'order': kw.get('order', 'random'),
'uuid': kw.get('uuid')
'uuid': kw.get('uuid'),
'controller0_address': kw.get('controller0_address'),
'controller1_address': kw.get('controller1_address'),
'floating_address': kw.get('floating_address'),
'gateway_address': kw.get('gateway_address'),
}
return inv