Remove old functional tests

Change-Id: Id7f0fd8d30f1f3baa33da574ed6ff531b4914d2c
This commit is contained in:
Graham Hayes 2017-02-22 14:46:08 -05:00
parent b94ba23f9e
commit 9f5de18bcc
39 changed files with 64 additions and 2387 deletions

View File

@ -15,9 +15,11 @@
# under the License.
import os
import functools
import tempfile
import unittest
import six
import testtools
from mock import Mock
from jinja2 import Template
@ -152,3 +154,64 @@ class SocketListenTest(unittest.TestCase):
for addr in ('', '0.0.0.0', '127.0.0.1', '::', '::1'):
s = utils.bind_udp(addr, 0)
s.close()
def def_method(f, *args, **kwargs):
@functools.wraps(f)
def new_method(self):
return f(self, *args, **kwargs)
return new_method
def parameterized_class(cls):
"""A class decorator for running parameterized test cases.
Mark your class with @parameterized_class.
Mark your test cases with @parameterized.
"""
test_functions = {
k: v for k, v in vars(cls).items() if k.startswith('test')
}
for name, f in test_functions.items():
if not hasattr(f, '_test_data'):
continue
# remove the original test function from the class
delattr(cls, name)
# add a new test function to the class for each entry in f._test_data
for tag, args in f._test_data.items():
new_name = "{0}_{1}".format(f.__name__, tag)
if hasattr(cls, new_name):
raise Exception(
"Parameterized test case '{0}.{1}' created from '{0}.{2}' "
"already exists".format(cls.__name__, new_name, name))
# Using `def new_method(self): f(self, **args)` is not sufficient
# (all new_methods use the same args value due to late binding).
# Instead, use this factory function.
new_method = def_method(f, **args)
# To add a method to a class, available for all instances:
# MyClass.method = types.MethodType(f, None, MyClass)
setattr(cls, new_name, six.create_unbound_method(new_method, cls))
return cls
def parameterized(data):
"""A function decorator for parameterized test cases.
Example:
@parameterized({
'zero': dict(val=0),
'one': dict(val=1),
})
def test_val(self, val):
self.assertEqual(self.get_val(), val)
The above will generate two test cases:
`test_val_zero` which runs with val=0
`test_val_one` which runs with val=1
:param data: A dictionary that looks like {tag: {arg1: val1, ...}}
"""
def wrapped(f):
f._test_data = data
return f
return wrapped

View File

@ -19,7 +19,7 @@ import mock
import testtools
from oslo_config import cfg
from functionaltests.common import utils
import designate.tests.test_utils as utils
from designate import exceptions
from designate.worker.tasks import zone
from designate.worker import processing

View File

@ -1,7 +0,0 @@
import logging
logging.basicConfig(
filename='functional-tests.log',
filemode='w',
level=logging.DEBUG,
)

View File

@ -1,72 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from oslo_config import cfg
from tempest.lib import exceptions
from functionaltests.api.v2.clients import quotas_client
from functionaltests.api.v2.clients import tld_client
from functionaltests.api.v2.models import quotas_model
from functionaltests.api.v2.models import tld_model
from functionaltests.common import base
class DesignateV2Test(base.BaseDesignateTest):
def increase_quotas(self, user):
if cfg.CONF.testconfig.no_admin_setup:
return
quotas_client.QuotasClient.as_user('admin').patch_quotas(
quotas_client.QuotasClient.as_user(user).tenant_id,
quotas_model.QuotasModel.from_dict({
'quota': {
'zones': 9999999,
'recordset_records': 9999999,
'zone_records': 9999999,
'zone_recordsets': 9999999
}
})
)
def ensure_tld_exists(self, tld='com'):
if cfg.CONF.testconfig.no_admin_setup:
return
try:
_tld_model = tld_model.TLDModel.from_dict({'name': tld})
tld_client.TLDClient.as_user('admin').post_tld(_tld_model)
except exceptions.Conflict:
pass
def _assert_invalid_uuid(self, method, *args, **kw):
"""
Test that UUIDs used in the URL is valid.
"""
self._assert_exception(
exceptions.BadRequest, 'invalid_uuid', 400, method, *args)
def _assert_exception(self, exc, type_, status, method, *args, **kwargs):
"""
Checks the response that a api call with a exception contains the
wanted data.
"""
try:
method(*args, **kwargs)
except exc as e:
self.assertEqual(status, e.resp_body['code'])
self.assertEqual(type_, e.resp_body['type'])
return e
else:
raise self.failureException("Test failed due to no exception.")

View File

@ -1,51 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functionaltests.api.v2.models.blacklist_model import BlacklistModel
from functionaltests.api.v2.models.blacklist_model import BlacklistListModel
from functionaltests.common.client import ClientMixin
class BlacklistClient(ClientMixin):
def blacklists_uri(self, filters=None):
return self.create_uri("/blacklists", filters=filters)
def blacklist_uri(self, blacklist_id):
return "{0}/{1}".format(self.blacklists_uri(), blacklist_id)
def list_blacklists(self, filters=None, **kwargs):
resp, body = self.client.get(self.blacklists_uri(filters), **kwargs)
return self.deserialize(resp, body, BlacklistListModel)
def get_blacklist(self, blacklist_id, **kwargs):
resp, body = self.client.get(self.blacklist_uri(blacklist_id))
return self.deserialize(resp, body, BlacklistModel)
def post_blacklist(self, blacklist_model, **kwargs):
resp, body = self.client.post(
self.blacklists_uri(),
body=blacklist_model.to_json(), **kwargs)
return self.deserialize(resp, body, BlacklistModel)
def patch_blacklist(self, blacklist_id, blacklist_model, **kwargs):
resp, body = self.client.patch(
self.blacklist_uri(blacklist_id),
body=blacklist_model.to_json(), **kwargs)
return self.deserialize(resp, body, BlacklistModel)
def delete_blacklist(self, blacklist_id, **kwargs):
return self.client.delete(self.blacklist_uri(blacklist_id), **kwargs)

View File

@ -1,51 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functionaltests.api.v2.models.pool_model import PoolModel
from functionaltests.api.v2.models.pool_model import PoolListModel
from functionaltests.common.client import ClientMixin
class PoolClient(ClientMixin):
def pools_uri(self, filters=None):
return self.create_uri("/pools", filters=filters)
def pool_uri(self, pool_id):
return "{0}/{1}".format(self.pools_uri(), pool_id)
def list_pools(self, filters=None, **kwargs):
resp, body = self.client.get(self.pools_uri(filters), **kwargs)
return self.deserialize(resp, body, PoolListModel)
def get_pool(self, pool_id, **kwargs):
resp, body = self.client.get(self.pool_uri(pool_id))
return self.deserialize(resp, body, PoolModel)
def post_pool(self, pool_model, **kwargs):
resp, body = self.client.post(
self.pools_uri(),
body=pool_model.to_json(), **kwargs)
return self.deserialize(resp, body, PoolModel)
def patch_pool(self, pool_id, pool_model, **kwargs):
resp, body = self.client.patch(
self.pool_uri(pool_id),
body=pool_model.to_json(), **kwargs)
return self.deserialize(resp, body, PoolModel)
def delete_pool(self, pool_id, **kwargs):
return self.client.delete(self.pool_uri(pool_id), **kwargs)

View File

