Merge "Stabilizing “PTR record” tests suite and adding new test cases"

This commit is contained in:
Zuul 2021-12-08 16:48:21 +00:00 committed by Gerrit Code Review
commit 8c553599c9
5 changed files with 186 additions and 23 deletions

View File

@ -19,6 +19,7 @@ ERROR = 'ERROR'
DELETED = 'DELETED'
ACTIVE = 'ACTIVE'
UP = 'UP'
CREATE = 'CREATE'
# Zone types
PRIMARY_ZONE_TYPE = 'PRIMARY'

View File

@ -233,3 +233,35 @@ def wait_for_query(client, name, rdatatype, found=True):
message = "(%s) %s" % (caller, message)
raise lib_exc.TimeoutException(message)
def wait_for_ptr_status(client, fip_id, status):
"""Waits for a PTR associated with FIP to reach given status."""
LOG.info('Waiting for PTR %s to reach %s', fip_id, status)
ptr = client.show_ptr_record(fip_id)
start = int(time.time())
while ptr['status'] != status:
time.sleep(client.build_interval)
ptr = client.show_ptr_record(fip_id)
status_curr = ptr['status']
if status_curr == status:
LOG.info('PTR %s reached %s', fip_id, status)
return
if int(time.time()) - start >= client.build_timeout:
message = ('PTR for FIP: %(fip_id)s failed to reach '
'status=%(status)s within the required time '
'(%(timeout)s s). Current status: %(status_curr)s' %
{'fip_id': fip_id,
'status': status,
'status_curr': status_curr,
'timeout': client.build_timeout})
caller = test_utils.find_test_caller()
if caller:
message = '(%s) %s' % (caller, message)
raise lib_exc.TimeoutException(message)

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import random
from string import ascii_lowercase
import netaddr
from oslo_log import log as logging
@ -34,7 +35,7 @@ def rand_ipv6():
return an.format(netaddr.ipv6_compact)
def rand_zone_name(name='', prefix=None, suffix='.com.'):
def rand_zone_name(name='', prefix='rand', suffix='.com.'):
"""Generate a random zone name
:param str name: The name that you want to include
:param prefix: the exact text to start the string. Defaults to "rand"
@ -261,3 +262,22 @@ def make_rand_recordset(zone_name, record_type):
func = globals()["rand_{}_recordset".format(record_type.lower())]
return func(zone_name)
def rand_string(size):
"""Create random string of ASCII chars by size
:param int size - length os the string to be create
:return - random creates string of ASCII lover characters
"""
return ''.join(random.choice(ascii_lowercase) for _ in range(size))
def rand_domain_name(tld=None):
"""Create random valid domain name
:param tld (optional) - TLD that will be used to random domain name
:return - valid domain name, for example: paka.zbabun.iuh
"""
domain_tld = tld or rand_string(3)
return rand_string(4) + '.' + rand_string(6) + '.' + domain_tld + '.'

View File

@ -24,7 +24,8 @@ class PtrClient(base.DnsClientV2Base):
@base.handle_errors
def set_ptr_record(self, floatingip_id, ptr_name=None,
ttl=None, description=None, headers=None):
ttl=None, description=None, headers=None,
tld=None):
"""Set a PTR record for the given FloatingIP
:param floatingip_id: valid UUID of floating IP to be used.
@ -32,13 +33,13 @@ class PtrClient(base.DnsClientV2Base):
:param ttl TTL or random valid value if not provided.
:param description Description or random if not provided.
:param headers (dict): The headers to use for the request.
:param tld, the TLD to be used in ptrdname generated value.
:return: created PTR dictionary.
"""
ptr = {
'ptrdname': ptr_name or dns_data_utils.rand_zone_name(),
'ptrdname': ptr_name or dns_data_utils.rand_domain_name(tld),
'ttl': ttl or dns_data_utils.rand_ttl(),
'description': description or data_utils.rand_name(
'test-ptr')}
'description': description or data_utils.rand_name('test-ptr')}
return self._update_request(
resource='reverse/floatingips/{}'.format(CONF.identity.region),

View File

@ -13,15 +13,22 @@
# under the License.
from oslo_log import log as logging
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from designate_tempest_plugin.tests import base
from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin.common import waiters
from designate_tempest_plugin import data_utils as dns_data_utils
import tempest.test
CONF = config.CONF
LOG = logging.getLogger(__name__)
TLD = dns_data_utils.rand_string(3)
class BasePtrTest(base.BaseDnsV2Test):
excluded_keys = ['created_at', 'updated_at', 'version', 'links',
@ -29,7 +36,8 @@ class BasePtrTest(base.BaseDnsV2Test):
class DesignatePtrRecord(BasePtrTest, tempest.test.BaseTestCase):
credentials = ["primary"]
credentials = ['primary', 'admin']
@classmethod
def setup_credentials(cls):
@ -41,20 +49,33 @@ class DesignatePtrRecord(BasePtrTest, tempest.test.BaseTestCase):
def setup_clients(cls):
super(DesignatePtrRecord, cls).setup_clients()
cls.primary_ptr_client = cls.os_primary.dns_v2.PtrClient()
cls.admin_ptr_client = cls.os_admin.dns_v2.PtrClient()
cls.primary_floating_ip_client = cls.os_primary.floating_ips_client
def _set_ptr(self):
fip = self.primary_floating_ip_client.create_floatingip(
floating_network_id=CONF.network.public_network_id)['floatingip']
fip_id = fip['id']
self.addCleanup(self.primary_floating_ip_client.delete_floatingip,
fip_id)
ptr = self.primary_ptr_client.set_ptr_record(fip_id)
def _set_ptr(self, ptr_name=None, ttl=None, description=None,
headers=None, tld=TLD, fip_id=None):
if not fip_id:
fip = self.primary_floating_ip_client.create_floatingip(
floating_network_id=CONF.network.public_network_id)[
'floatingip']
fip_id = fip['id']
self.addCleanup(
self.primary_floating_ip_client.delete_floatingip, fip_id)
ptr = self.primary_ptr_client.set_ptr_record(
fip_id, ptr_name=ptr_name, ttl=ttl, description=description,
headers=headers, tld=tld)
self.addCleanup(self.primary_ptr_client.unset_ptr_record, fip_id)
self.assertEqual('CREATE', ptr['action'])
self.assertEqual('PENDING', ptr['status'])
waiters.wait_for_ptr_status(
self.primary_ptr_client, fip_id=fip_id, status=const.ACTIVE)
return fip_id, ptr
def _unset_ptr(self, fip_id):
self.primary_ptr_client.unset_ptr_record(fip_id)
waiters.wait_for_ptr_status(
self.primary_ptr_client, fip_id=fip_id, status=const.DELETED)
@decorators.idempotent_id('2fb9d6ea-871d-11eb-9f9a-74e5f9e2a801')
def test_set_floatingip_ptr(self):
self._set_ptr()
@ -66,6 +87,19 @@ class DesignatePtrRecord(BasePtrTest, tempest.test.BaseTestCase):
floatingip_id=fip_id)
self.assertExpected(ptr, show_ptr, self.excluded_keys)
@decorators.idempotent_id('d3128a92-e3bd-11eb-a097-74e5f9e2a801')
def test_show_floatingip_ptr_impersonate_another_project(self):
fip_id, ptr = self._set_ptr()
LOG.info('As Admin user, show PTR record created by Primary'
' user by including "x-auth-sudo-project-id" HTTP header'
' in HTTP request.')
show_ptr = self.admin_ptr_client.show_ptr_record(
floatingip_id=fip_id,
headers={
'x-auth-sudo-project-id': self.primary_ptr_client.project_id})
self.assertExpected(ptr, show_ptr, self.excluded_keys)
@decorators.idempotent_id('9187a9c6-87d4-11eb-9f9a-74e5f9e2a801')
def test_list_floatingip_ptr_records(self):
number_of_ptr_records = 3
@ -81,14 +115,34 @@ class DesignatePtrRecord(BasePtrTest, tempest.test.BaseTestCase):
'Failed - received PTR IDs: {} are not as'
' expected: {}'.format(created_ptr_ids, received_ptr_ids))
@decorators.idempotent_id('a108d6f2-e3c0-11eb-a097-74e5f9e2a801')
@decorators.skip_because(bug="1935977")
def test_list_floatingip_ptr_all_projects(self):
ptr = self._set_ptr()[1]
LOG.info('Created PTR is:{}'.format(ptr))
LOG.info('As Admin user, try to list PTR record for all projects '
'by including "x-auth-all-projects" HTTP header.')
received_ptr_ids = [
item['id'] for item in self.admin_ptr_client.list_ptr_records(
headers={'x-auth-all-projects': True})]
self.assertGreater(
len(received_ptr_ids), 0,
'Failed, "received_ptr_ids" should not be empty')
self.assertIn(
ptr['id'], received_ptr_ids,
'Failed, expected ID was not found in "received_ptr_ids" list.')
@decorators.idempotent_id('499b5a7e-87e1-11eb-b412-74e5f9e2a801')
@decorators.skip_because(bug="1932026")
def test_unset_floatingip_ptr(self):
fip_id, ptr = self._set_ptr()
self.primary_ptr_client.unset_ptr_record(fip_id)
self._unset_ptr(fip_id)
class DesignatePtrRecordNegative(BasePtrTest, tempest.test.BaseTestCase):
credentials = ["primary"]
credentials = ['primary', 'admin']
@classmethod
def setup_credentials(cls):
@ -101,24 +155,79 @@ class DesignatePtrRecordNegative(BasePtrTest, tempest.test.BaseTestCase):
super(DesignatePtrRecordNegative, cls).setup_clients()
cls.primary_ptr_client = cls.os_primary.dns_v2.PtrClient()
cls.primary_floating_ip_client = cls.os_primary.floating_ips_client
cls.admin_ptr_client = cls.os_admin.dns_v2.PtrClient()
def _set_ptr(self, ptr_name=None, ttl=None, description=None,
headers=None):
fip = self.primary_floating_ip_client.create_floatingip(
floating_network_id=CONF.network.public_network_id)[
'floatingip']
fip_id = fip['id']
self.addCleanup(self.primary_floating_ip_client.delete_floatingip,
fip_id)
headers=None, tld=TLD, fip_id=None):
if not fip_id:
fip = self.primary_floating_ip_client.create_floatingip(
floating_network_id=CONF.network.public_network_id)[
'floatingip']
fip_id = fip['id']
self.addCleanup(
self.primary_floating_ip_client.delete_floatingip, fip_id)
ptr = self.primary_ptr_client.set_ptr_record(
fip_id, ptr_name=ptr_name, ttl=ttl, description=description,
headers=headers)
headers=headers, tld=tld)
self.addCleanup(self.primary_ptr_client.unset_ptr_record, fip_id)
self.assertEqual('CREATE', ptr['action'])
self.assertEqual('PENDING', ptr['status'])
waiters.wait_for_ptr_status(
self.primary_ptr_client, fip_id=fip_id, status=const.ACTIVE)
return fip_id, ptr
@decorators.attr(type='negative')
@decorators.idempotent_id('8392db50-cdd0-11eb-a00f-74e5f9e2a801')
def test_set_floatingip_ptr_invalid_ttl(self):
LOG.info('Try to set PTR record using invalid TTL value')
with self.assertRaisesDns(lib_exc.BadRequest, 'invalid_object', 400):
self._set_ptr(ttl=-10)
@decorators.attr(type='negative')
@decorators.idempotent_id('0c9349ae-e2e8-11eb-a097-74e5f9e2a801')
def test_set_floatingip_ptr_not_existing_fip_id(self):
LOG.info('Try to set PTR record using not existing Floating IP')
with self.assertRaisesDns(lib_exc.NotFound, 'not_found', 404):
self._set_ptr(fip_id=data_utils.rand_uuid())
@decorators.attr(type='negative')
@decorators.idempotent_id('df217902-e3b2-11eb-a097-74e5f9e2a801')
def test_set_floatingip_ptr_huge_size_description(self):
LOG.info('Try to set PTR record using huge size description string')
with self.assertRaisesDns(lib_exc.BadRequest, 'invalid_object', 400):
self._set_ptr(description=dns_data_utils.rand_string(5000))
@decorators.attr(type='negative')
@decorators.idempotent_id('cb2264e2-e3b3-11eb-a097-74e5f9e2a801')
def test_set_floatingip_ptr_invalid_name(self):
invalid_names = ['', '@!(*&', 4564, dns_data_utils.rand_string(5000)]
for name in invalid_names:
LOG.info('Set PTR record using invalid name:{}'.format(name))
with self.assertRaisesDns(
lib_exc.BadRequest, 'invalid_object', 400):
self._set_ptr(description=dns_data_utils.rand_string(5000))
@decorators.attr(type='negative')
@decorators.idempotent_id('f616d216-51ac-11ec-8edf-201e8823901f')
def test_show_floatingip_ptr_impersonate_another_project_no_header(self):
fip_id, ptr = self._set_ptr()
LOG.info('As Admin user, show PTR record created by Primary'
' user, without including "x-auth-sudo-project-id" '
'HTTP header in request.')
with self.assertRaisesDns(lib_exc.NotFound, 'not_found', 404):
self.admin_ptr_client.show_ptr_record(floatingip_id=fip_id)
@decorators.attr(type='negative')
@decorators.idempotent_id('0d132ff0-51ad-11ec-8edf-201e8823901f')
@decorators.skip_because(bug="1935977")
def test_list_floatingip_ptr_all_projects_no_header(self):
ptr = self._set_ptr()[1]
LOG.info('Created PTR is:{}'.format(ptr))
LOG.info('As Admin user, try to list PTR record for all projects '
'without including "x-auth-all-projects" HTTP header.')
received_ptr_ids = [
item['id'] for item in self.admin_ptr_client.list_ptr_records()]
self.assertEqual([], received_ptr_ids,
'Failed, "received_ptr_ids" list should be empty')