Merge "New API test cases for a Zone test suite."

This commit is contained in:
Zuul 2021-05-11 16:48:32 +00:00 committed by Gerrit Code Review
commit 3675bd53b0
5 changed files with 311 additions and 12 deletions

View File

@ -19,3 +19,7 @@ ERROR = 'ERROR'
DELETED = 'DELETED'
ACTIVE = 'ACTIVE'
UP = 'UP'
# Zone types
PRIMARY_ZONE_TYPE = 'PRIMARY'
SECONDARY_ZONE_TYPE = 'SECONDARY'

View File

@ -63,12 +63,15 @@ class DnsClientBase(rest_client.RestClient):
return json.dumps(data)
def deserialize(self, resp, object_str):
if 'application/json' in resp['content-type']:
return json.loads(object_str)
elif 'text/dns' in resp['content-type']:
return models.ZoneFile.from_text(object_str.decode("utf-8"))
if 'content-type' in resp.keys():
if 'application/json' in resp['content-type']:
return json.loads(object_str)
elif 'text/dns' in resp['content-type']:
return models.ZoneFile.from_text(object_str.decode("utf-8"))
else:
raise lib_exc.InvalidContentType()
else:
raise lib_exc.InvalidContentType()
return None
@classmethod
def expected_success(cls, expected_code, read_code):
@ -103,7 +106,8 @@ class DnsClientBase(rest_client.RestClient):
params=params)
def _create_request(self, resource, data=None, params=None,
headers=None, extra_headers=False):
headers=None, extra_headers=False,
expected_statuses=None):
"""Create an object of the specified type.
:param resource: The name of the REST resource, e.g., 'zones'.
:param data: A Python dict that represents an object of the
@ -117,6 +121,9 @@ class DnsClientBase(rest_client.RestClient):
method are to be used but additional
headers are needed in the request
pass them in as a dict.
:param expected_statuses: If set, it will override the default expected
statuses list with the status codes provided
by caller function
:returns: A tuple with the server response and the deserialized created
object.
"""
@ -125,7 +132,11 @@ class DnsClientBase(rest_client.RestClient):
resp, body = self.post(uri, body=body, headers=headers,
extra_headers=extra_headers)
self.expected_success(self.CREATE_STATUS_CODES, resp.status)
if expected_statuses is None:
self.expected_success(self.CREATE_STATUS_CODES, resp.status)
else:
self.expected_success(expected_statuses, resp.status)
return resp, self.deserialize(resp, body)

View File