@ -1,43 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.api.v2.models.quotas_model import QuotasModel
from functionaltests.common.client import ClientMixin
class QuotasClient(ClientMixin):
def quotas_uri(self, tenant_id, filters=None):
url = "/admin/quotas/{0}".format(tenant_id)
if filters:
url = self.add_filters(url, filters)
return url
def get_quotas(self, tenant_id, filters=None, **kwargs):
resp, body = self.client.get(
self.quotas_uri(tenant_id, filters), **kwargs)
return self.deserialize(resp, body, QuotasModel)
def patch_quotas(self, tenant_id, quotas_model, **kwargs):
resp, body = self.client.patch(
self.quotas_uri(tenant_id),
body=quotas_model.to_json(),
**kwargs)
return self.deserialize(resp, body, QuotasModel)
def delete_quotas(self, tenant_id, **kwargs):
resp, body = self.client.patch(self.quotas_uri(tenant_id), **kwargs)
return self.deserialize(resp, body, QuotasModel)

View File

@ -1,89 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from tempest.lib.exceptions import NotFound
from functionaltests.api.v2.models.recordset_model import RecordsetModel
from functionaltests.api.v2.models.recordset_model import RecordsetListModel
from functionaltests.common.client import ClientMixin
from functionaltests.common import utils
class RecordsetClient(ClientMixin):
def recordsets_uri(self, zone_id, cross_zone=False, filters=None):
if cross_zone:
uri = self.create_uri("/recordsets", filters=filters)
else:
uri = self.create_uri("/zones/{0}/recordsets".format(zone_id),
filters=filters)
return uri
def recordset_uri(self, zone_id, recordset_id, cross_zone=False):
return "{0}/{1}".format(self.recordsets_uri(zone_id, cross_zone),
recordset_id)
def list_recordsets(self, zone_id, cross_zone=False, filters=None,
**kwargs):
resp, body = self.client.get(
self.recordsets_uri(zone_id, cross_zone, filters), **kwargs)
return self.deserialize(resp, body, RecordsetListModel)
def get_recordset(self, zone_id, recordset_id, cross_zone=False, **kwargs):
resp, body = self.client.get(self.recordset_uri(zone_id, recordset_id,
cross_zone),
**kwargs)
return self.deserialize(resp, body, RecordsetModel)
def post_recordset(self, zone_id, recordset_model, **kwargs):
resp, body = self.client.post(self.recordsets_uri(zone_id),
body=recordset_model.to_json(), **kwargs)
return self.deserialize(resp, body, RecordsetModel)
def put_recordset(self, zone_id, recordset_id, recordset_model, **kwargs):
resp, body = self.client.put(self.recordset_uri(zone_id, recordset_id),
body=recordset_model.to_json(), **kwargs)
return self.deserialize(resp, body, RecordsetModel)
def delete_recordset(self, zone_id, recordset_id, **kwargs):
resp, body = self.client.delete(
self.recordset_uri(zone_id, recordset_id), **kwargs)
return self.deserialize(resp, body, RecordsetModel)
def wait_for_recordset(self, zone_id, recordset_id):
utils.wait_for_condition(
lambda: self.is_recordset_active(zone_id, recordset_id))
def wait_for_404(self, zone_id, recordset_id):
utils.wait_for_condition(
lambda: self.is_recordset_404(zone_id, recordset_id))
def is_recordset_active(self, zone_id, recordset_id):
resp, model = self.get_recordset(
zone_id, recordset_id)
assert resp.status == 200
if model.status == 'ACTIVE':
return True
elif model.status == 'ERROR':
raise Exception("Saw ERROR status")
return False
def is_recordset_404(self, zone_id, recordset_id):
try:
self.get_recordset(zone_id, recordset_id)
except NotFound:
return True
return False

View File

@ -1,51 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functionaltests.api.v2.models.tld_model import TLDModel
from functionaltests.api.v2.models.tld_model import TLDListModel
from functionaltests.common.client import ClientMixin
class TLDClient(ClientMixin):
def tlds_uri(self, filters=None):
return self.create_uri("/tlds", filters=filters)
def tld_uri(self, tld_id):
return "{0}/{1}".format(self.tlds_uri(), tld_id)
def list_tlds(self, filters=None, **kwargs):
resp, body = self.client.get(self.tlds_uri(filters), **kwargs)
return self.deserialize(resp, body, TLDListModel)
def get_tld(self, tld_id, **kwargs):
resp, body = self.client.get(self.tld_uri(tld_id))
return self.deserialize(resp, body, TLDModel)
def post_tld(self, tld_model, **kwargs):
resp, body = self.client.post(
self.tlds_uri(),
body=tld_model.to_json(), **kwargs)
return self.deserialize(resp, body, TLDModel)
def patch_tld(self, tld_id, tld_model, **kwargs):
resp, body = self.client.patch(
self.tld_uri(tld_id),
body=tld_model.to_json(), **kwargs)
return self.deserialize(resp, body, TLDModel)
def delete_tld(self, tld_id, **kwargs):
return self.client.delete(self.tld_uri(tld_id), **kwargs)

View File

@ -1,62 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.api.v2.models.transfer_accepts_model import \
TransferAcceptsModel
from functionaltests.api.v2.models.transfer_accepts_model import \
TransferAcceptsListModel
from functionaltests.common.client import ClientMixin
class TransferAcceptClient(ClientMixin):
def transfer_accepts_uri(self, filters=None):
return self.create_uri("/zones/tasks/transfer_accepts",
filters=filters)
def transfer_accept_uri(self, transfer_request_id):
return "{0}/{1}".format(self.transfer_accepts_uri(),
transfer_request_id)
def list_transfer_accepts(self, zone_id, filters=None, **kwargs):
resp, body = self.client.get(
self.transfer_accepts_uri(filters), **kwargs)
return self.deserialize(resp, body, TransferAcceptsListModel)
def get_transfer_accept(self, zone_id, transfer_request_id, **kwargs):
resp, body = self.client.get(self.transfer_accept_uri(
transfer_request_id),
**kwargs)
return self.deserialize(resp, body, TransferAcceptsModel)
def post_transfer_accept(self, transfer_request_model, **kwargs):
resp, body = self.client.post(
self.transfer_accepts_uri(),
body=transfer_request_model.to_json(),
**kwargs)
return self.deserialize(resp, body, TransferAcceptsModel)
def put_transfer_accept(self, zone_id, transfer_request_id,
transfer_request_model, **kwargs):
resp, body = self.client.put(self.transfer_accept_uri(
transfer_request_id),
body=transfer_request_model.to_json(), **kwargs)
return self.deserialize(resp, body, TransferAcceptsModel)
def delete_transfer_accept(self, zone_id, transfer_request_id, **kwargs):
resp, body = self.client.delete(
self.transfer_accept_uri(zone_id, transfer_request_id), **kwargs)
return self.deserialize(resp, body, TransferAcceptsModel)

View File

@ -1,82 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.api.v2.models.transfer_requests_model import \
TransferRequestsModel
from functionaltests.api.v2.models.transfer_requests_model import \
TransferRequestsListModel
from functionaltests.common.client import ClientMixin
class TransferRequestClient(ClientMixin):
def create_transfer_requests_uri(self, zone_id, filters=None):
return self.create_uri(
"/zones/{0}/tasks/transfer_requests".format(zone_id),
filters=filters,
)
def transfer_requests_uri(self, filters=None):
return self.create_uri(
"/zones/tasks/transfer_requests",
filters=filters,
)
def transfer_request_uri(self, transfer_request_id):
return self.create_uri(
"/zones/tasks/transfer_requests/{0}".format(transfer_request_id)
)
def list_transfer_requests(self, filters=None, **kwargs):
resp, body = self.client.get(
self.transfer_requests_uri(filters), **kwargs)
return self.deserialize(resp, body, TransferRequestsListModel)
def get_transfer_request(self, transfer_request_id, **kwargs):
resp, body = self.client.get(self.transfer_request_uri(
transfer_request_id),
**kwargs)
return self.deserialize(resp, body, TransferRequestsModel)
def post_transfer_request(self, zone_id, transfer_request_model=None,
**kwargs):
resp, body = self.client.post(
self.create_transfer_requests_uri(zone_id),
body=transfer_request_model.to_json(),
**kwargs)
return self.deserialize(resp, body, TransferRequestsModel)
def post_transfer_request_empty_body(self, zone_id, **kwargs):
resp, body = self.client.post(
self.create_transfer_requests_uri(zone_id),
body=None,
**kwargs)
return self.deserialize(resp, body, TransferRequestsModel)
def put_transfer_request(self, transfer_request_id,
transfer_request_model, **kwargs):
resp, body = self.client.put(self.transfer_request_uri(
transfer_request_id),
body=transfer_request_model.to_json(), **kwargs)
return self.deserialize(resp, body, TransferRequestsModel)
def delete_transfer_request(self, transfer_request_id, **kwargs):
resp, body = self.client.delete(
self.transfer_request_uri(transfer_request_id), **kwargs)
# the body is empty on a successful delete
if body:
return self.deserialize(resp, body, TransferRequestsModel)
return resp, body

