Merge "Add test coverage for update: SOA and NS managed records"

This commit is contained in:
Zuul 2022-05-11 15:36:26 +00:00 committed by Gerrit Code Review
commit 90247e8561
4 changed files with 89 additions and 4 deletions

View File

@ -216,6 +216,19 @@ def rand_ns_records():
return ns_records
def rand_soa_records(number_of_records=2):
return ['{} {} {} {} {} {}.'.format(
'{}.{}.{}'.format(rand_string(3), rand_string(7), rand_string(3)),
random.randint(1000000000, 2020080302), random.randint(3000, 7200),
random.randint(1000, 3600), random.randint(1000000, 1209600),
random.randint(1000, 3600)) for i in range(0, number_of_records)]
def rand_soa_recordset(zone_name, **kwargs):
return rand_recordset_data(
'SOA', zone_name, records=rand_soa_records(), **kwargs)
def rand_tld():
data = {
"name": rand_zone_name(prefix='tld', suffix='')

View File

@ -191,7 +191,8 @@ class DnsClientBase(rest_client.RestClient):
return resp, self.deserialize(resp, body)
def _put_request(self, resource, uuid, data, params=None):
def _put_request(self, resource, uuid, data, params=None,
headers=None, extra_headers=False):
"""Updates the specified object using PUT request.
:param resource: The name of the REST resource, e.g., 'zones'.
:param uuid: Unique identifier of the object in UUID format.
@ -200,11 +201,18 @@ class DnsClientBase(rest_client.RestClient):
is sent as-is.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:param headers (dict): The headers to use for the request.
:param extra_headers (bool): Boolean value than indicates if the
headers returned by the get_headers()
method are to be used but additional
headers are needed in the request
pass them in as a dict.
:returns: Serialized object as a dictionary.
"""
body = self.serialize(data)
uri = self.get_uri(resource, uuid=uuid, params=params)
resp, body = self.put(uri, body=body)
resp, body = self.put(
uri, body=body, headers=headers, extra_headers=extra_headers)
self.expected_success(self.PUT_STATUS_CODES, resp.status)

View File

@ -59,7 +59,8 @@ class RecordsetClient(base.DnsClientV2Base):
@base.handle_errors
def update_recordset(self, zone_uuid, recordset_uuid,
recordet_data, params=None):
recordet_data, params=None,
headers=None, extra_headers=None):
"""Update the recordset related to the specified zone.
:param zone_uuid: Unique identifier of the zone in UUID format.
:param recordset_uuid: Unique identifier of the recordset in UUID
@ -68,11 +69,18 @@ class RecordsetClient(base.DnsClientV2Base):
data.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:param headers (dict): The headers to use for the request.
:param extra_headers (bool): Boolean value than indicates if the
headers returned by the get_headers()
method are to be used but additional
headers are needed in the request
pass them in as a dict.
:return: A tuple with the server response and the created zone.
"""
resp, body = self._put_request(
'zones/{0}/recordsets'.format(zone_uuid), recordset_uuid,
data=recordet_data, params=params)
data=recordet_data, params=params,
headers=headers, extra_headers=extra_headers)
# Update Recordset should Return a HTTP 202, or a 200 if the recordset
# is already active

View File

@ -862,3 +862,59 @@ class RecordsetOwnershipTest(BaseRecordsetsTest):
{primary_project_id}, project_ids_api,
'Failed, unique project_ids {} are not as expected {}'.format(
project_ids_api, primary_project_id))
class AdminManagedRecordsetTest(BaseRecordsetsTest):
credentials = ["primary", "admin", "system_admin"]
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
cls.set_network_resources()
super(AdminManagedRecordsetTest, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(AdminManagedRecordsetTest, cls).setup_clients()
if CONF.enforce_scope.designate:
cls.admin_client = cls.os_system_admin.dns_v2.RecordsetClient()
else:
cls.admin_client = cls.os_admin.dns_v2.RecordsetClient()
cls.client = cls.os_primary.dns_v2.RecordsetClient()
cls.zone_client = cls.os_primary.dns_v2.ZonesClient()
@decorators.idempotent_id('84164ff4-8e68-11ec-983f-201e8823901f')
def test_admin_updates_soa_and_ns_recordsets(self):
# HTTP headers to be used in the test
sudo_header = {'X-Auth-All-Projects': True}
managed_records_header = {'X-Designate-Edit-Managed-Records': True}
sudo_managed_headers = sudo_header.copy()
sudo_managed_headers.update(managed_records_header)
LOG.info('Primary user creates a Zone')
zone = self.zone_client.create_zone(
description='Zone for "managed recordsets update" test',
wait_until=const.ACTIVE)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
recordsets = self.admin_client.list_recordset(
zone['id'], headers=sudo_header)[1]['recordsets']
LOG.info('As Admin try to update SOA and NS recordsets,'
' Expected not allowed')
for recordset in recordsets:
if recordset['type'] == 'NS':
self.assertRaisesDns(
lib_exc.BadRequest, 'bad_request', 400,
self.admin_client.update_recordset,
zone['id'], recordset['id'],
recordet_data=data_utils.rand_ns_records(),
headers=sudo_managed_headers, extra_headers=True)
if recordset['type'] == 'SOA':
self.assertRaisesDns(
lib_exc.BadRequest, 'bad_request', 400,
self.admin_client.update_recordset,
zone['id'], recordset['id'],
recordet_data=data_utils.rand_soa_recordset(zone['name']),
headers=sudo_managed_headers, extra_headers=True)