@ -13,6 +13,8 @@
# under the License.
from tempest.lib.common.utils import data_utils
from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin import data_utils as dns_data_utils
from designate_tempest_plugin.common import waiters
from designate_tempest_plugin.services.dns.v2.json import base
@ -23,7 +25,10 @@ class ZonesClient(base.DnsClientV2Base):
@base.handle_errors
def create_zone(self, name=None, email=None, ttl=None, description=None,
attributes=None, wait_until=False, params=None):
attributes=None, wait_until=False,
zone_type=const.PRIMARY_ZONE_TYPE,
primaries=None, params=None):
"""Create a zone with the specified parameters.
:param name: The name of the zone.
@ -39,10 +44,15 @@ class ZonesClient(base.DnsClientV2Base):
This information can be used by the scheduler to place
zones on the correct pool.
:param wait_until: Block until the zone reaches the desiered status
:param zone_type: PRIMARY or SECONDARY
Default: PRIMARY
:param primaries: List of Primary nameservers. Required for SECONDARY
Default: None
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:return: A tuple with the server response and the created zone.
"""
zone = {
'name': name or dns_data_utils.rand_zone_name(),
'email': email or dns_data_utils.rand_email(),
@ -51,7 +61,17 @@ class ZonesClient(base.DnsClientV2Base):
'attributes': attributes or {
'attribute_key': data_utils.rand_name('attribute_value')}
}
# If SECONDARY, "email" and "ttl" cannot be supplied
if zone_type == const.SECONDARY_ZONE_TYPE:
zone['type'] = zone_type
del zone['email']
del zone['ttl']
if primaries is None:
raise AttributeError(
'Error - "primaries" is mandatory parameter'
' for a SECONDARY zone type')
zone['masters'] = primaries
resp, body = self._create_request('zones', zone, params=params)
# Create Zone should Return a HTTP 202
@ -74,6 +94,18 @@ class ZonesClient(base.DnsClientV2Base):
return self._show_request(
'zones', uuid, params=params, headers=headers)
@base.handle_errors
def show_zone_nameservers(self, zone_uuid, params=None):
"""Gets list of Zone Name Servers
:param zone_uuid: Unique identifier of the zone in UUID format.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:return: Serialized nameservers as a list.
"""
return self._show_request(
'zones/{0}/nameservers'.format(zone_uuid), uuid=None,
params=params)
@base.handle_errors
def list_zones(self, params=None, headers=None):
"""Gets a list of zones.
@ -130,3 +162,34 @@ class ZonesClient(base.DnsClientV2Base):
waiters.wait_for_zone_status(self, body['id'], wait_until)
return resp, body
@base.handle_errors
def trigger_manual_update(self, zone_id, headers=None):
"""Trigger manually update for secondary zone.
:param zone_id: Secondary zone ID.
:param headers (dict): The headers to use for the request.
:return: A tuple with the server response and body.
"""
resp, body = self._create_request(
'zones/{}/tasks/xfr'.format(zone_id), headers=headers)
# Trigger Zone Update should Return a HTTP 202
self.expected_success(202, resp.status)
return resp, body
@base.handle_errors
def abandon_zone(self, zone_id, headers=None):
"""This removes a zone from the designate database without removing
it from the backends.
:param zone_id: Zone ID.
:param headers (dict): The headers to use for the request.
:return: A tuple with the server response and body.
"""
resp, body = self._create_request(
'zones/{}/tasks/abandon'.format(zone_id),
headers=headers,
expected_statuses=self.DELETE_STATUS_CODES)
self.expected_success(self.DELETE_STATUS_CODES, resp.status)
return resp, body

View File

@ -0,0 +1,175 @@
# Copyright 2021 Red Hat.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from socket import gaierror
from oslo_log import log as logging
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin.common import waiters
from designate_tempest_plugin.tests import base
from designate_tempest_plugin.services.dns.query.query_client \
import SingleQueryClient
LOG = logging.getLogger(__name__)
class BaseZonesTest(base.BaseDnsV2Test):
excluded_keys = ['created_at', 'updated_at', 'version', 'links',
'status', 'action']
class ZoneTasks(BaseZonesTest):
credentials = ['primary', 'alt', 'admin']
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
cls.set_network_resources()
super(ZoneTasks, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(ZoneTasks, cls).setup_clients()
cls.client = cls.os_primary.zones_client
cls.alt_client = cls.os_alt.zones_client
cls.admin_client = cls.os_admin.zones_client
cls.query_client = cls.os_primary.query_client
@decorators.idempotent_id('287e2cd0-a0e7-11eb-b962-74e5f9e2a801')
def test_zone_abandon(self):
LOG.info('Create a PRIMARY zone')
pr_zone = self.client.create_zone()[1]
LOG.info('Ensure we respond with CREATE+PENDING')
self.assertEqual('CREATE', pr_zone['action'])
self.assertEqual('PENDING', pr_zone['status'])
LOG.info('Fetch the zone')
self.client.show_zone(pr_zone['id'])
LOG.info('Check that the zone was created on Nameserver/BIND')
waiters.wait_for_query(self.query_client, pr_zone['name'], "SOA")
LOG.info('Abandon a zone')
self.admin_client.abandon_zone(
pr_zone['id'],
headers={'x-auth-sudo-project-id': pr_zone['project_id']})
LOG.info('Wait for the zone to become 404/NotFound in Designate')
waiters.wait_for_zone_404(self.client, pr_zone['id'])
LOG.info('Check that the zone is still exists in Nameserver/BIND')
waiters.wait_for_query(
self.query_client, pr_zone['name'], "SOA")
@decorators.idempotent_id('90b21d1a-a1ba-11eb-84fa-74e5f9e2a801')
def test_zone_abandon_forbidden(self):
LOG.info('Create a PRIMARY zone and add to the cleanup')
pr_zone = self.client.create_zone()[1]
self.addCleanup(self.wait_zone_delete, self.client, pr_zone['id'])
LOG.info('Ensure we respond with CREATE+PENDING')
self.assertEqual('CREATE', pr_zone['action'])
self.assertEqual('PENDING', pr_zone['status'])
LOG.info('Fetch the zone')
self.client.show_zone(pr_zone['id'])
LOG.info('Check that the zone was created on Nameserver/BIND')
waiters.wait_for_query(self.query_client, pr_zone['name'], "SOA")
LOG.info('Abandon a zone as primary client, Expected: should '
'fail with: 403 forbidden')
self.assertRaises(
lib_exc.Forbidden, self.client.abandon_zone,
zone_id=pr_zone['id'])
class ZoneTasksNegative(BaseZonesTest):
credentials = ['primary', 'alt', 'admin']
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
cls.set_network_resources()
super(ZoneTasksNegative, cls).setup_credentials()
@classmethod
def setup_clients(cls):
super(ZoneTasksNegative, cls).setup_clients()
cls.client = cls.os_primary.zones_client
cls.alt_client = cls.os_alt.zones_client
cls.admin_client = cls.os_admin.zones_client
cls.query_client = cls.os_primary.query_client
def _query_nameserver(self, nameserver, query_timeout,
zone_name, zone_type='SOA'):
query_succeeded = False
ns_obj = SingleQueryClient(nameserver, query_timeout)
try:
ns_obj.query(zone_name, zone_type)
query_succeeded = True
except gaierror as e:
LOG.info('Function "_query_nameserver" failed with:{} '.format(e))
return query_succeeded
@decorators.idempotent_id('ca250d92-8a2b-11eb-b49b-74e5f9e2a801')
def test_manually_trigger_update_secondary_zone_negative(self):
# Create a PRIMARY zone
LOG.info('Create a PRIMARY zone')
pr_zone = self.client.create_zone()[1]
self.addCleanup(self.wait_zone_delete, self.client, pr_zone['id'])
LOG.info('Ensure we respond with CREATE+PENDING')
self.assertEqual('CREATE', pr_zone['action'])
self.assertEqual('PENDING', pr_zone['status'])
# Get the Name Servers created for a PRIMARY zone
nameservers = [
dic['hostname'] for dic in self.client.show_zone_nameservers(
pr_zone['id'])[1]['nameservers']]
# Make sure that the nameservers are not available using DNS
# query and if it does, skip the test.
LOG.info('Check if NameServers are available, skip the test if not')
for ns in nameservers:
if self._query_nameserver(
ns, 5, pr_zone['name'], zone_type='SOA') is True:
raise self.skipException(
"Nameserver:{} is available, but negative test scenario "
"needs it to be unavailable, therefore test is "
"skipped.".format(ns.strip('.')))
# Create a SECONDARY zone
LOG.info('Create a SECONDARY zone')
sec_zone = self.client.create_zone(
zone_type=const.SECONDARY_ZONE_TYPE, primaries=nameservers)[1]
self.addCleanup(self.wait_zone_delete, self.client, sec_zone['id'])
LOG.info('Ensure we respond with CREATE+PENDING')
self.assertEqual('CREATE', sec_zone['action'])
self.assertEqual('PENDING', sec_zone['status'])
# Manually trigger_update zone
LOG.info('Manually Trigger an Update of a Secondary Zone when the '
'nameservers not pingable. Expected: error status code 500')
with self.assertRaisesDns(lib_exc.ServerFault, 'unknown', 500):
self.client.trigger_manual_update(sec_zone['id'])

View File

@ -14,8 +14,11 @@
import uuid
from oslo_log import log as logging
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions as lib_exc
from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin import data_utils as dns_data_utils
from designate_tempest_plugin.tests import base
@ -30,6 +33,7 @@ class BaseZonesTest(base.BaseDnsV2Test):
class ZonesTest(BaseZonesTest):
credentials = ['admin', 'primary']
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
@ -41,11 +45,27 @@ class ZonesTest(BaseZonesTest):
super(ZonesTest, cls).setup_clients()
cls.client = cls.os_primary.zones_client
cls.pool_client = cls.os_admin.pool_client
@decorators.idempotent_id('9d2e20fc-e56f-4a62-9c61-9752a9ec615c')
def test_create_zone(self):
LOG.info('Create a zone')
_, zone = self.client.create_zone()
def test_create_zones(self):
# Create a PRIMARY zone
LOG.info('Create a PRIMARY zone')
zone = self.client.create_zone()[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('Ensure we respond with CREATE+PENDING')
self.assertEqual('CREATE', zone['action'])
self.assertEqual('PENDING', zone['status'])
# Get the Name Servers (hosts) created in PRIMARY zone
nameservers = self.client.show_zone_nameservers(zone['id'])[1]
nameservers = [dic['hostname'] for dic in nameservers['nameservers']]
# Create a SECONDARY zone
LOG.info('Create a SECONDARY zone')
zone = self.client.create_zone(
zone_type=const.SECONDARY_ZONE_TYPE, primaries=nameservers)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('Ensure we respond with CREATE+PENDING')
@ -144,6 +164,32 @@ class ZonesTest(BaseZonesTest):
self.assertRaises(lib_exc.NotFound,
lambda: self.client.get(uri))
@decorators.idempotent_id('d4ce813e-64a5-11eb-9f43-74e5f9e2a801')
def test_get_primary_zone_nameservers(self):
# Create a zone and get the associated "pool_id"
LOG.info('Create a zone')
zone = self.client.create_zone()[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
zone_pool_id = zone['pool_id']
# Get zone's Name Servers using dedicated API request
zone_nameservers = self.client.show_zone_nameservers(zone['id'])[1]
zone_nameservers = zone_nameservers['nameservers']
LOG.info('Zone Name Servers are: {}'.format(zone_nameservers))
self.assertIsNot(
0, len(zone_nameservers),
"Failed - received list of nameservers shouldn't be empty")
# Use "pool_id" to get the Name Servers used
pool = self.pool_client.show_pool(zone_pool_id)[1]
pool_nameservers = pool['ns_records']
LOG.info('Pool nameservers: {}'.format(pool_nameservers))
# Make sure that pool's and zone's Name Servers are same
self.assertCountEqual(
pool_nameservers, zone_nameservers,
'Failed - Pool and Zone nameservers should be the same')
class ZonesAdminTest(BaseZonesTest):
credentials = ['primary', 'admin', 'alt']