View File

@ -1,82 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from tempest.lib.exceptions import NotFound
from functionaltests.api.v2.models.zone_model import ZoneModel
from functionaltests.api.v2.models.zone_model import ZoneListModel
from functionaltests.common.client import ClientMixin
from functionaltests.common import utils
class ZoneClient(ClientMixin):
def zones_uri(self, filters=None):
return self.create_uri("/zones", filters=filters)
def zone_uri(self, id):
return "{0}/{1}".format(self.zones_uri(), id)
def list_zones(self, filters=None, **kwargs):
resp, body = self.client.get(self.zones_uri(filters), **kwargs)
return self.deserialize(resp, body, ZoneListModel)
def get_zone(self, id, **kwargs):
resp, body = self.client.get(self.zone_uri(id))
return self.deserialize(resp, body, ZoneModel)
def post_zone(self, zone_model, **kwargs):
resp, body = self.client.post(self.zones_uri(),
body=zone_model.to_json(), **kwargs)
return self.deserialize(resp, body, ZoneModel)
def patch_zone(self, id, zone_model, **kwargs):
resp, body = self.client.patch(self.zone_uri(id),
body=zone_model.to_json(), **kwargs)
return self.deserialize(resp, body, ZoneModel)
def delete_zone(self, id, **kwargs):
resp, body = self.client.delete(self.zone_uri(id), **kwargs)
return self.deserialize(resp, body, ZoneModel)
def wait_for_zone(self, zone_id):
utils.wait_for_condition(lambda: self.is_zone_active(zone_id))
def wait_for_zone_404(self, zone_id):
utils.wait_for_condition(lambda: self.is_zone_404(zone_id))
def is_zone_active(self, zone_id):
resp, model = self.get_zone(zone_id)
# don't have assertEqual but still want to fail fast
assert resp.status == 200
if model.status == 'ACTIVE':
return True
elif model.status == 'ERROR':
raise Exception("Saw ERROR status")
return False
def is_zone_404(self, zone_id):
try:
# tempest.lib rest client raises exceptions on bad status codes
resp, model = self.get_zone(zone_id)
except NotFound:
return True
return False
def zones_dot_json(self, filters=None, **kwargs):
uri = self.create_uri("/zones.json", filters=filters)
resp, body = self.client.get(uri, **kwargs)
return self.deserialize(resp, body, ZoneListModel)

View File

@ -1,226 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import absolute_import
from __future__ import print_function
import sys
import traceback
import fixtures
from tempest.lib.exceptions import NotFound
from testtools.runtest import MultipleExceptions
from functionaltests.api.v2.clients.blacklist_client import BlacklistClient
from functionaltests.api.v2.clients.pool_client import PoolClient
from functionaltests.api.v2.clients.recordset_client import RecordsetClient
from functionaltests.api.v2.clients.tld_client import TLDClient
from functionaltests.api.v2.clients.zone_client import ZoneClient
from functionaltests.api.v2.clients.transfer_requests_client import \
TransferRequestClient
from functionaltests.common import datagen
class BaseFixture(fixtures.Fixture):
def setUp(self):
# Sometimes, exceptions are raised in _setUp methods on fixtures.
# testtools pushes the exception into a MultipleExceptions object along
# with an artificial SetupError, which produces bad error messages.
# This just logs those stack traces to stderr for easier debugging.
try:
super(BaseFixture, self).setUp()
except MultipleExceptions as e:
for i, exc_info in enumerate(e.args):
print('--- printing MultipleExceptions traceback {} of {} ---'
.format(i + 1, len(e.args)), file=sys.stderr)
traceback.print_exception(*exc_info)
raise
class ZoneFixture(BaseFixture):
def __init__(self, post_model=None, user='default'):
super(ZoneFixture, self).__init__()
self.post_model = post_model or datagen.random_zone_data()
self.user = user
def _setUp(self):
super(ZoneFixture, self)._setUp()
self._create_zone()
def _create_zone(self):
client = ZoneClient.as_user(self.user)
self.post_resp, self.created_zone = client.post_zone(self.post_model)
assert self.post_resp.status == 202
self.addCleanup(self.cleanup_zone, client, self.created_zone.id)
client.wait_for_zone(self.created_zone.id)
@classmethod
def cleanup_zone(cls, client, zone_id):
try:
client.delete_zone(zone_id)
except NotFound:
pass
class RecordsetFixture(BaseFixture):
def __init__(self, zone_id, post_model, user='default'):
super(RecordsetFixture, self).__init__()
self.zone_id = zone_id
self.post_model = post_model
self.user = user
def _setUp(self):
super(RecordsetFixture, self)._setUp()
self._create_recordset()
def _create_recordset(self):
client = RecordsetClient.as_user(self.user)
self.post_resp, self.created_recordset = client.post_recordset(
self.zone_id, self.post_model)
assert self.post_resp.status == 202
self.addCleanup(self.cleanup_recordset, client, self.zone_id,
self.created_recordset.id)
assert self.created_recordset.status == "PENDING"
assert self.created_recordset.name == self.post_model.name
assert self.created_recordset.ttl == self.post_model.ttl
assert self.created_recordset.records == self.post_model.records
RecordsetClient.as_user('default').wait_for_recordset(
self.zone_id, self.created_recordset.id)
@classmethod
def cleanup_recordset(cls, client, zone_id, recordset_id):
try:
client.delete_recordset(zone_id, recordset_id)
except NotFound:
pass
class PoolFixture(BaseFixture):
def __init__(self, post_model=None, user='admin'):
super(PoolFixture, self).__init__()
self.post_model = post_model or datagen.random_pool_data()
self.user = user
def _setUp(self):
super(PoolFixture, self)._setUp()
self._create_pool()
def _create_pool(self):
client = PoolClient.as_user(self.user)
self.post_resp, self.created_pool = client.post_pool(self.post_model)
assert self.post_resp.status == 201
self.addCleanup(self.cleanup_pool, client, self.created_pool.id)
@classmethod
def cleanup_pool(cls, client, pool_id):
try:
client.delete_pool(pool_id)
except NotFound:
pass
class TransferRequestFixture(BaseFixture):
def __init__(self, zone, post_model=None, user='default',
target_user='alt'):
"""Assuming the zone is being transferred between the two users, this
fixture will ensure that zone is deleted by trying to delete the zone
as each user.
"""
self.zone = zone
self.post_model = post_model or datagen.random_transfer_request_data()
self.user = user
self.target_user = target_user
def _setUp(self):
super(TransferRequestFixture, self)._setUp()
self._create_transfer_request()
def _create_transfer_request(self):
client = TransferRequestClient.as_user(self.user)
self.post_resp, self.transfer_request = client \
.post_transfer_request(self.zone.id, self.post_model)
assert self.post_resp.status == 201
self.addCleanup(self.cleanup_transfer_request, client,
self.transfer_request.id)
self.addCleanup(ZoneFixture.cleanup_zone,
ZoneClient.as_user(self.user), self.zone.id)
self.addCleanup(ZoneFixture.cleanup_zone,
ZoneClient.as_user(self.target_user), self.zone.id)
@classmethod
def cleanup_transfer_request(self, client, transfer_id):
try:
client.delete_transfer_request(transfer_id)
except NotFound:
pass
class BlacklistFixture(BaseFixture):
def __init__(self, post_model=None, user='admin'):
super(BlacklistFixture, self).__init__()
self.post_model = post_model or datagen.random_blacklist_data()
self.user = user
def _setUp(self):
super(BlacklistFixture, self)._setUp()
self._create_blacklist()
def _create_blacklist(self):
client = BlacklistClient.as_user(self.user)
self.post_resp, self.created_blacklist = client.post_blacklist(
self.post_model)
assert self.post_resp.status == 201
self.addCleanup(self.cleanup_blacklist, client,
self.created_blacklist.id)
@classmethod
def cleanup_blacklist(cls, client, blacklist_id):
try:
client.delete_blacklist(blacklist_id)
except NotFound:
pass
class TLDFixture(BaseFixture):
def __init__(self, post_model=None, user='admin'):
super(TLDFixture, self).__init__()
self.post_model = post_model or datagen.random_tld_data()
self.user = user
def _setUp(self):
super(TLDFixture, self)._setUp()
self._create_tld()
def _create_tld(self):
client = TLDClient.as_user(self.user)
self.post_resp, self.created_tld = client.post_tld(self.post_model)
assert self.post_resp.status == 201
self.addCleanup(self.cleanup_tld, client, self.created_tld.id)
@classmethod
def cleanup_tld(cls, client, tld_id):
try:
client.delete_tld(tld_id)
except NotFound:
pass

View File

@ -1,33 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functionaltests.common.models import BaseModel
from functionaltests.common.models import CollectionModel
from functionaltests.common.models import EntityModel
class BlacklistData(BaseModel):
pass
class BlacklistModel(EntityModel):
ENTITY_NAME = 'blacklist'
MODEL_TYPE = BlacklistData
class BlacklistListModel(CollectionModel):
COLLECTION_NAME = 'blacklists'
MODEL_TYPE = BlacklistData

View File

@ -1,33 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functionaltests.common.models import BaseModel
from functionaltests.common.models import CollectionModel
from functionaltests.common.models import EntityModel
class PoolData(BaseModel):
pass
class PoolModel(EntityModel):
ENTITY_NAME = 'pool'
MODEL_TYPE = PoolData
class PoolListModel(CollectionModel):
COLLECTION_NAME = 'pools'
MODEL_TYPE = PoolData

View File

@ -1,27 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.common.models import BaseModel
from functionaltests.common.models import EntityModel
class QuotasData(BaseModel):
pass
class QuotasModel(EntityModel):
ENTITY_NAME = 'quota'
MODEL_TYPE = QuotasData

View File

@ -1,33 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.common.models import BaseModel
from functionaltests.common.models import CollectionModel
from functionaltests.common.models import EntityModel
class RecordsetData(BaseModel):
pass
class RecordsetModel(EntityModel):
ENTITY_NAME = 'recordset'
MODEL_TYPE = RecordsetData
class RecordsetListModel(CollectionModel):
COLLECTION_NAME = 'recordsets'
MODEL_TYPE = RecordsetData

View File

@ -1,33 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functionaltests.common.models import BaseModel
from functionaltests.common.models import CollectionModel
from functionaltests.common.models import EntityModel
class TLDData(BaseModel):
pass
class TLDModel(EntityModel):
ENTITY_NAME = 'tld'
MODEL_TYPE = TLDData
class TLDListModel(CollectionModel):
COLLECTION_NAME = 'tlds'
MODEL_TYPE = TLDData

View File

@ -1,33 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functionaltests.common.models import BaseModel
from functionaltests.common.models import CollectionModel
from functionaltests.common.models import EntityModel
class TransferAcceptsData(BaseModel):
pass
class TransferAcceptsModel(EntityModel):
ENTITY_NAME = 'transfer_accept'
MODEL_TYPE = TransferAcceptsData
class TransferAcceptsListModel(CollectionModel):
COLLECTION_NAME = 'transfer_accepts'
MODEL_TYPE = TransferAcceptsData

View File

@ -1,33 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functionaltests.common.models import BaseModel
from functionaltests.common.models import CollectionModel
from functionaltests.common.models import EntityModel
class TransferRequestsData(BaseModel):
pass
class TransferRequestsModel(EntityModel):
ENTITY_NAME = 'transfer_request'
MODEL_TYPE = TransferRequestsData
class TransferRequestsListModel(CollectionModel):
COLLECTION_NAME = 'transfer_requests'
MODEL_TYPE = TransferRequestsData

View File

@ -1,27 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.common.models import BaseModel
from functionaltests.common.models import CollectionModel
class ZoneModel(BaseModel):
pass
class ZoneListModel(CollectionModel):
COLLECTION_NAME = 'zones'
MODEL_TYPE = ZoneModel

View File

@ -1,150 +0,0 @@
"""
Copyright 2016 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from tempest.lib import exceptions
from functionaltests.common import datagen
from functionaltests.common import utils
from functionaltests.api.v2.base import DesignateV2Test
from functionaltests.api.v2.clients.recordset_client import RecordsetClient
from functionaltests.api.v2.fixtures import ZoneFixture
from functionaltests.api.v2.fixtures import RecordsetFixture
from functionaltests.api.v2.test_recordset import RECORDSETS_DATASET
INVALID_TXT_DATASET = {
'trailing_slash': dict(data='\\'),
'trailing_double_slash': dict(data='\\\\'),
'trailing_slash_after_text': dict(data='v=spf1 +all\\'),
}
VALID_TXT_DATASET = {
'slash_with_one_trailing_space': dict(data='\\ '),
'slash_with_many_trailing_space': dict(data='\\ '),
'text_with_slash_and_trailing_space': dict(data='the txts \ '),
}
INVALID_MX_DATASET = {
'empty_preference': dict(pref=''),
'minus_zero_preference': dict(pref='-0'),
'minus_one_preference': dict(pref='-1'),
}
INVALID_SSHFP_DATASET = {
'minus_zero_algorithm': dict(algo='-0', finger=None),
'minus_zero_fingerprint': dict(algo=None, finger='-0'),
'minus_one_algorithm': dict(algo='-1', finger=None),
'minus_one_fingerprint': dict(algo=None, finger='-1'),
}
@utils.parameterized_class
class RecordsetValidationTest(DesignateV2Test):
def setUp(self):
super(RecordsetValidationTest, self).setUp()
self.increase_quotas(user='default')
self.ensure_tld_exists('com')
self.zone = self.useFixture(ZoneFixture()).created_zone
self.client = RecordsetClient.as_user('default')
@utils.parameterized(RECORDSETS_DATASET)
def test_create_invalid(self, make_recordset, data=None):
data = data or ["b0rk"]
for i in data:
model = make_recordset(self.zone)
model.data = i
self._assert_exception(
exceptions.BadRequest, 'invalid_object', 400,
self.client.post_recordset, self.zone.id, model)
@utils.parameterized(RECORDSETS_DATASET)
def test_update_invalid(self, make_recordset, data=None):
data = data or ["b0rk"]
post_model = make_recordset(self.zone)
fixture = self.useFixture(RecordsetFixture(self.zone.id, post_model))
recordset_id = fixture.created_recordset.id
for i in data:
model = make_recordset(self.zone)
model.data = i
self._assert_exception(
exceptions.BadRequest, 'invalid_object', 400,
self.client.put_recordset, self.zone.id, recordset_id, model)
def test_cannot_create_wildcard_NS_recordset(self):
model = datagen.wildcard_ns_recordset(self.zone.name)
self._assert_exception(
exceptions.BadRequest, 'invalid_object', 400,
self.client.post_recordset, self.zone.id, model)
def test_cname_recordsets_cannot_have_more_than_one_record(self):
post_model = datagen.random_cname_recordset(zone_name=self.zone.name)
post_model.records = [
"a.{0}".format(self.zone.name),
"b.{0}".format(self.zone.name),
]
self.assertRaises(exceptions.BadRequest,
self.client.post_recordset, self.zone.id, post_model)
@utils.parameterized(INVALID_TXT_DATASET)
def test_cannot_create_TXT_with(self, data):
post_model = datagen.random_txt_recordset(self.zone.name, data)
e = self._assert_exception(
exceptions.BadRequest, 'invalid_object', 400,
self.client.post_recordset, self.zone.id, post_model,
)
self.assertEqual(
"u'%s' is not a 'txt-data'" % data.replace('\\', '\\\\'),
e.resp_body['errors']['errors'][0]['message'],
)
@utils.parameterized(VALID_TXT_DATASET)
def test_create_TXT_with(self, data):
post_model = datagen.random_txt_recordset(self.zone.name, data)
fixture = self.useFixture(RecordsetFixture(self.zone.id, post_model))
recordset = fixture.created_recordset
self.client.wait_for_recordset(self.zone.id, recordset.id)
@utils.parameterized(VALID_TXT_DATASET)
def test_create_SPF_with(self, data):
post_model = datagen.random_spf_recordset(self.zone.name, data)
fixture = self.useFixture(RecordsetFixture(self.zone.id, post_model))
recordset = fixture.created_recordset
self.client.wait_for_recordset(self.zone.id, recordset.id)
@utils.parameterized(INVALID_MX_DATASET)
def test_cannot_create_MX_with(self, pref):
post_model = datagen.random_mx_recordset(self.zone.name, pref=pref)
self._assert_exception(
exceptions.BadRequest, 'invalid_object', 400,
self.client.post_recordset, self.zone.id, post_model,
)
@utils.parameterized(INVALID_SSHFP_DATASET)
def test_cannot_create_SSHFP_with(self, algo, finger):
post_model = datagen.random_sshfp_recordset(
zone_name=self.zone.name,
algorithm_number=algo,
fingerprint_type=finger,
)
self._assert_exception(
exceptions.BadRequest, 'invalid_object', 400,
self.client.post_recordset, self.zone.id, post_model,
)

View File

@ -1,27 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import tempest.lib.base
from functionaltests.common.config import read_config
class BaseDesignateTest(tempest.lib.base.BaseTestCase):
@classmethod
def setUpClass(cls):
super(BaseDesignateTest, cls).setUpClass()
read_config()

View File

@ -1,242 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import abc
import logging
from config import cfg
from noauth import NoAuthAuthProvider
from six import string_types
from six.moves.urllib.parse import quote_plus
from tempest.lib.common.rest_client import RestClient
from tempest.lib.auth import KeystoneV2Credentials
from tempest.lib.auth import KeystoneV2AuthProvider
from functionaltests.common.utils import memoized
from functionaltests.common import hooks
LOG = logging.getLogger(__name__)
class KeystoneV2AuthProviderWithOverridableUrl(KeystoneV2AuthProvider):
def base_url(self, *args, **kwargs):
# use the base url from the config if it was specified
if cfg.CONF.identity.designate_override_url:
return cfg.CONF.identity.designate_override_url
else:
return super(KeystoneV2AuthProviderWithOverridableUrl, self) \
.base_url(*args, **kwargs)
class KeystoneV2AuthProviderNoToken(KeystoneV2AuthProviderWithOverridableUrl):
def _decorate_request(self, filters, method, url, headers=None, body=None,
auth_data=None):
_res = super(KeystoneV2AuthProviderNoToken, self)._decorate_request(
filters, method, url, headers=headers, body=body,
auth_data=auth_data)
_url, _headers, _body = _res
del _headers['X-Auth-Token']
return (_url, _headers, _body)
class BaseDesignateClient(RestClient):
def __init__(self, with_token=True):
no_cert_check = cfg.CONF.testconfig.disable_ssl_certificate_validation
interface = cfg.CONF.designate.interface
if not interface.endswith('URL'):
interface += "URL"
self.hooks = []
self._populate_hooks()
super(BaseDesignateClient, self).__init__(
auth_provider=self.get_auth_provider(with_token),
service=cfg.CONF.designate.service,
region=cfg.CONF.identity.region,
disable_ssl_certificate_validation=no_cert_check,
endpoint_type=interface
)
def _populate_hooks(self):
for name in cfg.CONF.testconfig.hooks:
LOG.debug("Loading request hook '%s' from config", name)
try:
cls = hooks.get_class(name)
if not cls:
LOG.debug("'%s' not found. Call register_hook", name)
else:
self.hooks.append(cls)
except Exception as e:
LOG.exception(e)
def request(self, *args, **kwargs):
req_hooks = [hook_class() for hook_class in self.hooks]
try:
for hook in req_hooks:
hook.before(args, kwargs)
r, b = super(BaseDesignateClient, self).request(*args, **kwargs)
for hook in req_hooks:
hook.after(r, b)
return r, b
except Exception as e:
for hook in req_hooks:
hook.on_exception(e)
raise
def get_auth_provider(self, with_token=True):
if cfg.CONF.noauth.use_noauth:
return self._get_noauth_auth_provider()
return self._get_keystone_auth_provider(with_token)
@abc.abstractmethod
def _get_noauth_auth_provider(self):
pass
@abc.abstractmethod
def _get_keystone_auth_provider(self):
pass
def _create_keystone_auth_provider(self, creds, with_token=True):
if with_token:
auth_provider = KeystoneV2AuthProviderWithOverridableUrl(
creds, cfg.CONF.identity.uri)
else:
auth_provider = KeystoneV2AuthProviderNoToken(
creds, cfg.CONF.identity.uri)
auth_provider.fill_credentials()
return auth_provider
class DesignateClient(BaseDesignateClient):
"""Client with default user"""
def _get_noauth_auth_provider(self):
creds = KeystoneV2Credentials(
tenant_id=cfg.CONF.noauth.tenant_id,
)
return NoAuthAuthProvider(creds, cfg.CONF.noauth.designate_endpoint)
def _get_keystone_auth_provider(self, with_token=True):
creds = KeystoneV2Credentials(
username=cfg.CONF.identity.username,
password=cfg.CONF.identity.password,
tenant_name=cfg.CONF.identity.tenant_name,
)
return self._create_keystone_auth_provider(creds, with_token)
class DesignateAltClient(BaseDesignateClient):
"""Client with alternate user"""
def _get_noauth_auth_provider(self):
creds = KeystoneV2Credentials(
tenant_id=cfg.CONF.noauth.alt_tenant_id,
)
return NoAuthAuthProvider(creds, cfg.CONF.noauth.designate_endpoint)
def _get_keystone_auth_provider(self, with_token=True):
creds = KeystoneV2Credentials(
username=cfg.CONF.identity.alt_username,
password=cfg.CONF.identity.alt_password,
tenant_name=cfg.CONF.identity.alt_tenant_name,
)
return self._create_keystone_auth_provider(creds, with_token)
class DesignateAdminClient(BaseDesignateClient):
"""Client with admin user"""
def _get_noauth_auth_provider(self):
creds = KeystoneV2Credentials(
tenant_id=cfg.CONF.noauth.tenant_id,
)
return NoAuthAuthProvider(creds, cfg.CONF.noauth.designate_endpoint)
def _get_keystone_auth_provider(self, with_token=True):
creds = KeystoneV2Credentials(
username=cfg.CONF.auth.admin_username,
password=cfg.CONF.auth.admin_password,
tenant_name=cfg.CONF.auth.admin_tenant_name,
)
return self._create_keystone_auth_provider(creds, with_token)
class ClientMixin(object):
@classmethod
@memoized
def get_clients(cls, with_token):
return {
'default': DesignateClient(with_token),
'alt': DesignateAltClient(with_token),
'admin': DesignateAdminClient(with_token),
}
def __init__(self, client):
self.client = client
@classmethod
def deserialize(cls, resp, body, model_type):
return resp, model_type.from_json(body)
@classmethod
def as_user(cls, user, with_token=True):
"""
:param user: 'default', 'alt', or 'admin'
:param with_token: Boolean for whether to send the x-auth-token with
requests
"""
return cls(cls.get_clients(with_token)[user])
@property
def tenant_id(self):
return self.client.tenant_id
@classmethod
def add_filters(cls, url, filters):
"""
:param url: base URL for the request
:param filters: dict with var:val pairs to add as parameters to URL
"""
first = True
for f in filters:
if isinstance(filters[f], string_types):
filters[f] = quote_plus(filters[f].encode('utf-8'))
url = '{url}{sep}{var}={val}'.format(
url=url, sep=('?' if first else '&'), var=f, val=filters[f]
)
first = False
return url
def create_uri(self, path, filters=None):
url_pattern = cfg.CONF.testconfig.v2_path_pattern
params = {
'path': path,
'tenant_id': self.client.tenant_id,
'tenant_name': self.client.tenant_name,
'user': self.client.user,
'user_id': self.client.user_id,
}
uri = url_pattern.format(**params)
uri.replace('//', '/')
if filters:
uri = self.add_filters(uri, filters)
return uri

View File

@ -1,98 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
from oslo_config import cfg
cfg.CONF.register_group(cfg.OptGroup(
name='identity', title="Configuration for Keystone identity"
))
cfg.CONF.register_group(cfg.OptGroup(
name='auth', title="Configuration for Keystone auth"
))
cfg.CONF.register_group(cfg.OptGroup(
name='noauth', title="Configuration to run tests without Keystone"
))
cfg.CONF.register_group(cfg.OptGroup(
name='testconfig', title="Configuration to customize how the tests run"
))
cfg.CONF.register_opts([
cfg.StrOpt('designate_override_url',
help="Use this instead of the endpoint in the service catalog"),
cfg.StrOpt('uri', help="The Keystone v2 endpoint"),
cfg.StrOpt('uri_v3', help="The Keystone v3 endpoint"),
cfg.StrOpt('auth_version', default='v2'),
cfg.StrOpt('region'),
cfg.StrOpt('username'),
cfg.StrOpt('tenant_name'),
cfg.StrOpt('password', secret=True),
cfg.StrOpt('domain_name'),
cfg.StrOpt('alt_username'),
cfg.StrOpt('alt_tenant_name'),
cfg.StrOpt('alt_password', secret=True),
cfg.StrOpt('alt_domain_name'),
], group='identity')
cfg.CONF.register_opts([
cfg.StrOpt('admin_username'),
cfg.StrOpt('admin_tenant_name'),
cfg.StrOpt('admin_password', secret=True),
cfg.StrOpt('admin_domain_name'),
], group="auth")
cfg.CONF.register_opts([
cfg.StrOpt('designate_endpoint', help="The Designate API endpoint"),
cfg.StrOpt('tenant_id', default='noauth-project'),
cfg.StrOpt('alt_tenant_id', default='alt-project'),
cfg.StrOpt('admin_tenant_id', default='admin-project'),
cfg.BoolOpt('use_noauth', default=False),
], group='noauth')
cfg.CONF.register_opts([
cfg.ListOpt('nameservers', default=["127.0.0.1:53"]),
cfg.StrOpt('interface', default='public'),
cfg.StrOpt('service', default='dns')
], group="designate")
cfg.CONF.register_opts([
cfg.ListOpt('hooks', default=[],
help="The list of request hook class names to enable"),
cfg.StrOpt('v2_path_pattern', default='/v2/{path}',
help="Specifies how to build the path for the request"),
cfg.BoolOpt('no_admin_setup', default=False,
help="Skip admin actions (like increasing quotas) in setUp()"),
cfg.BoolOpt('disable_ssl_certificate_validation', default=False),
], group='testconfig')
def find_config_file():
return os.environ.get(
'TEMPEST_CONFIG', '/opt/stack/tempest/etc/tempest.conf')
def read_config():
cfg.CONF(args=[], default_config_files=[find_config_file()])

View File

@ -1,229 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import uuid
import random
from functionaltests.api.v2.models.blacklist_model import BlacklistModel
from functionaltests.api.v2.models.pool_model import PoolModel
from functionaltests.api.v2.models.transfer_requests_model import \
TransferRequestsModel
from functionaltests.api.v2.models.transfer_accepts_model import \
TransferAcceptsModel
from functionaltests.api.v2.models.recordset_model import RecordsetModel
from functionaltests.api.v2.models.zone_model import ZoneModel
from functionaltests.api.v2.models.tld_model import TLDModel
def random_ip():
return ".".join(str(random.randrange(0, 256)) for _ in range(4))
def random_ipv6():
def hexes(n):
return "".join(random.choice("1234567890abcdef") for _ in range(n))
result = ":".join(hexes(4) for _ in range(8))
return result.replace("0000", "0")
def random_uuid():
return uuid.uuid4()
def random_string(prefix='rand', n=8, suffix=''):
"""Return a string containing random digits
:param prefix: the exact text to start the string. Defaults to "rand"
:param n: the number of random digits to generate
:param suffix: the exact text to end the string
"""
digits = "".join(str(random.randrange(0, 10)) for _ in range(n))
return prefix + digits + suffix
def random_zone_data(name=None, email=None, ttl=None, description=None):
"""Generate random zone data, with optional overrides
:return: A ZoneModel
"""
if name is None:
name = random_string(prefix='testdomain', suffix='.com.')
if email is None:
email = ("admin@" + name).strip('.')
if description is None:
description = random_string(prefix='Description ')
if ttl is None:
ttl = random.randint(1200, 8400),
return ZoneModel.from_dict({
'name': name,
'email': email,
'ttl': random.randint(1200, 8400),
'description': description})
def random_transfer_request_data(description=None, target_project_id=None):
"""Generate random zone data, with optional overrides
:return: A TransferRequestModel
"""
data = {}
if description is None:
data['description'] = random_string(prefix='Description ')
if target_project_id:
data['target_project_id'] = target_project_id
return TransferRequestsModel.from_dict(data)
def random_transfer_accept_data(key=None, zone_transfer_request_id=None):
"""Generate random zone data, with optional overrides
:return: A TransferRequestModel
"""
if key is None:
key = random_string()
if zone_transfer_request_id is None:
zone_transfer_request_id = random_uuid()
return TransferAcceptsModel.from_dict({
'key': key,
'zone_transfer_request_id': zone_transfer_request_id})
def random_recordset_data(record_type, zone_name, name=None, records=None,
ttl=None):
"""Generate random recordset data, with optional overrides
:return: A RecordsetModel
"""
if name is None:
name = random_string(prefix=record_type, suffix='.' + zone_name)
if records is None:
records = [random_ip()]
if ttl is None:
ttl = random.randint(1200, 8400)
return RecordsetModel.from_dict({
'type': record_type,
'name': name,
'records': records,
'ttl': ttl})
def random_a_recordset(zone_name, ip=None, **kwargs):
if ip is None:
ip = random_ip()
return random_recordset_data('A', zone_name, records=[ip], **kwargs)
def random_aaaa_recordset(zone_name, ip=None, **kwargs):
if ip is None:
ip = random_ipv6()
return random_recordset_data('AAAA', zone_name, records=[ip], **kwargs)
def random_cname_recordset(zone_name, cname=None, **kwargs):
if cname is None:
cname = zone_name
return random_recordset_data('CNAME', zone_name, records=[cname], **kwargs)
def random_mx_recordset(zone_name, pref=None, host=None, **kwargs):
if pref is None:
pref = str(random.randint(0, 65535))
if host is None:
host = random_string(prefix='mail', suffix='.' + zone_name)
data = "{0} {1}".format(pref, host)
return random_recordset_data('MX', zone_name, records=[data], **kwargs)
def random_blacklist_data():
data = {
"pattern": random_string()
}
return BlacklistModel.from_dict(data)
def random_pool_data():
ns_zone = random_zone_data().name
data = {
"name": random_string(),
}
records = []
for i in range(0, 2):
records.append("ns%s.%s" % (i, ns_zone))
ns_records = [{"hostname": x, "priority": random.randint(1, 999)}
for x in records]
data["ns_records"] = ns_records
return PoolModel.from_dict(data)
def random_zonefile_data(name=None, ttl=None):
"""Generate random zone data, with optional overrides
:return: A ZoneModel
"""
zone_base = ('$ORIGIN &\n& # IN SOA ns.& nsadmin.& # # # # #\n'
'& # IN NS ns.&\n& # IN MX 10 mail.&\nns.& 360 IN A 1.0.0.1')
if name is None:
name = random_string(prefix='testdomain', suffix='.com.')
if ttl is None:
ttl = str(random.randint(1200, 8400))
return zone_base.replace('&', name).replace('#', ttl)
def random_spf_recordset(zone_name, data=None, **kwargs):
data = data or "v=spf1 +all"
return random_recordset_data('SPF', zone_name, records=[data], **kwargs)
def random_srv_recordset(zone_name, data=None):
data = data or "10 0 8080 %s.%s" % (random_string(), zone_name)
return random_recordset_data('SRV', zone_name,
name="_sip._tcp.%s" % zone_name,
records=[data])
def random_sshfp_recordset(zone_name, algorithm_number=None,
fingerprint_type=None, fingerprint=None,
**kwargs):
algorithm_number = algorithm_number or 2
fingerprint_type = fingerprint_type or 1
fingerprint = fingerprint or \
"123456789abcdef67890123456789abcdef67890"
data = "%s %s %s" % (algorithm_number, fingerprint_type, fingerprint)
return random_recordset_data('SSHFP', zone_name, records=[data], **kwargs)
def random_txt_recordset(zone_name, data=None, **kwargs):
data = data or "v=spf1 +all"
return random_recordset_data('TXT', zone_name, records=[data], **kwargs)
def random_tld_data():
data = {
"name": random_string(prefix='tld')
}
return TLDModel.from_dict(data)
def wildcard_ns_recordset(zone_name):
name = "*.{0}".format(zone_name)
records = ["ns.example.com."]
return random_recordset_data('NS', zone_name, name, records)

View File

@ -1,48 +0,0 @@
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Author: Endre Karlson <endre.karlson@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import dns.resolver
from oslo_config import cfg
from functionaltests.common import utils
def query(name, type_, server="127.0.0.1", port=53, timeout=3):
resolver = dns.resolver.Resolver()
resolver.nameservers = [server]
resolver.port = int(port)
resolver.timeout = timeout
try:
return resolver.query(name, type_)
except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
return False
def query_servers(name, type_, servers=None, timeout=3):
servers = servers or os.environ.get("DESIGNATE_SERVERS",
cfg.CONF.designate.nameservers)
results = []
for srv in servers:
server, port = srv.split(":")
port = port or 53
result = utils.wait_for_condition(
lambda: query(name, type_, server, port))
results.append(result)
return results

View File

@ -1,37 +0,0 @@
"""
Copyright 2016 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# a dictionary mapping the class name to the hook class
_HOOKS = {}
def register_hook(cls):
"""Register the request hook. This does not enable the hook. Hooks are
enable via the config file.
Usage:
>>> register_hook(MyHook)
"""
_HOOKS[cls.__name__] = cls
def get_class(name):
"""Get a hook class by it's class name:
Usage:
>>> get_hook_class('MyHook')
"""
return _HOOKS.get(name)

View File

@ -1,43 +0,0 @@
"""
Copyright 2016 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
class BaseRequestHook(object):
"""When writing your own hook, do three things:
1. Implement this hook interface
2. Register your hook in a global lookup using hook.register_hook()
3. Specify the name of your hook in a config file
A new instance of a hook is created before for each request, for storing
per request state if you want.
"""
def before(self, req_args, req_kwargs):
"""A hook called before each request
:param req_args: a list (mutable)
:param req_kwargs: a dictionary
"""
pass
def after(self, resp, resp_body):
"""A hook called after each request"""
pass
def on_exception(self, exception):
"""A hook called when an exception occurs on a request"""
pass

View File

@ -1,177 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
class BaseModel(object):
@classmethod
def from_json(cls, json_str):
return cls.from_dict(json.loads(json_str))
def to_json(self):
return json.dumps(self.to_dict())
@classmethod
def from_dict(cls, data):
model = cls()
for key in data:
setattr(model, key, data.get(key))
return model
def to_dict(self):
result = {}
for key in self.__dict__:
result[key] = getattr(self, key)
if isinstance(result[key], BaseModel):
result[key] = result[key].to_dict()
return result
def __str__(self):
return "%s" % self.to_dict()
class LinksModel(BaseModel):
pass
class MetadataModel(BaseModel):
pass
class CollectionModel(BaseModel):
"""
{
'collection_name' : [ <models> ],
'links': { <links> },
'metadata': { <metadata> },
}
"""
SUB_MODELS = {
'links': LinksModel,
'metadata': MetadataModel,
}
@classmethod
def from_dict(cls, data):
model = super(CollectionModel, cls).from_dict(data)
# deserialize e.g. data['zones']
collection = []
if hasattr(model, cls.COLLECTION_NAME):
for d in getattr(model, cls.COLLECTION_NAME):
collection.append(cls.MODEL_TYPE.from_dict(d))
setattr(model, cls.COLLECTION_NAME, collection)
# deserialize data['links'], data['metadata'], etc
for key, model_type in cls.SUB_MODELS.items():
if hasattr(model, key):
val = getattr(model, key)
setattr(model, key, model_type.from_dict(val))
return model
class EntityModel(BaseModel):
"""
{ 'entity_name': { <data> } }
"""
@classmethod
def from_dict(cls, data):
model = super(EntityModel, cls).from_dict(data)
if hasattr(model, cls.ENTITY_NAME):
val = getattr(model, cls.ENTITY_NAME)
setattr(model, cls.ENTITY_NAME, cls.MODEL_TYPE.from_dict(val))
return model
class ZoneFile(object):
def __init__(self, origin, ttl, records):
self.origin = origin
self.ttl = ttl
self.records = records
def __str__(self):
return str(self.__dict__)
def __repr__(self):
return str(self)
def __eq__(self, other):
return self.__dict__ == other.__dict__
@classmethod
def from_text(cls, text):
"""Return a ZoneFile from a string containing the zone file contents"""
# filter out empty lines and strip all leading/trailing whitespace.
# this assumes no multiline records
lines = [x.strip() for x in text.split('\n') if x.strip()]
assert lines[0].startswith('$ORIGIN')
assert lines[1].startswith('$TTL')
return ZoneFile(
origin=lines[0].split(' ')[1],
ttl=int(lines[1].split(' ')[1]),
records=[ZoneFileRecord.from_text(x) for x in lines[2:]],
)
class ZoneFileRecord(object):
def __init__(self, name, type, data):
self.name = str(name)
self.type = str(type)
self.data = str(data)
def __str__(self):
return str(self.__dict__)
def __repr__(self):
return str(self)
def __eq__(self, other):
return self.__dict__ == other.__dict__
def __hash__(self):
return hash(tuple(sorted(self.__dict__.items())))
@classmethod
def from_text(cls, text):
"""Create a ZoneFileRecord from a line of text of a zone file, like:
mydomain.com. IN NS ns1.example.com.
"""
# assumes records don't have a TTL between the name and the class.
# assumes no parentheses in the record, all on a single line.
parts = [x for x in text.split(' ', 4) if x.strip()]
name, rclass, rtype, data = parts
assert rclass == 'IN'
return cls(name=name, type=rtype, data=data)
@classmethod
def records_from_recordset(cls, recordset):
"""Returns a list of ZoneFileRecords, one for each entry in the
recordset's list of records
"""
return [
cls(name=recordset.name, type=recordset.type, data=data)
for data in recordset.records
]

View File

@ -1,61 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import copy
import re
from six.moves.urllib import parse
from tempest.lib.auth import AuthProvider
class NoAuthAuthProvider(AuthProvider):
def __init__(self, creds, override_url):
super(NoAuthAuthProvider, self).__init__(creds)
self.override_url = override_url
@classmethod
def check_credentials(cls, credentials):
return True
def base_url(self, *args, **kwargs):
return self.override_url
def _decorate_request(self, filters, method, url, headers=None, body=None,
auth_data=None):
base_url = self.base_url(filters=filters, auth_data=auth_data)
# build the unauthenticated request
_headers = copy.deepcopy(headers) if headers is not None else {}
_headers['X-Auth-Project-ID'] = self.credentials.tenant_id
if url is None or url == "":
_url = base_url
else:
# Join base URL and url, and remove multiple contiguous slashes
_url = "/".join([base_url, url])
parts = [x for x in parse.urlparse(_url)]
parts[2] = re.sub("/{2,}", "/", parts[2])
_url = parse.urlunparse(parts)
# no change to method or body
return str(_url), _headers, body
def _get_auth(self):
return None
def is_expired(self):
return False
def _fill_credentials(self):
pass

View File

@ -1,72 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from functionaltests.api.v2.models.recordset_model import RecordsetModel
from functionaltests.common.models import ZoneFile
from functionaltests.common.models import ZoneFileRecord
import tempest.lib.base
class MetaTest(tempest.lib.base.BaseTestCase):
def test_zone_file_model_meta_test(self):
zone_file = ZoneFile.from_text(
"""
$ORIGIN mydomain.com.
$TTL 1234
mydomain.com. IN NS ns1.example.com.
mydomain.com. IN SOA ns1.example.com. mail.mydomain.com. 1 2 3 4 5
""")
self.assertEqual('mydomain.com.', zone_file.origin)
self.assertEqual(1234, zone_file.ttl)
ns_record = ZoneFileRecord(
name='mydomain.com.', type='NS', data='ns1.example.com.')
soa_record = ZoneFileRecord(
name='mydomain.com.', type='SOA',
data='ns1.example.com. mail.mydomain.com. 1 2 3 4 5')
self.assertEqual(zone_file.records[0], ns_record)
self.assertEqual(zone_file.records[1], soa_record)
def test_zone_file_record_model_meta_test(self):
record = ZoneFileRecord(name='one.com.', type='A', data='1.2.3.4')
wrong_name = ZoneFileRecord(name='two.com.', type='A', data='1.2.3.4')
wrong_type = ZoneFileRecord(name='one.com.', type='MX', data='1.2.3.4')
wrong_data = ZoneFileRecord(name='one.com.', type='A', data='1.2.3.5')
self.assertEqual(record, record)
self.assertNotEqual(record, wrong_name)
self.assertNotEqual(record, wrong_type)
self.assertNotEqual(record, wrong_data)
def test_zone_file_records_from_recordset(self):
# we don't need all of the recordset's fields here
recordset = RecordsetModel.from_dict({
"type": "NS",
"name": "mydomain.com.",
"records": ["ns1.a.com.", "ns2.a.com.", "ns3.a.com."],
})
records = ZoneFileRecord.records_from_recordset(recordset)
expected = [
ZoneFileRecord(name="mydomain.com.", type="NS", data="ns1.a.com."),
ZoneFileRecord(name="mydomain.com.", type="NS", data="ns2.a.com."),
ZoneFileRecord(name="mydomain.com.", type="NS", data="ns3.a.com."),
]
self.assertEqual(expected, records)

View File

@ -1,124 +0,0 @@
"""
Copyright 2015 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import collections
import functools
import time
import six
import netaddr
def def_method(f, *args, **kwargs):
@functools.wraps(f)
def new_method(self):
return f(self, *args, **kwargs)
return new_method
def parameterized_class(cls):
"""A class decorator for running parameterized test cases.
Mark your class with @parameterized_class.
Mark your test cases with @parameterized.
"""
test_functions = {
k: v for k, v in vars(cls).items() if k.startswith('test')
}
for name, f in test_functions.items():
if not hasattr(f, '_test_data'):
continue
# remove the original test function from the class
delattr(cls, name)
# add a new test function to the class for each entry in f._test_data
for tag, args in f._test_data.items():
new_name = "{0}_{1}".format(f.__name__, tag)
if hasattr(cls, new_name):
raise Exception(
"Parameterized test case '{0}.{1}' created from '{0}.{2}' "
"already exists".format(cls.__name__, new_name, name))
# Using `def new_method(self): f(self, **args)` is not sufficient
# (all new_methods use the same args value due to late binding).
# Instead, use this factory function.
new_method = def_method(f, **args)
# To add a method to a class, available for all instances:
# MyClass.method = types.MethodType(f, None, MyClass)
setattr(cls, new_name, six.create_unbound_method(new_method, cls))
return cls
def parameterized(data):
"""A function decorator for parameterized test cases.
Example:
@parameterized({
'zero': dict(val=0),
'one': dict(val=1),
})
def test_val(self, val):
self.assertEqual(self.get_val(), val)
The above will generate two test cases:
`test_val_zero` which runs with val=0
`test_val_one` which runs with val=1
:param data: A dictionary that looks like {tag: {arg1: val1, ...}}
"""
def wrapped(f):
f._test_data = data
return f
return wrapped
def wait_for_condition(condition, interval=5, timeout=45):
end_time = time.time() + timeout
while time.time() < end_time:
result = condition()
if result:
return result
time.sleep(interval)
raise Exception("Timed out after {0} seconds".format(timeout))
def memoized(func):
"""A decorator to cache function's return value"""
cache = {}
@functools.wraps(func)
def wrapper(*args):
if not isinstance(args, collections.Hashable):
# args is not cacheable. just call the function.
return func(*args)
if args in cache:
return cache[args]
else:
value = func(*args)
cache[args] = value
return value
return wrapper
def shorten_ipv6_addrs(addrs):
"""Shorten ipv6 addresses"""
new_addrs = []
for a in addrs:
an = netaddr.IPAddress(a, version=6)
new_addrs.append(an.format(netaddr.ipv6_compact))
return new_addrs

10
tox.ini
View File

@ -84,16 +84,6 @@ deps = pip-check-reqs
-r{toxinidir}/requirements.txt
commands=pip-missing-reqs -d --ignore-file=designate/tests/* designate
[testenv:functional]
usedevelop = False
setenv = VIRTUAL_ENV={envdir}
OS_TEST_PATH=functionaltests/
passenv = TEMPEST_CONFIG
OS_STDOUT_CAPTURE
OS_STDERR_CAPTURE
OS_LOG_CAPTURE
OS_DEBUG
[testenv:api-ref]
# This environment is called from CI scripts to test and publish
# the API Ref to developer.openstack.org.