Restructure testing

- Moved all functional tests to dedicated folder.
- Updated a few tests to use faster oslo tests.
- Renamed fixtures.py to base_fixtures.py.

Change-Id: Ie8c5591dd12a91350173bec2b418b6c9ddea79fe
This commit is contained in:
Erik Olof Gunnar Andersson 2023-12-15 08:06:57 -08:00
parent af940fb6e4
commit 57f2e367bf
96 changed files with 1211 additions and 1097 deletions
designate/tests
README__init__.pybase_fixtures.py
functional
test_worker
unit

9
designate/tests/README Normal file

@ -0,0 +1,9 @@
This directory contains all Designate tests.
Examples:
tox -e py3
tox -e py3 -- tests.unit
tox -e py3 -- tests.unit.backend
tox -e py3 -- tests.functional
tox -e py3 -- tests.functional.api

@ -14,879 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import copy
import functools
import inspect
import os
import time
from unittest import mock
import eventlet
from oslo_config import fixture as cfg_fixture
from oslo_log import log as logging
from oslo_messaging import conffixture as messaging_fixture
from oslotest import base
from testtools import testcase
from designate.common import constants
import designate.conf
from designate.context import DesignateContext
from designate import exceptions
from designate import objects
from designate import policy
from designate import storage
from designate.tests import fixtures
from designate.tests import resources
from designate import utils
eventlet.monkey_patch(os=False)
CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
default_pool_id = CONF['service:central'].default_pool_id
_TRUE_VALUES = ('true', '1', 'yes', 'y')
class TestTimeoutError(Exception):
# Used in wait_for_condition
pass
class TestCase(base.BaseTestCase):
service_status_fixtures = [{
'service_name': 'foo',
'hostname': 'bar',
'status': "UP",
'stats': {},
'capabilities': {},
}, {
'id': 'c326f735-eecc-4968-969f-355a43c4ae27',
'service_name': 'baz',
'hostname': 'qux',
'status': "UP",
'stats': {},
'capabilities': {},
}]
quota_fixtures = [{
'resource': constants.QUOTA_ZONES,
'hard_limit': 5,
}, {
'resource': constants.QUOTA_ZONE_RECORDS,
'hard_limit': 50,
}]
server_fixtures = [{
'name': 'ns1.example.org.',
}, {
'name': 'ns2.example.org.',
}, {
'name': 'ns2.example.org.',
}]
# The last tld is invalid
tld_fixtures = [{
'name': 'com',
}, {
'name': 'co.uk',
}, {
'name': 'com.',
}]
default_tld_fixtures = [{
'name': 'com',
}, {
'name': 'org',
}, {
'name': 'net',
}]
tsigkey_fixtures = [{
'name': 'test-key-one',
'algorithm': 'hmac-md5',
'secret': 'SomeOldSecretKey',
'scope': 'POOL',
'resource_id': '6ca6baef-3305-4ad0-a52b-a82df5752b62',
}, {
'name': 'test-key-two',
'algorithm': 'hmac-sha256',
'secret': 'AnotherSecretKey',
'scope': 'ZONE',
'resource_id': '7fbb6304-5e74-4691-bd80-cef3cff5fe2f',
}]
# The last zone is invalid
zone_fixtures = {
'PRIMARY': [
{
'name': 'example.com.',
'type': 'PRIMARY',
'email': 'example@example.com',
}, {
'name': 'example.net.',
'type': 'PRIMARY',
'email': 'example@example.net',
}, {
'name': 'example.org.',
'type': 'PRIMARY',
'email': 'example@example.org',
}, {
'name': 'invalid.com.....',
'type': 'PRIMARY',
'email': 'example@invalid.com',
}
],
'SECONDARY': [
{
'name': 'example.com.',
'type': 'SECONDARY',
}, {
'name': 'example.net.',
'type': 'SECONDARY',
}, {
'name': 'example.org.',
'type': 'SECONDARY',
}, {
'name': 'invalid.com.....',
'type': 'SECONDARY',
}
]
}
recordset_fixtures = {
'A': [
{'name': 'mail.%s', 'type': 'A'},
{'name': 'www.%s', 'type': 'A'},
],
'MX': [
{'name': 'mail.%s', 'type': 'MX'},
],
'SRV': [
{'name': '_sip._tcp.%s', 'type': 'SRV'},
{'name': '_sip._udp.%s', 'type': 'SRV'},
],
'TXT': [
{'name': 'text.%s', 'type': 'TXT'},
],
'CNAME': [
{'name': 'www.%s', 'type': 'CNAME'},
{'name': 'sub1.%s', 'type': 'CNAME'},
]
}
record_fixtures = {
'A': [
{'data': '192.0.2.1'},
{'data': '192.0.2.2'}
],
'MX': [
{'data': '5 mail.example.org.'},
{'data': '10 mail.example.com.'},
],
'SRV': [
{'data': '5 0 5060 server1.example.org.'},
{'data': '10 1 5060 server2.example.org.'},
],
'CNAME': [
{'data': 'www.somezone.org.'},
{'data': 'www.someotherzone.com.'},
],
'TXT': [
{'data': 'footxtdata'}
]
}
ptr_fixtures = [
{'ptrdname': 'srv1.example.com.'},
{'ptrdname': 'srv1.example.net.'},
{'ptrdname': 'srv2.example.com.'},
{'ptrdname': 'srv3.example.com.'},
{'ptrdname': 'srv4.example.com.'},
{'ptrdname': 'srv5.example.com.'},
]
blacklist_fixtures = [{
'pattern': 'blacklisted.com.',
'description': 'This is a comment',
}, {
'pattern': 'blacklisted.net.'
}, {
'pattern': 'blacklisted.org.'
}]
pool_fixtures = [
{'name': 'Pool-One',
'description': 'Pool-One description',
'attributes': [{'key': 'scope', 'value': 'public'}],
'ns_records': [{'priority': 1, 'hostname': 'ns1.example.org.'},
{'priority': 2, 'hostname': 'ns2.example.org.'}]},
{'name': 'Pool-Two',
'description': 'Pool-Two description',
'attributes': [{'key': 'scope', 'value': 'public'}],
'ns_records': [{'priority': 1, 'hostname': 'ns1.example.org.'}]},
]
pool_attribute_fixtures = [
{'scope': 'public'},
{'scope': 'private'},
{'scope': 'unknown'}
]
pool_attributes_fixtures = [
{'pool_id': default_pool_id,
'key': 'continent',
'value': 'NA'},
{'pool_id': default_pool_id,
'key': 'scope',
'value': 'public'}
]
pool_nameserver_fixtures = [
{'pool_id': default_pool_id,
'host': "192.0.2.1",
'port': 53},
{'pool_id': default_pool_id,
'host': "192.0.2.2",
'port': 53},
]
pool_target_fixtures = [
{'pool_id': default_pool_id,
'type': "fake",
'description': "FooBar"},
{'pool_id': default_pool_id,
'type': "fake",
'description': "BarFoo"},
]
pool_also_notify_fixtures = [
{'pool_id': default_pool_id,
'host': "192.0.2.1",
'port': 53},
{'pool_id': default_pool_id,
'host': "192.0.2.2",
'port': 53},
]
shared_zone_fixtures = [
{
"target_project_id": "target_project_id",
"zone_id": None,
"project_id": "project_id",
}
]
zone_transfers_request_fixtures = [{
"description": "Test Transfer",
}, {
"description": "Test Transfer 2 - with target",
"target_tenant_id": "target_tenant_id"
}]
zone_import_fixtures = [{
'status': 'PENDING',
'zone_id': None,
'message': None,
'task_type': 'IMPORT'
}, {
'status': 'ERROR',
'zone_id': None,
'message': None,
'task_type': 'IMPORT'
}, {
'status': 'COMPLETE',
'zone_id': '6ca6baef-3305-4ad0-a52b-a82df5752b62',
'message': None,
'task_type': 'IMPORT'
}]
zone_export_fixtures = [{
'status': 'PENDING',
'zone_id': None,
'message': None,
'task_type': 'EXPORT'
}, {
'status': 'ERROR',
'zone_id': None,
'message': None,
'task_type': 'EXPORT'
}, {
'status': 'COMPLETE',
'zone_id': '6ca6baef-3305-4ad0-a52b-a82df5752b62',
'message': None,
'task_type': 'EXPORT'
}]
def setUp(self):
super().setUp()
self.CONF = self.useFixture(cfg_fixture.Config(CONF)).conf
self.messaging_conf = messaging_fixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake:/'
self.messaging_conf.response_timeout = 5
self.useFixture(self.messaging_conf)
self.config(
driver=['test'],
group='oslo_messaging_notifications'
)
self.useFixture(fixtures.RPCFixture(CONF))
self.config(
emitter_type="noop",
group="heartbeat_emitter"
)
self.config(
auth_strategy='noauth',
group='service:api'
)
self._disable_osprofiler()
self.db_fixture = self.useFixture(
fixtures.DatabaseFixture.get_fixture())
if os.getenv('DESIGNATE_SQL_DEBUG', "False").lower() in _TRUE_VALUES:
connection_debug = 50
else:
connection_debug = 0
self.config(
connection=self.db_fixture.url,
connection_debug=connection_debug,
group='storage:sqlalchemy'
)
self.config(network_api='fake')
self.config(
scheduler_filters=['pool_id_attribute', 'random'],
group='service:central')
# "Read" Configuration
self.CONF([], project='designate')
self.useFixture(fixtures.PolicyFixture())
self.network_api = fixtures.NetworkAPIFixture()
self.useFixture(self.network_api)
self.central_service = self.start_service('central')
self.admin_context = self.get_admin_context()
self.admin_context_all_tenants = self.get_admin_context(
all_tenants=True)
self.storage = storage.get_storage()
# Setup the Default Pool with some useful settings
self._setup_default_pool()
def _disable_osprofiler(self):
"""Disable osprofiler.
osprofiler should not run for unit tests.
"""
def side_effect(value):
return value
mock_decorator = mock.MagicMock(side_effect=side_effect)
try:
p = mock.patch("osprofiler.profiler.trace_cls",
return_value=mock_decorator)
p.start()
except ModuleNotFoundError:
pass
def _setup_default_pool(self):
# Fetch the default pool
pool = self.storage.get_pool(self.admin_context, default_pool_id)
# Fill out the necessary pool details
pool.ns_records = objects.PoolNsRecordList.from_list([
{'hostname': 'ns1.example.org.', 'priority': 1}
])
pool.targets = objects.PoolTargetList.from_list([
{'type': 'fake', 'description': "Fake PoolTarget for Tests"}
])
# Save the default pool
self.storage.update_pool(self.admin_context, pool)
# Config Methods
def config(self, **kwargs):
group = kwargs.pop('group', None)
for k, v in kwargs.items():
CONF.set_override(k, v, group)
def policy(self, rules, default_rule='allow', overwrite=True):
# Inject an allow and deny rule
rules['allow'] = '@'
rules['deny'] = '!'
# Set the rules
policy.set_rules(rules, default_rule, overwrite)
def start_service(self, svc_name, *args, **kw):
"""
Convenience method for starting a service!
"""
fixture = fixtures.ServiceFixture(svc_name, *args, **kw)
self.useFixture(fixture)
return fixture.svc
# Context Methods
def get_context(self, **kwargs):
return DesignateContext(**kwargs)
def get_admin_context(self, **kwargs):
return DesignateContext.get_admin_context(
project_id=utils.generate_uuid(),
user_id=utils.generate_uuid(),
**kwargs)
# Fixture methods
def get_quota_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.quota_fixtures[fixture])
_values.update(values)
return _values
def get_server_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.server_fixtures[fixture])
_values.update(values)
return _values
def get_tld_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.tld_fixtures[fixture])
_values.update(values)
return _values
def get_default_tld_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.default_tld_fixtures[fixture])
_values.update(values)
return _values
def get_tsigkey_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.tsigkey_fixtures[fixture])
_values.update(values)
return _values
def get_zone_fixture(self, zone_type=None, fixture=0, values=None):
zone_type = zone_type or 'PRIMARY'
_values = copy.copy(self.zone_fixtures[zone_type][fixture])
if values:
_values.update(values)
return _values
def get_recordset_fixture(self, zone_name, recordset_type='A', fixture=0,
values=None):
values = values or {}
_values = copy.copy(self.recordset_fixtures[recordset_type][fixture])
_values.update(values)
try:
_values['name'] = _values['name'] % zone_name
except TypeError:
pass
return _values
def get_record_fixture(self, recordset_type, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.record_fixtures[recordset_type][fixture])
_values.update(values)
return _values
def get_ptr_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.ptr_fixtures[fixture])
_values.update(values)
return objects.FloatingIP().from_dict(_values)
def get_zonefile_fixture(self, variant=None):
if variant is None:
f = 'example.com.zone'
else:
f = '%s_example.com.zone' % variant
path = os.path.join(resources.path, 'zonefiles', f)
with open(path) as zonefile:
return zonefile.read()
def get_blacklist_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.blacklist_fixtures[fixture])
_values.update(values)
return _values
def get_pool_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_fixtures[fixture])
_values.update(values)
return _values
def get_pool_attribute_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_attribute_fixtures[fixture])
_values.update(values)
return _values
def get_pool_attributes_fixture(self, fixture=0, values=None):
# TODO(kiall): Remove this method, in favor of the
# get_pool_attribute_fixture method above.
values = values or {}
_values = copy.copy(self.pool_attributes_fixtures[fixture])
_values.update(values)
return _values
def get_pool_nameserver_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_nameserver_fixtures[fixture])
_values.update(values)
return _values
def get_pool_target_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_target_fixtures[fixture])
_values.update(values)
return _values
def get_pool_also_notify_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_also_notify_fixtures[fixture])
_values.update(values)
return _values
def get_zone_transfer_request_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.zone_transfers_request_fixtures[fixture])
_values.update(values)
return _values
def get_zone_transfer_accept_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.zone_transfers_accept_fixtures[fixture])
_values.update(values)
return _values
def get_zone_import_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.zone_import_fixtures[fixture])
_values.update(values)
return _values
def get_zone_export_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.zone_export_fixtures[fixture])
_values.update(values)
return _values
def get_service_status_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.service_status_fixtures[fixture])
_values.update(values)
return _values
def get_shared_zone_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.shared_zone_fixtures[fixture])
_values.update(values)
return _values
def update_service_status(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_service_status_fixture(
fixture=fixture, values=kwargs)
return self.central_service.update_service_status(
context, objects.ServiceStatus.from_dict(values))
def create_tld(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_tld_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_tld(
context, objects.Tld.from_dict(values))
def create_default_tld(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_default_tld_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_tld(
context, objects.Tld.from_dict(values))
def create_default_tlds(self):
for index in range(len(self.default_tld_fixtures)):
try:
self.create_default_tld(fixture=index)
except exceptions.DuplicateTld:
pass
def create_tsigkey(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_tsigkey_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_tsigkey(
context, objects.TsigKey.from_dict(values))
def create_zone(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
zone_type = kwargs.pop('type', None)
values = self.get_zone_fixture(zone_type=zone_type,
fixture=fixture, values=kwargs)
if 'tenant_id' not in values:
values['tenant_id'] = context.project_id
return self.central_service.create_zone(
context, objects.Zone.from_dict(values))
def create_recordset(self, zone, recordset_type='A', records=None,
increment_serial=True, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_recordset_fixture(
zone['name'],
recordset_type=recordset_type, fixture=fixture, values=kwargs
)
recordset = objects.RecordSet.from_dict(values)
if records is None:
recordset.records = [
objects.Record.from_dict(
self.get_record_fixture(recordset_type=recordset_type)
)
]
else:
recordset.records = records
return self.central_service.create_recordset(
context, zone['id'], recordset,
increment_serial=increment_serial
)
def create_blacklist(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_blacklist_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_blacklist(
context, objects.Blacklist.from_dict(values))
def create_pool(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_pool_fixture(fixture=fixture, values=kwargs)
if 'tenant_id' not in values:
values['tenant_id'] = context.project_id
return self.central_service.create_pool(
context, objects.Pool.from_dict(values))
def create_pool_attribute(self, **kwargs):
# TODO(kiall): This method should require a "pool" be passed in,
# rather than hardcoding the default pool ID into the
# fixture.
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_pool_attributes_fixture(fixture=fixture,
values=kwargs)
# TODO(kiall): We shouldn't be assuming the default_pool_id here
return self.storage.create_pool_attribute(
context, default_pool_id,
objects.PoolAttribute.from_dict(values))
def create_zone_transfer_request(self, zone, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_zone_transfer_request_fixture(
fixture=fixture, values=kwargs)
if 'zone_id' not in values:
values['zone_id'] = zone.id
return self.central_service.create_zone_transfer_request(
context, objects.ZoneTransferRequest.from_dict(values))
def create_zone_transfer_accept(self, zone_transfer_request, **kwargs):
context = kwargs.pop('context', self.admin_context)
values = {}
if 'tenant_id' not in values:
values['tenant_id'] = context.project_id
if 'zone_transfer_request_id' not in values:
values['zone_transfer_request_id'] = zone_transfer_request.id
if 'zone_id' not in values:
values['zone_id'] = zone_transfer_request.zone_id
if 'key' not in values:
values['key'] = zone_transfer_request.key
return self.central_service.create_zone_transfer_accept(
context, objects.ZoneTransferAccept.from_dict(values))
def create_zone_import(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
zone_import = self.get_zone_import_fixture(fixture=fixture,
values=kwargs)
return self.storage.create_zone_import(
context, objects.ZoneImport.from_dict(zone_import))
def create_zone_export(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
zone_export = self.get_zone_export_fixture(fixture=fixture,
values=kwargs)
return self.storage.create_zone_export(
context, objects.ZoneExport.from_dict(zone_export))
def wait_for_import(self, zone_import_id, error_is_ok=False, max_wait=10):
"""
Zone imports spawn a thread to parse the zone file and
insert the data. This waits for this process before continuing
"""
start_time = time.monotonic()
while True:
# Retrieve it, and ensure it's the same
zone_import = self.central_service.get_zone_import(
self.admin_context_all_tenants, zone_import_id
)
# If the import is done, we're done
if zone_import.status == 'COMPLETE':
break
# If errors are allowed, just make sure that something completed
if error_is_ok and zone_import.status != 'PENDING':
break
if (time.monotonic() - start_time) > max_wait:
break
time.sleep(0.5)
if not error_is_ok:
self.assertEqual('COMPLETE', zone_import.status)
return zone_import
def share_zone(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_shared_zone_fixture(fixture, values=kwargs)
return self.central_service.share_zone(
context, kwargs['zone_id'], objects.SharedZone.from_dict(values)
)
def _ensure_interface(self, interface, implementation):
for name in interface.__abstractmethods__:
in_arginfo = inspect.getfullargspec(getattr(interface, name))
im_arginfo = inspect.getfullargspec(getattr(implementation, name))
self.assertEqual(
in_arginfo, im_arginfo,
"Method Signature for '%s' mismatched" % name)
def wait_for_condition(self, condition, interval=0.3, timeout=2):
"""Wait for a condition to be true or raise an exception after
`timeout` seconds.
Poll every `interval` seconds. `condition` can be a callable.
(Caution: some mocks behave both as values and callables.)
"""
t_max = time.monotonic() + timeout
while time.monotonic() < t_max:
if callable(condition):
result = condition()
else:
result = condition
if result:
return result
time.sleep(interval)
raise TestTimeoutError
def _skip_decorator(func):
@functools.wraps(func)
def skip_if_not_implemented(*args, **kwargs):
try:
return func(*args, **kwargs)
except NotImplementedError as e:
raise testcase.TestSkipped(str(e))
except Exception as e:
if 'not implemented' in str(e):
raise testcase.TestSkipped(str(e))
raise
return skip_if_not_implemented
class SkipNotImplementedMeta(type):
def __new__(cls, name, bases, local):
for attr in local:
value = local[attr]
if callable(value) and (
attr.startswith('test_') or attr == 'setUp'):
local[attr] = _skip_decorator(value)
return type.__new__(cls, name, bases, local)

@ -18,6 +18,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from contextlib import contextmanager
@ -41,8 +43,7 @@ from designate import rpc
import designate.service
import designate.utils
"""Test fixtures
"""
_TRUE_VALUES = ('True', 'true', '1', 'yes')
CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
@ -56,7 +57,8 @@ class CoordinatorFixture(fixtures.Fixture):
def setUp(self):
super().setUp()
self.coordinator = tooz.coordination.get_coordinator(
*self._args, **self._kwargs)
*self._args, **self._kwargs
)
self.coordinator.start()
self.addCleanup(self.coordinator.stop)

@ -0,0 +1,6 @@
This directory contains all functional tests.
Examples:
tox -e py3 -- tests.functional
tox -e py3 -- tests.functional.api

@ -0,0 +1,891 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import functools
import inspect
import os
import time
from unittest import mock
from oslo_config import fixture as cfg_fixture
from oslo_log import log as logging
from oslo_messaging import conffixture as messaging_fixture
from oslotest import base
from testtools import testcase
from designate.common import constants
import designate.conf
from designate.context import DesignateContext
from designate import exceptions
from designate import objects
from designate import policy
from designate import storage
from designate.tests import base_fixtures
from designate.tests import resources
from designate import utils
CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
default_pool_id = CONF['service:central'].default_pool_id
_TRUE_VALUES = ('true', '1', 'yes', 'y')
class TestTimeoutError(Exception):
# Used in wait_for_condition
pass
class TestCase(base.BaseTestCase):
service_status_fixtures = [{
'service_name': 'foo',
'hostname': 'bar',
'status': "UP",
'stats': {},
'capabilities': {},
}, {
'id': 'c326f735-eecc-4968-969f-355a43c4ae27',
'service_name': 'baz',
'hostname': 'qux',
'status': "UP",
'stats': {},
'capabilities': {},
}]
quota_fixtures = [{
'resource': constants.QUOTA_ZONES,
'hard_limit': 5,
}, {
'resource': constants.QUOTA_ZONE_RECORDS,
'hard_limit': 50,
}]
server_fixtures = [{
'name': 'ns1.example.org.',
}, {
'name': 'ns2.example.org.',
}, {
'name': 'ns2.example.org.',
}]
# The last tld is invalid
tld_fixtures = [{
'name': 'com',
}, {
'name': 'co.uk',
}, {
'name': 'com.',
}]
default_tld_fixtures = [{
'name': 'com',
}, {
'name': 'org',
}, {
'name': 'net',
}]
tsigkey_fixtures = [{
'name': 'test-key-one',
'algorithm': 'hmac-md5',
'secret': 'SomeOldSecretKey',
'scope': 'POOL',
'resource_id': '6ca6baef-3305-4ad0-a52b-a82df5752b62',
}, {
'name': 'test-key-two',
'algorithm': 'hmac-sha256',
'secret': 'AnotherSecretKey',
'scope': 'ZONE',
'resource_id': '7fbb6304-5e74-4691-bd80-cef3cff5fe2f',
}]
# The last zone is invalid
zone_fixtures = {
'PRIMARY': [
{
'name': 'example.com.',
'type': 'PRIMARY',
'email': 'example@example.com',
}, {
'name': 'example.net.',
'type': 'PRIMARY',
'email': 'example@example.net',
}, {
'name': 'example.org.',
'type': 'PRIMARY',
'email': 'example@example.org',
}, {
'name': 'invalid.com.....',
'type': 'PRIMARY',
'email': 'example@invalid.com',
}
],
'SECONDARY': [
{
'name': 'example.com.',
'type': 'SECONDARY',
}, {
'name': 'example.net.',
'type': 'SECONDARY',
}, {
'name': 'example.org.',
'type': 'SECONDARY',
}, {
'name': 'invalid.com.....',
'type': 'SECONDARY',
}
]
}
recordset_fixtures = {
'A': [
{'name': 'mail.%s', 'type': 'A'},
{'name': 'www.%s', 'type': 'A'},
],
'MX': [
{'name': 'mail.%s', 'type': 'MX'},
],
'SRV': [
{'name': '_sip._tcp.%s', 'type': 'SRV'},
{'name': '_sip._udp.%s', 'type': 'SRV'},
],
'TXT': [
{'name': 'text.%s', 'type': 'TXT'},
],
'CNAME': [
{'name': 'www.%s', 'type': 'CNAME'},
{'name': 'sub1.%s', 'type': 'CNAME'},
]
}
record_fixtures = {
'A': [
{'data': '192.0.2.1'},
{'data': '192.0.2.2'}
],
'MX': [
{'data': '5 mail.example.org.'},
{'data': '10 mail.example.com.'},
],
'SRV': [
{'data': '5 0 5060 server1.example.org.'},
{'data': '10 1 5060 server2.example.org.'},
],
'CNAME': [
{'data': 'www.somezone.org.'},
{'data': 'www.someotherzone.com.'},
],
'TXT': [
{'data': 'footxtdata'}
]
}
ptr_fixtures = [
{'ptrdname': 'srv1.example.com.'},
{'ptrdname': 'srv1.example.net.'},
{'ptrdname': 'srv2.example.com.'},
{'ptrdname': 'srv3.example.com.'},
{'ptrdname': 'srv4.example.com.'},
{'ptrdname': 'srv5.example.com.'},
]
blacklist_fixtures = [{
'pattern': 'blacklisted.com.',
'description': 'This is a comment',
}, {
'pattern': 'blacklisted.net.'
}, {
'pattern': 'blacklisted.org.'
}]
pool_fixtures = [
{'name': 'Pool-One',
'description': 'Pool-One description',
'attributes': [{'key': 'scope', 'value': 'public'}],
'ns_records': [{'priority': 1, 'hostname': 'ns1.example.org.'},
{'priority': 2, 'hostname': 'ns2.example.org.'}]},
{'name': 'Pool-Two',
'description': 'Pool-Two description',
'attributes': [{'key': 'scope', 'value': 'public'}],
'ns_records': [{'priority': 1, 'hostname': 'ns1.example.org.'}]},
]
pool_attribute_fixtures = [
{'scope': 'public'},
{'scope': 'private'},
{'scope': 'unknown'}
]
pool_attributes_fixtures = [
{'pool_id': default_pool_id,
'key': 'continent',
'value': 'NA'},
{'pool_id': default_pool_id,
'key': 'scope',
'value': 'public'}
]
pool_nameserver_fixtures = [
{'pool_id': default_pool_id,
'host': "192.0.2.1",
'port': 53},
{'pool_id': default_pool_id,
'host': "192.0.2.2",
'port': 53},
]
pool_target_fixtures = [
{'pool_id': default_pool_id,
'type': "fake",
'description': "FooBar"},
{'pool_id': default_pool_id,
'type': "fake",
'description': "BarFoo"},
]
pool_also_notify_fixtures = [
{'pool_id': default_pool_id,
'host': "192.0.2.1",
'port': 53},
{'pool_id': default_pool_id,
'host': "192.0.2.2",
'port': 53},
]
shared_zone_fixtures = [
{
"target_project_id": "target_project_id",
"zone_id": None,
"project_id": "project_id",
}
]
zone_transfers_request_fixtures = [{
"description": "Test Transfer",
}, {
"description": "Test Transfer 2 - with target",
"target_tenant_id": "target_tenant_id"
}]
zone_import_fixtures = [{
'status': 'PENDING',
'zone_id': None,
'message': None,
'task_type': 'IMPORT'
}, {
'status': 'ERROR',
'zone_id': None,
'message': None,
'task_type': 'IMPORT'
}, {
'status': 'COMPLETE',
'zone_id': '6ca6baef-3305-4ad0-a52b-a82df5752b62',
'message': None,
'task_type': 'IMPORT'
}]
zone_export_fixtures = [{
'status': 'PENDING',
'zone_id': None,
'message': None,
'task_type': 'EXPORT'
}, {
'status': 'ERROR',
'zone_id': None,
'message': None,
'task_type': 'EXPORT'
}, {
'status': 'COMPLETE',
'zone_id': '6ca6baef-3305-4ad0-a52b-a82df5752b62',
'message': None,
'task_type': 'EXPORT'
}]
def setUp(self):
super().setUp()
self.CONF = self.useFixture(cfg_fixture.Config(CONF)).conf
self.messaging_conf = messaging_fixture.ConfFixture(CONF)
self.messaging_conf.transport_url = 'fake:/'
self.messaging_conf.response_timeout = 5
self.useFixture(self.messaging_conf)
self.config(
driver=['test'],
group='oslo_messaging_notifications'
)
self.useFixture(base_fixtures.RPCFixture(CONF))
self.config(
emitter_type="noop",
group="heartbeat_emitter"
)
self.config(
auth_strategy='noauth',
group='service:api'
)
self._disable_osprofiler()
self.db_fixture = self.useFixture(
base_fixtures.DatabaseFixture.get_fixture())
if os.getenv('DESIGNATE_SQL_DEBUG', "False").lower() in _TRUE_VALUES:
connection_debug = 50
else:
connection_debug = 0
self.config(
connection=self.db_fixture.url,
connection_debug=connection_debug,
group='storage:sqlalchemy'
)
self.config(network_api='fake')
self.config(
scheduler_filters=['pool_id_attribute', 'random'],
group='service:central')
# "Read" Configuration
self.CONF([], project='designate')
self.useFixture(base_fixtures.PolicyFixture())
self.network_api = base_fixtures.NetworkAPIFixture()
self.useFixture(self.network_api)
self.central_service = self.start_service('central')
self.admin_context = self.get_admin_context()
self.admin_context_all_tenants = self.get_admin_context(
all_tenants=True)
self.storage = storage.get_storage()
# Setup the Default Pool with some useful settings
self._setup_default_pool()
def _disable_osprofiler(self):
"""Disable osprofiler.
osprofiler should not run for unit tests.
"""
def side_effect(value):
return value
mock_decorator = mock.MagicMock(side_effect=side_effect)
try:
p = mock.patch("osprofiler.profiler.trace_cls",
return_value=mock_decorator)
p.start()
except ModuleNotFoundError:
pass
def _setup_default_pool(self):
# Fetch the default pool
pool = self.storage.get_pool(self.admin_context, default_pool_id)
# Fill out the necessary pool details
pool.ns_records = objects.PoolNsRecordList.from_list([
{'hostname': 'ns1.example.org.', 'priority': 1}
])
pool.targets = objects.PoolTargetList.from_list([
{'type': 'fake', 'description': "Fake PoolTarget for Tests"}
])
# Save the default pool
self.storage.update_pool(self.admin_context, pool)
# Config Methods
def config(self, **kwargs):
group = kwargs.pop('group', None)
for k, v in kwargs.items():
CONF.set_override(k, v, group)
def policy(self, rules, default_rule='allow', overwrite=True):
# Inject an allow and deny rule
rules['allow'] = '@'
rules['deny'] = '!'
# Set the rules
policy.set_rules(rules, default_rule, overwrite)
def start_service(self, svc_name, *args, **kw):
"""
Convenience method for starting a service!
"""
fixture = base_fixtures.ServiceFixture(svc_name, *args, **kw)
self.useFixture(fixture)
return fixture.svc
# Context Methods
def get_context(self, **kwargs):
return DesignateContext(**kwargs)
def get_admin_context(self, **kwargs):
return DesignateContext.get_admin_context(
project_id=utils.generate_uuid(),
user_id=utils.generate_uuid(),
**kwargs)
# Fixture methods
def get_quota_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.quota_fixtures[fixture])
_values.update(values)
return _values
def get_server_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.server_fixtures[fixture])
_values.update(values)
return _values
def get_tld_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.tld_fixtures[fixture])
_values.update(values)
return _values
def get_default_tld_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.default_tld_fixtures[fixture])
_values.update(values)
return _values
def get_tsigkey_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.tsigkey_fixtures[fixture])
_values.update(values)
return _values
def get_zone_fixture(self, zone_type=None, fixture=0, values=None):
zone_type = zone_type or 'PRIMARY'
_values = copy.copy(self.zone_fixtures[zone_type][fixture])
if values:
_values.update(values)
return _values
def get_recordset_fixture(self, zone_name, recordset_type='A', fixture=0,
values=None):
values = values or {}
_values = copy.copy(self.recordset_fixtures[recordset_type][fixture])
_values.update(values)
try:
_values['name'] = _values['name'] % zone_name
except TypeError:
pass
return _values
def get_record_fixture(self, recordset_type, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.record_fixtures[recordset_type][fixture])
_values.update(values)
return _values
def get_ptr_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.ptr_fixtures[fixture])
_values.update(values)
return objects.FloatingIP().from_dict(_values)
def get_zonefile_fixture(self, variant=None):
if variant is None:
f = 'example.com.zone'
else:
f = '%s_example.com.zone' % variant
path = os.path.join(resources.path, 'zonefiles', f)
with open(path) as zonefile:
return zonefile.read()
def get_blacklist_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.blacklist_fixtures[fixture])
_values.update(values)
return _values
def get_pool_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_fixtures[fixture])
_values.update(values)
return _values
def get_pool_attribute_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_attribute_fixtures[fixture])
_values.update(values)
return _values
def get_pool_attributes_fixture(self, fixture=0, values=None):
# TODO(kiall): Remove this method, in favor of the
# get_pool_attribute_fixture method above.
values = values or {}
_values = copy.copy(self.pool_attributes_fixtures[fixture])
_values.update(values)
return _values
def get_pool_nameserver_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_nameserver_fixtures[fixture])
_values.update(values)
return _values
def get_pool_target_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_target_fixtures[fixture])
_values.update(values)
return _values
def get_pool_also_notify_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.pool_also_notify_fixtures[fixture])
_values.update(values)
return _values
def get_zone_transfer_request_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.zone_transfers_request_fixtures[fixture])
_values.update(values)
return _values
def get_zone_transfer_accept_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.zone_transfers_accept_fixtures[fixture])
_values.update(values)
return _values
def get_zone_import_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.zone_import_fixtures[fixture])
_values.update(values)
return _values
def get_zone_export_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.zone_export_fixtures[fixture])
_values.update(values)
return _values
def get_service_status_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.service_status_fixtures[fixture])
_values.update(values)
return _values
def get_shared_zone_fixture(self, fixture=0, values=None):
values = values or {}
_values = copy.copy(self.shared_zone_fixtures[fixture])
_values.update(values)
return _values
def update_service_status(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_service_status_fixture(
fixture=fixture, values=kwargs)
return self.central_service.update_service_status(
context, objects.ServiceStatus.from_dict(values))
def create_tld(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_tld_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_tld(
context, objects.Tld.from_dict(values))
def create_default_tld(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_default_tld_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_tld(
context, objects.Tld.from_dict(values))
def create_default_tlds(self):
for index in range(len(self.default_tld_fixtures)):
try:
self.create_default_tld(fixture=index)
except exceptions.DuplicateTld:
pass
def create_tsigkey(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_tsigkey_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_tsigkey(
context, objects.TsigKey.from_dict(values))
def create_zone(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
zone_type = kwargs.pop('type', None)
values = self.get_zone_fixture(zone_type=zone_type,
fixture=fixture, values=kwargs)
if 'tenant_id' not in values:
values['tenant_id'] = context.project_id
return self.central_service.create_zone(
context, objects.Zone.from_dict(values))
def create_recordset(self, zone, recordset_type='A', records=None,
increment_serial=True, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_recordset_fixture(
zone['name'],
recordset_type=recordset_type, fixture=fixture, values=kwargs
)
recordset = objects.RecordSet.from_dict(values)
if records is None:
recordset.records = [
objects.Record.from_dict(
self.get_record_fixture(recordset_type=recordset_type)
)
]
else:
recordset.records = records
return self.central_service.create_recordset(
context, zone['id'], recordset,
increment_serial=increment_serial
)
def create_blacklist(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_blacklist_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_blacklist(
context, objects.Blacklist.from_dict(values))
def create_pool(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_pool_fixture(fixture=fixture, values=kwargs)
if 'tenant_id' not in values:
values['tenant_id'] = context.project_id
return self.central_service.create_pool(
context, objects.Pool.from_dict(values))
def create_pool_attribute(self, **kwargs):
# TODO(kiall): This method should require a "pool" be passed in,
# rather than hardcoding the default pool ID into the
# fixture.
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_pool_attributes_fixture(fixture=fixture,
values=kwargs)
# TODO(kiall): We shouldn't be assuming the default_pool_id here
return self.storage.create_pool_attribute(
context, default_pool_id,
objects.PoolAttribute.from_dict(values))
def create_zone_transfer_request(self, zone, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_zone_transfer_request_fixture(
fixture=fixture, values=kwargs)
if 'zone_id' not in values:
values['zone_id'] = zone.id
return self.central_service.create_zone_transfer_request(
context, objects.ZoneTransferRequest.from_dict(values))
def create_zone_transfer_accept(self, zone_transfer_request, **kwargs):
context = kwargs.pop('context', self.admin_context)
values = {}
if 'tenant_id' not in values:
values['tenant_id'] = context.project_id
if 'zone_transfer_request_id' not in values:
values['zone_transfer_request_id'] = zone_transfer_request.id
if 'zone_id' not in values:
values['zone_id'] = zone_transfer_request.zone_id
if 'key' not in values:
values['key'] = zone_transfer_request.key
return self.central_service.create_zone_transfer_accept(
context, objects.ZoneTransferAccept.from_dict(values))
def create_zone_import(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
zone_import = self.get_zone_import_fixture(fixture=fixture,
values=kwargs)
return self.storage.create_zone_import(
context, objects.ZoneImport.from_dict(zone_import))
def create_zone_export(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
zone_export = self.get_zone_export_fixture(fixture=fixture,
values=kwargs)
return self.storage.create_zone_export(
context, objects.ZoneExport.from_dict(zone_export))
def wait_for_import(self, zone_import_id, error_is_ok=False, max_wait=10):
"""
Zone imports spawn a thread to parse the zone file and
insert the data. This waits for this process before continuing
"""
start_time = time.monotonic()
while True:
# Retrieve it, and ensure it's the same
zone_import = self.central_service.get_zone_import(
self.admin_context_all_tenants, zone_import_id
)
# If the import is done, we're done
if zone_import.status == 'COMPLETE':
break
# If errors are allowed, just make sure that something completed
if error_is_ok and zone_import.status != 'PENDING':
break
if (time.monotonic() - start_time) > max_wait:
break
time.sleep(0.5)
if not error_is_ok:
self.assertEqual('COMPLETE', zone_import.status)
return zone_import
def share_zone(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_shared_zone_fixture(fixture, values=kwargs)
return self.central_service.share_zone(
context, kwargs['zone_id'], objects.SharedZone.from_dict(values)
)
def _ensure_interface(self, interface, implementation):
for name in interface.__abstractmethods__:
in_arginfo = inspect.getfullargspec(getattr(interface, name))
im_arginfo = inspect.getfullargspec(getattr(implementation, name))
self.assertEqual(
in_arginfo, im_arginfo,
"Method Signature for '%s' mismatched" % name)
def wait_for_condition(self, condition, interval=0.3, timeout=2):
"""Wait for a condition to be true or raise an exception after
`timeout` seconds.
Poll every `interval` seconds. `condition` can be a callable.
(Caution: some mocks behave both as values and callables.)
"""
t_max = time.monotonic() + timeout
while time.monotonic() < t_max:
if callable(condition):
result = condition()
else:
result = condition
if result:
return result
time.sleep(interval)
raise TestTimeoutError
def _skip_decorator(func):
@functools.wraps(func)
def skip_if_not_implemented(*args, **kwargs):
try:
return func(*args, **kwargs)
except NotImplementedError as e:
raise testcase.TestSkipped(str(e))
except Exception as e:
if 'not implemented' in str(e):
raise testcase.TestSkipped(str(e))
raise
return skip_if_not_implemented
class SkipNotImplementedMeta(type):
def __new__(cls, name, bases, local):
for attr in local:
value = local[attr]
if callable(value) and (
attr.startswith('test_') or attr == 'setUp'):
local[attr] = _skip_decorator(value)
return type.__new__(cls, name, bases, local)

@ -20,7 +20,7 @@ from webtest import TestApp
from designate.api import admin as admin_api
from designate.api import middleware
import designate.tests
import designate.tests.functional
LOG = logging.getLogger(__name__)
@ -33,7 +33,7 @@ INVALID_ID = [
]
class AdminApiTestCase(designate.tests.TestCase):
class AdminApiTestCase(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()

@ -13,14 +13,16 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import designate.conf
from designate.tests.test_api.test_admin import AdminApiTestCase
from designate.tests.functional.api import admin
CONF = designate.conf.CONF
class AdminApiQuotasTest(AdminApiTestCase):
class AdminApiQuotasTest(admin.AdminApiTestCase):
def setUp(self):
self.config(enabled_extensions_admin=['quotas'], group='service:api')
super().setUp()

@ -13,10 +13,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests.test_api.test_admin import AdminApiTestCase
class AdminApiReportsTest(AdminApiTestCase):
from designate.tests.functional.api import admin
class AdminApiReportsTest(admin.AdminApiTestCase):
def setUp(self):
self.config(enabled_extensions_admin=['reports'], group='service:api')
super().setUp()

@ -14,6 +14,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import oslo_messaging as messaging
@ -24,7 +26,7 @@ import designate.conf
from designate import context
from designate import exceptions
from designate import rpc
import designate.tests
import designate.tests.functional
CONF = designate.conf.CONF
@ -47,7 +49,7 @@ class FakeRequest:
return "FakeResponse"
class KeystoneContextMiddlewareTest(designate.tests.TestCase):
class KeystoneContextMiddlewareTest(designate.tests.functional.TestCase):
def test_process_request(self):
app = middleware.KeystoneContextMiddleware({})
@ -134,7 +136,7 @@ class KeystoneContextMiddlewareTest(designate.tests.TestCase):
self.assertEqual(response, 'FakeResponse')
class NoAuthContextMiddlewareTest(designate.tests.TestCase):
class NoAuthContextMiddlewareTest(designate.tests.functional.TestCase):
def test_process_request(self):
app = middleware.NoAuthContextMiddleware({})
@ -153,7 +155,7 @@ class NoAuthContextMiddlewareTest(designate.tests.TestCase):
self.assertEqual(['admin'], ctxt.roles)
class MaintenanceMiddlewareTest(designate.tests.TestCase):
class MaintenanceMiddlewareTest(designate.tests.functional.TestCase):
def test_process_request_disabled(self):
self.config(maintenance_mode=False, group='service:api')
@ -225,7 +227,7 @@ class MaintenanceMiddlewareTest(designate.tests.TestCase):
self.assertEqual('FakeResponse', response)
class NormalizeURIMiddlewareTest(designate.tests.TestCase):
class NormalizeURIMiddlewareTest(designate.tests.functional.TestCase):
def test_strip_trailing_slases(self):
request = FakeRequest()
request.environ['PATH_INFO'] = 'resource/'
@ -251,7 +253,7 @@ class NormalizeURIMiddlewareTest(designate.tests.TestCase):
self.assertEqual('resource', request.environ['PATH_INFO'])
class FaultMiddlewareTest(designate.tests.TestCase):
class FaultMiddlewareTest(designate.tests.functional.TestCase):
def test_request(self):
mock_request = mock.Mock()

@ -20,10 +20,10 @@ from paste import urlmap
from designate.api import service
from designate import exceptions
import designate.tests
import designate.tests.functional
class ApiServiceTest(designate.tests.TestCase):
class ApiServiceTest(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()

@ -20,7 +20,7 @@ from webtest import TestApp
from designate.api import middleware
from designate.api import v2 as api_v2
import designate.tests
import designate.tests.functional
LOG = logging.getLogger(__name__)
@ -33,7 +33,7 @@ INVALID_ID = [
]
class ApiV2TestCase(designate.tests.TestCase):
class ApiV2TestCase(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()

@ -10,10 +10,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
class ApiV2DisableTest(ApiV2TestCase):
class ApiV2DisableTest(v2.ApiV2TestCase):
def setUp(self):
self.config(enable_api_v2=False, group='service:api')
super().setUp()

@ -15,10 +15,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
class ApiV2BlacklistsTest(ApiV2TestCase):
class ApiV2BlacklistsTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -13,15 +13,12 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests.test_api.test_v2 import ApiV2TestCase
"""
NOTE: Record invalidation is tested in Central tests
"""
from designate.tests.functional.api import v2
class ApiV2ReverseFloatingIPTest(ApiV2TestCase):
class ApiV2ReverseFloatingIPTest(v2.ApiV2TestCase):
def test_get_floatingip_no_record(self):
context = self.get_context(project_id='a')

@ -14,10 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
class ApiV2HostHeadersTest(ApiV2TestCase):
class ApiV2HostHeadersTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -18,10 +18,10 @@ from webtest import TestApp
from designate.api import admin as admin_api
from designate.api import middleware
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
class APIV2ZoneImportExportTest(ApiV2TestCase):
class APIV2ZoneImportExportTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -14,13 +14,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import designate.conf
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
CONF = designate.conf.CONF
class ApiV2LimitsTest(ApiV2TestCase):
class ApiV2LimitsTest(v2.ApiV2TestCase):
def test_get_limits(self):
response = self.client.get('/limits/')

@ -15,7 +15,7 @@
from oslo_log import log as logging
import designate.conf
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
CONF = designate.conf.CONF
@ -30,7 +30,7 @@ def _attributes_to_api(attributes):
return result
class ApiV2PoolsTest(ApiV2TestCase):
class ApiV2PoolsTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -12,12 +12,12 @@
from oslo_log import log as logging
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
LOG = logging.getLogger(__name__)
class ApiV2QuotasTest(ApiV2TestCase):
class ApiV2QuotasTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -15,12 +15,12 @@
# under the License.
from oslo_log import log as logging
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
LOG = logging.getLogger(__name__)
class ApiV2RecordSetsTest(ApiV2TestCase):
class ApiV2RecordSetsTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()
self.zone = self.create_zone()

@ -13,12 +13,12 @@
# under the License.
from oslo_log import log as logging
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
LOG = logging.getLogger(__name__)
class ApiV2ServiceStatusTest(ApiV2TestCase):
class ApiV2ServiceStatusTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -11,10 +11,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
class ApiV2SharedZonesTest(ApiV2TestCase):
class ApiV2SharedZonesTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -13,10 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
class ApiV2TldsTest(ApiV2TestCase):
class ApiV2TldsTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -19,10 +19,10 @@ import oslo_messaging as messaging
from designate.central import service as central_service
from designate import exceptions
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
class ApiV2TsigKeysTest(ApiV2TestCase):
class ApiV2TsigKeysTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -15,13 +15,13 @@ from oslo_log import log as logging
import designate.conf
from designate import objects
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
class ZoneExportsTest(ApiV2TestCase):
class ZoneExportsTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()
self.zone = self.create_zone()

@ -22,12 +22,12 @@ from oslo_utils import timeutils
from designate.central import service as central_service
from designate import exceptions
from designate import objects
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
LOG = logging.getLogger(__name__)
class ApiV2RecordSetsTest(ApiV2TestCase):
class ApiV2RecordSetsTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -14,10 +14,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
class ApiV2ZoneTransfersTest(ApiV2TestCase):
class ApiV2ZoneTransfersTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -22,14 +22,14 @@ from designate.central import service as central_service
import designate.conf
from designate import exceptions
from designate import objects
from designate.tests.test_api.test_v2 import ApiV2TestCase
from designate.tests.functional.api import v2
from designate.worker import rpcapi as worker_api
CONF = designate.conf.CONF
class ApiV2ZonesTest(ApiV2TestCase):
class ApiV2ZonesTest(v2.ApiV2TestCase):
def setUp(self):
super().setUp()

@ -29,8 +29,8 @@ from designate import policy
from designate import quota
from designate import storage
from designate.storage import sqlalchemy
from designate import tests
from designate.tests.fixtures import random_seed
from designate.tests.base_fixtures import random_seed
import designate.tests.functional
from designate.tests import unit
from designate.worker import rpcapi as worker_rpcapi
@ -110,7 +110,7 @@ fx_worker = fixtures.MockPatch(
)
class CentralBasic(tests.TestCase):
class CentralBasic(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
@ -1710,7 +1710,7 @@ class CentralStatusTests(CentralBasic):
self.assertEqual(new_zone.status, 'PENDING')
class CentralQuotaTest(tests.TestCase):
class CentralQuotaTest(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.config(quota_driver='noop')

@ -13,6 +13,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from concurrent import futures
import copy
@ -38,8 +40,8 @@ from designate import exceptions
from designate import objects
from designate.storage import sql
from designate.storage.sqlalchemy import tables
import designate.tests
from designate.tests import fixtures
from designate.tests import base_fixtures
import designate.tests.functional
from designate import utils
from designate.worker import rpcapi as worker_api
@ -48,10 +50,10 @@ CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
class CentralServiceTest(designate.tests.TestCase):
class CentralServiceTest(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
def test_stop(self):

@ -11,18 +11,20 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from io import StringIO
from unittest import mock
from designate.manage import database
import designate.tests
import designate.tests.functional
class TestManageDatabase(designate.tests.TestCase):
class TestManageDatabase(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.stdlog = designate.tests.fixtures.StandardLogging()
self.stdlog = designate.tests.base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.db_cmds = database.DatabaseCommands()

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from unittest import mock
import yaml
@ -20,8 +21,8 @@ import oslo_messaging
from designate.central import service
from designate.manage import base
from designate.manage import pool
import designate.tests
from designate.tests import fixtures
from designate.tests import base_fixtures
import designate.tests.functional
from designate.tests import resources
LOG = logging.getLogger(__name__)
@ -36,10 +37,10 @@ def get_pools(name='pools.yaml'):
return yaml.safe_load(pool_obj)
class ManagePoolTestCase(designate.tests.TestCase):
class ManagePoolTestCase(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
default_pool = self.central_service.find_pool(

@ -9,11 +9,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from designate.manage import tlds
import designate.tests
from designate.tests import fixtures
from designate.tests import base_fixtures
import designate.tests.functional
from designate.tests import resources
@ -21,10 +23,10 @@ def get_tlds_path(name='tlds_list'):
return os.path.join(resources.path, 'tlds', name)
class ManageTLDSTestCase(designate.tests.TestCase):
class ManageTLDSTestCase(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.tlds = tlds.TLDCommands()

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_log import log as logging
@ -17,8 +18,9 @@ from oslo_log import log as logging
from designate.manage import base
from designate.manage import pool
from designate import objects
import designate.tests
from designate.tests import fixtures
from designate.tests import base_fixtures
import designate.tests.functional
LOG = logging.getLogger(__name__)
@ -34,10 +36,10 @@ def hydrate_pool_targets(target_masters):
return pool_targets
class UpdatePoolTestCase(designate.tests.TestCase):
class UpdatePoolTestCase(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.print_result = mock.patch.object(

@ -28,7 +28,7 @@ import designate.conf
from designate import context
from designate.mdns import handler
from designate import objects
import designate.tests
import designate.tests.functional
CONF = designate.conf.CONF
@ -50,7 +50,7 @@ ANSWER = [
]
class MdnsRequestHandlerTest(designate.tests.TestCase):
class MdnsRequestHandlerTest(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.mock_tg = mock.Mock()

@ -24,7 +24,7 @@ import dns
import dns.message
from oslo_log import log as logging
import designate.tests
import designate.tests.functional
LOG = logging.getLogger(__name__)
@ -33,7 +33,7 @@ def hex_wire(response):
return binascii.b2a_hex(response.to_wire())
class MdnsServiceTest(designate.tests.TestCase):
class MdnsServiceTest(designate.tests.functional.TestCase):
# DNS packet with IQUERY opcode
query_payload = binascii.a2b_hex(
"271209000001000000000000076578616d706c6503636f6d0000010001"

@ -13,6 +13,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_serialization import jsonutils

@ -13,8 +13,10 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.notification_handler import base
from designate.tests import TestCase
import designate.tests.functional
class InheritFormBaseAddressHandler(base.BaseAddressHandler):
@ -36,7 +38,7 @@ class InheritFormBaseAddressHandler(base.BaseAddressHandler):
pass
class BaseAddressHandlerTest(TestCase):
class BaseAddressHandlerTest(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()

@ -13,16 +13,21 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from designate.notification_handler.neutron import NeutronFloatingHandler
from designate.tests.test_notification_handler import NotificationHandlerMixin
from designate.tests import TestCase
from designate.notification_handler import neutron
import designate.tests.functional
from designate.tests.functional import notification_handler
LOG = logging.getLogger(__name__)
class NeutronFloatingHandlerTest(TestCase, NotificationHandlerMixin):
class NeutronFloatingHandlerTest(
designate.tests.functional.TestCase,
notification_handler.NotificationHandlerMixin):
def setUp(self):
super().setUp()
@ -33,7 +38,7 @@ class NeutronFloatingHandlerTest(TestCase, NotificationHandlerMixin):
'%(octet0)s-%(octet1)s-%(octet2)s-%(octet3)s.X.%(zone)s']
self.config(formatv4=formats, group='handler:neutron_floatingip')
self.plugin = NeutronFloatingHandler()
self.plugin = neutron.NeutronFloatingHandler()
def test_floatingip_associate(self):
event_type = 'floatingip.update.end'

@ -13,20 +13,25 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_log import log as logging
from designate import exceptions
from designate.notification_handler.nova import NovaFixedHandler
from designate.notification_handler import nova
from designate import objects
from designate.tests.test_notification_handler import NotificationHandlerMixin
from designate.tests import TestCase
import designate.tests.functional
from designate.tests.functional import notification_handler
LOG = logging.getLogger(__name__)
class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
class NovaFixedHandlerTest(
designate.tests.functional.TestCase,
notification_handler.NotificationHandlerMixin):
def setUp(self):
super().setUp()
@ -41,7 +46,7 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
'%(host)s.foo.%(zone)s'],
group='handler:nova_fixed')
self.plugin = NovaFixedHandler()
self.plugin = nova.NovaFixedHandler()
def test_instance_create_end(self):
event_type = 'compute.instance.create.end'

@ -16,13 +16,13 @@
from oslo_log import log as logging
from designate import objects
from designate.tests import TestCase
import designate.tests.functional
LOG = logging.getLogger(__name__)
class ProducerServiceTest(TestCase):
class ProducerServiceTest(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.producer_service = self.start_service('producer')

@ -23,15 +23,15 @@ from oslo_utils import timeutils
from designate.producer import tasks
from designate.storage import sql
from designate.storage.sqlalchemy import tables
from designate.tests import fixtures
from designate.tests import TestCase
from designate.tests import base_fixtures
import designate.tests.functional
from designate.worker import rpcapi as worker_api
LOG = logging.getLogger(__name__)
class DeletedZonePurgeTest(TestCase):
class DeletedZonePurgeTest(designate.tests.functional.TestCase):
number_of_zones = 20
batch_size = 5
time_threshold = 24 * 60 * 60
@ -44,7 +44,7 @@ class DeletedZonePurgeTest(TestCase):
group='producer_task:zone_purge'
)
self.purge_task_fixture = self.useFixture(
fixtures.ZoneManagerTaskFixture(tasks.DeletedZonePurgeTask)
base_fixtures.ZoneManagerTaskFixture(tasks.DeletedZonePurgeTask)
)
def _create_deleted_zone(self, name, mock_deletion_time):
@ -98,7 +98,8 @@ class DeletedZonePurgeTest(TestCase):
self.assertEqual(len(remaning_zones), self.number_of_zones // 2)
class PeriodicGenerateDelayedNotifyTaskTest(TestCase):
class PeriodicGenerateDelayedNotifyTaskTest(
designate.tests.functional.TestCase):
number_of_zones = 20
batch_size = 5
@ -110,7 +111,7 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase):
group='producer_task:delayed_notify'
)
self.generate_delayed_notify_task_fixture = self.useFixture(
fixtures.ZoneManagerTaskFixture(
base_fixtures.ZoneManagerTaskFixture(
tasks.PeriodicGenerateDelayedNotifyTask
)
)
@ -155,7 +156,7 @@ class PeriodicGenerateDelayedNotifyTaskTest(TestCase):
)
class PeriodicIncrementSerialTaskTest(TestCase):
class PeriodicIncrementSerialTaskTest(designate.tests.functional.TestCase):
number_of_zones = 20
batch_size = 20
@ -170,7 +171,7 @@ class PeriodicIncrementSerialTaskTest(TestCase):
group='producer_task:increment_serial'
)
self.increment_serial_task_fixture = self.useFixture(
fixtures.ZoneManagerTaskFixture(
base_fixtures.ZoneManagerTaskFixture(
tasks.PeriodicIncrementSerialTask
)
)

@ -21,14 +21,14 @@ import testscenarios
import designate.conf
from designate import exceptions
from designate import quota
import designate.tests
import designate.tests.functional
CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
load_tests = testscenarios.load_tests_apply_scenarios
class QuotaTestCase(designate.tests.TestCase):
class QuotaTestCase(designate.tests.functional.TestCase):
scenarios = [
('noop', dict(quota_driver='noop')),
('storage', dict(quota_driver='storage'))

@ -18,12 +18,12 @@ from oslo_log import log as logging
from designate.common import constants
from designate import exceptions
from designate import quota
from designate import tests
import designate.tests.functional
LOG = logging.getLogger(__name__)
class StorageQuotaTest(tests.TestCase):
class StorageQuotaTest(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.config(quota_driver='storage')

@ -14,14 +14,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from designate import context
from designate import exceptions
import designate.tests
import designate.tests.functional
class TestDesignateContext(designate.tests.TestCase):
class TestDesignateContext(designate.tests.functional.TestCase):
def test_deepcopy(self):
orig = context.DesignateContext(
user_id='12345', project_id='54321'

@ -9,6 +9,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import dns
@ -20,12 +22,13 @@ from designate import dnsmiddleware
from designate import dnsutils
from designate.mdns import handler
from designate import storage
import designate.tests
import designate.tests.functional
CONF = designate.conf.CONF
class TestSerializationMiddleware(designate.tests.TestCase):
class TestSerializationMiddleware(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.storage = storage.get_storage()

@ -13,6 +13,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import dns
import dns.query
import dns.tsigkeyring
@ -22,7 +24,8 @@ from designate import dnsutils
from designate import exceptions
from designate import objects
from designate import storage
import designate.tests
import designate.tests.functional
CONF = designate.conf.CONF
SAMPLES = {
@ -81,7 +84,7 @@ SAMPLES = {
}
class TestTsigUtils(designate.tests.TestCase):
class TestTsigUtils(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.storage = storage.get_storage()
@ -113,7 +116,7 @@ class TestTsigUtils(designate.tests.TestCase):
self.assertIsNone(self.tsig_keyring.get(query.keyname))
class TestUtils(designate.tests.TestCase):
class TestUtils(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()

@ -25,7 +25,7 @@ from oslo_log import log as logging
from designate.storage import sqlalchemy
from designate.storage.sqlalchemy.alembic import legacy_utils
from designate import tests
import designate.tests.functional
LOG = logging.getLogger(__name__)
@ -35,7 +35,8 @@ ALEMBIC_PATH = os.path.join(
class DesignateMigrationsWalk(
test_fixtures.OpportunisticDBTestMixin, tests.TestCase,
test_fixtures.OpportunisticDBTestMixin,
designate.tests.functional.TestCase,
):
# Migrations can take a long time, particularly on underpowered CI nodes.
# Give them some breathing room.

@ -20,7 +20,7 @@ import sqlalchemy as sa
from sqlalchemy.sql import operators
from designate.storage.sqlalchemy import base
from designate.tests import TestCase
import designate.tests.functional
metadata = sa.MetaData()
@ -32,7 +32,7 @@ dummy_table = sa.Table(
)
class SQLAlchemyTestCase(TestCase):
class SQLAlchemyTestCase(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.query = mock.Mock()

@ -26,7 +26,7 @@ from designate import exceptions
from designate import objects
from designate import storage
from designate.storage import sql
from designate.tests import TestCase
import designate.tests.functional
from designate.utils import generate_uuid
@ -34,7 +34,7 @@ CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
class SqlalchemyStorageTest(TestCase):
class SqlalchemyStorageTest(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.storage = storage.get_storage()

@ -19,10 +19,10 @@ from sqlalchemy.schema import Table
from designate.cmd import status
from designate.storage import sql
from designate import tests
import designate.tests.functional
class TestDuplicateServiceStatus(tests.TestCase):
class TestDuplicateServiceStatus(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.meta = MetaData()

@ -22,11 +22,11 @@ import dns.message
import dns.query
from designate import objects
from designate.tests import TestCase
import designate.tests.functional
from designate.worker.tasks import zone
class WorkerNotifyTest(TestCase):
class WorkerNotifyTest(designate.tests.functional.TestCase):
test_zone = {
'name': 'example.com.',
'email': 'example@example.com',

@ -9,13 +9,16 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_log import log as logging
from oslo_utils import timeutils
import designate.conf
from designate.tests import fixtures
from designate.tests import base_fixtures
import designate.tests.functional
from designate.worker import processing
from designate.worker.tasks import zone
@ -24,10 +27,10 @@ CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
class RecoverShardTest(designate.tests.TestCase):
class RecoverShardTest(designate.tests.functional.TestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.executor = processing.Executor()

@ -1,6 +1,6 @@
This directory contains pure unit tests.
This directory contains all unit tests.
Examples:
tox -e py3 -- tests.unit
tox -e py3 -- tests.unit.backend
tox -e py3 -- tests.unit.backend

@ -13,12 +13,15 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import oslotest.base
from designate.api.admin.views import base
from designate import exceptions
from designate import objects
import designate.tests
class MockRequest:
@ -26,7 +29,7 @@ class MockRequest:
self.GET = GET
class TestAdminAPI(designate.tests.TestCase):
class TestAdminAPI(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()

@ -13,11 +13,14 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import oslotest.base
from designate import exceptions
from designate.objects.adapters.api_v2 import base
import designate.tests
class MockRequest:
@ -25,7 +28,7 @@ class MockRequest:
self.GET = GET
class TestAPIv2(designate.tests.TestCase):
class TestAPIv2(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()

@ -9,6 +9,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslotest.base
from designate.api.v2.controllers import floatingips

@ -13,6 +13,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import fixtures

@ -9,6 +9,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_config import fixture as cfg_fixture

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import fixture as cfg_fixture
import oslotest.base
import webtest

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import fixture as cfg_fixture
import oslotest.base
from unittest import mock

@ -24,7 +24,7 @@ from designate.backend import impl_akamai_v2 as akamai
from designate import context
from designate import exceptions
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
@ -81,7 +81,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
client_token='client_token'
)
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
self.assertRaisesRegex(
exceptions.Backend,
'contractId is required for zone creation',
@ -101,7 +101,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
client_token='client_token'
)
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
backend.create_zone(self.admin_context, self.zone)
project_id = self.admin_context.project_id or self.zone.tenant_id
@ -132,7 +132,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
mock_post.return_value = self.gen_response(409, 'Conflict')
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
backend.create_zone(self.admin_context, self.zone)
project_id = self.admin_context.project_id or self.zone.tenant_id
@ -166,7 +166,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
client_token='client_token'
)
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
backend.create_zone(self.admin_context, self.zone)
project_id = self.admin_context.project_id or self.zone.tenant_id
@ -209,7 +209,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
mock_post.return_value = self.gen_response(
400, 'Bad Request', json_data)
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
self.assertRaisesRegex(
exceptions.Backend,
'Zone creation failed due to: Missed A option',
@ -244,7 +244,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
mock_post.return_value = self.gen_response(200, 'Success')
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
backend.delete_zone(self.admin_context, self.zone)
mock_post.assert_called_once_with(
@ -273,7 +273,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
mock_post.return_value = self.gen_response(
403, 'Bad Request', {'detail': 'Unexpected error'})
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
self.assertRaisesRegex(
exceptions.Backend,
'Zone deletion failed due to: Unexpected error',
@ -305,7 +305,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
mock_post.return_value = self.gen_response(
404, 'Bad Request', {'detail': 'Unexpected error'})
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
backend.delete_zone(self.admin_context, self.zone)
mock_post.assert_called_once_with(
@ -346,8 +346,8 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
self.gen_response(200, 'Success', {'isComplete': True})
]
with fixtures.random_seed(0), mock.patch.object(akamai.time,
'sleep') as mock_sleep:
with base_fixtures.random_seed(0), mock.patch.object(
akamai.time, 'sleep') as mock_sleep:
mock_sleep.return_value = None
backend.delete_zone(self.admin_context, self.zone)
@ -396,8 +396,8 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
self.gen_response(200, 'Success', {'isComplete': False})
]
with fixtures.random_seed(0), mock.patch.object(akamai.time,
'sleep') as mock_sleep:
with base_fixtures.random_seed(0), mock.patch.object(
akamai.time, 'sleep') as mock_sleep:
mock_sleep.return_value = None
self.assertRaisesRegex(
exceptions.Backend,
@ -442,7 +442,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
self.gen_response(409, 'Conflict', {'detail': 'Intenal Error'})
]
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
self.assertRaisesRegex(
exceptions.Backend,
'Zone deletion failed due to: Intenal Error',
@ -481,7 +481,7 @@ class AkamaiBackendTestCase(oslotest.base.BaseTestCase):
self.gen_response(200, 'Success')
]
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
self.assertRaisesRegex(
exceptions.Backend,
'Zone deletion failed due to: requestId missed in response',

@ -20,13 +20,13 @@ from designate.backend import base
from designate.backend import impl_pdns4
from designate import context
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
class BaseBackendTestCase(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.context = mock.Mock()

@ -20,7 +20,7 @@ from designate.backend import impl_bind9
from designate import context
from designate import exceptions
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
from designate import utils
import subprocess
@ -29,7 +29,7 @@ import subprocess
class Bind9BackendTestCase(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.admin_context = mock.Mock()
@ -68,7 +68,7 @@ class Bind9BackendTestCase(oslotest.base.BaseTestCase):
@mock.patch.object(impl_bind9.Bind9Backend, '_execute_rndc')
def test_create_zone(self, mock_execute):
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
self.backend.create_zone(self.admin_context, self.zone)
mock_execute.assert_called_with(
@ -80,7 +80,7 @@ class Bind9BackendTestCase(oslotest.base.BaseTestCase):
@mock.patch.object(impl_bind9.Bind9Backend, '_execute_rndc')
def test_update_zone(self, mock_execute):
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
self.backend.update_zone(self.admin_context, self.zone)
mock_execute.assert_called_with(
@ -96,7 +96,7 @@ class Bind9BackendTestCase(oslotest.base.BaseTestCase):
mock_get_zone.return_value = True
mock_execute.side_effect = exceptions.Backend('error')
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
self.backend.update_zone(self.admin_context, self.zone)
self.assertIn('Error updating zone', self.stdlog.logger.output)
@ -112,7 +112,7 @@ class Bind9BackendTestCase(oslotest.base.BaseTestCase):
@mock.patch.object(impl_bind9.Bind9Backend, '_execute_rndc')
def test_get_zone(self, mock_execute):
with fixtures.random_seed(0):
with base_fixtures.random_seed(0):
self.backend.get_zone(self.admin_context, self.zone)
mock_execute.assert_called_with(
@ -145,7 +145,7 @@ class Bind9BackendTestCase(oslotest.base.BaseTestCase):
objects.PoolTarget.from_dict(self.target)
)
with fixtures.random_seed(1):
with base_fixtures.random_seed(1):
backend.create_zone(self.admin_context, self.zone)
mock_execute.assert_called_with(

@ -25,7 +25,7 @@ import oslotest.base
from designate.backend import impl_designate
from designate import context
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
LOG = logging.getLogger(__name__)
@ -34,7 +34,7 @@ LOG = logging.getLogger(__name__)
class DesignateBackendTestCase(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.admin_context = mock.Mock()

@ -17,13 +17,13 @@ import oslotest.base
from designate.backend import impl_fake
from designate import context
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
class FakeBackendTestCase(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.admin_context = mock.Mock()

@ -23,13 +23,13 @@ from designate.backend import impl_ns1
from designate import context
from designate import exceptions
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
class NS1BackendTestCase(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.context = mock.Mock()

@ -18,13 +18,13 @@ from designate.backend import impl_pdns4
from designate import context
from designate import exceptions
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
class PDNS4BackendTestCase(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.context = mock.Mock()

@ -16,7 +16,7 @@ import oslotest.base
from designate.common.decorators import rpc
import designate.conf
from designate.tests import fixtures
from designate.tests import base_fixtures
CONF = designate.conf.CONF
@ -39,7 +39,7 @@ class RPCLog:
class TestRPCLogging(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
def test_rpc_logging(self):

@ -26,7 +26,7 @@ from designate import exceptions
from designate.mdns import handler
from designate import objects
from designate import rpc
from designate.tests import fixtures
from designate.tests import base_fixtures
from designate.worker import rpcapi as worker_rpcapi
@ -36,7 +36,7 @@ CONF = designate.conf.CONF
class MdnsHandleTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.useFixture(cfg_fixture.Config(CONF))

@ -26,7 +26,7 @@ from designate import policy
from designate import rpc
import designate.service
from designate import storage
from designate.tests import fixtures
from designate.tests import base_fixtures
import designate.utils
@ -40,7 +40,7 @@ class MdnsServiceTest(oslotest.base.BaseTestCase):
@mock.patch.object(rpc, 'initialized')
def setUp(self, mock_rpc_initialized, mock_rpc_init, mock_policy_init):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
mock_rpc_initialized.return_value = False

@ -15,14 +15,14 @@ import oslotest.base
import designate.conf
from designate.notification_handler import fake
from designate.tests import test_notification_handler
from designate.tests.functional import notification_handler
CONF = designate.conf.CONF
class TestFakeHandler(oslotest.base.BaseTestCase,
test_notification_handler.NotificationHandlerMixin):
notification_handler.NotificationHandlerMixin):
@mock.patch('designate.rpc.get_client')
def setUp(self, mock_get_instance):

@ -26,7 +26,7 @@ from designate import policy
from designate.producer import service
from designate import rpc
import designate.service
from designate.tests import fixtures
from designate.tests import base_fixtures
CONF = designate.conf.CONF
@ -43,7 +43,7 @@ class ProducerServiceTest(oslotest.base.BaseTestCase):
super().setUp()
self.useFixture(cfg_fixture.Config(CONF))
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
mock_rpc_initialized.return_value = False

@ -14,9 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit test Producer tasks
"""
import datetime
from unittest import mock
@ -32,12 +30,13 @@ import designate.conf
from designate import context
from designate.producer import tasks
from designate import rpc
from designate.tests import fixtures as tests_fixtures
from designate.tests import base_fixtures
from designate.tests.unit import RoObject
from designate.tests.unit import RwObject
from designate.utils import generate_uuid
from designate.worker import rpcapi as worker_api
DUMMY_TASK_GROUP = cfg.OptGroup(
name='producer_task:dummy',
title='Configuration for Producer Task: Dummy Task'
@ -349,7 +348,7 @@ class PeriodicGenerateDelayedNotifyTaskTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.useFixture(cfg_fixture.Config(CONF))
self.stdlog = tests_fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.central_api = mock.Mock()

@ -13,17 +13,24 @@
# under the License.
from unittest import mock
from oslo_config import fixture as cfg_fixture
import oslotest.base
import designate.conf
from designate import exceptions
from designate import objects
from designate import scheduler
from designate import tests
class SchedulerTest(tests.TestCase):
CONF = designate.conf.CONF
class SchedulerTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.context = self.get_context()
self.useFixture(cfg_fixture.Config(CONF))
self.context = mock.Mock()
self.zone = objects.Zone(
name="example.com.",
type="PRIMARY",
@ -76,7 +83,7 @@ class SchedulerTest(tests.TestCase):
}
mock_storage = mock.Mock(**attrs)
self.CONF.set_override(
CONF.set_override(
'scheduler_filters', ['random'], 'service:central'
)
@ -89,7 +96,7 @@ class SchedulerTest(tests.TestCase):
)
def test_no_filters_enabled(self):
self.CONF.set_override(
CONF.set_override(
'scheduler_filters', [], 'service:central'
)

@ -11,10 +11,14 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import fixtures
import oslotest.base
from designate import exceptions
from designate import objects
from designate import policy
@ -23,13 +27,12 @@ from designate.scheduler.filters import default_pool_filter
from designate.scheduler.filters import fallback_filter
from designate.scheduler.filters import in_doubt_default_pool_filter
from designate.scheduler.filters import pool_id_attribute_filter
from designate import tests
class SchedulerFilterTest(tests.TestCase):
class SchedulerFilterTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.context = self.get_context()
self.context = mock.Mock()
self.zone = objects.Zone(
name="example.com.",
type="PRIMARY",

@ -12,12 +12,12 @@
from unittest import mock
from oslo_config import fixture as cfg_fixture
import oslotest.base
import designate.conf
from designate import exceptions
from designate import objects
from designate import scheduler
from designate import tests
CONF = designate.conf.CONF
@ -64,11 +64,11 @@ def build_test_pools():
return pools
class AttributeSchedulerPermutationsTest(tests.TestCase):
class AttributeSchedulerPermutationsTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.CONF = self.useFixture(cfg_fixture.Config(CONF)).conf
self.context = self.get_context()
self.context = mock.Mock()
self.CONF.set_override(
'scheduler_filters', ['attribute'], 'service:central'
@ -182,11 +182,11 @@ class AttributeSchedulerPermutationsTest(tests.TestCase):
)
class DefaultSchedulerPermutationsTest(tests.TestCase):
class DefaultSchedulerPermutationsTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.CONF = self.useFixture(cfg_fixture.Config(CONF)).conf
self.context = self.get_context()
self.context = mock.Mock()
self.CONF.set_override(
'scheduler_filters', ['default_pool'], 'service:central'
@ -214,11 +214,11 @@ class DefaultSchedulerPermutationsTest(tests.TestCase):
self.assertEqual(DEFAULT_POOL_ID, result)
class FallbackSchedulerPermutationsTest(tests.TestCase):
class FallbackSchedulerPermutationsTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.CONF = self.useFixture(cfg_fixture.Config(CONF)).conf
self.context = self.get_context()
self.context = mock.Mock()
self.CONF.set_override(
'scheduler_filters', ['attribute', 'fallback'], 'service:central'

@ -21,15 +21,15 @@ from designate.notification_handler import fake
from designate import policy
from designate import rpc
from designate.sink import service
from designate.tests import fixtures
from designate.tests import test_notification_handler
from designate.tests import base_fixtures
from designate.tests.functional import notification_handler
CONF = designate.conf.CONF
class TestSinkNotification(oslotest.base.BaseTestCase,
test_notification_handler.NotificationHandlerMixin):
notification_handler.NotificationHandlerMixin):
@mock.patch.object(policy, 'init')
@mock.patch.object(rpc, 'get_client')
@ -41,7 +41,7 @@ class TestSinkNotification(oslotest.base.BaseTestCase,
mock_rpc_initialized.return_value = False
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(cfg_fixture.Config(CONF))
self.useFixture(self.stdlog)

@ -9,22 +9,34 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.mport threading
from unittest import mock
from oslo_config import fixture as cfg_fixture
import oslotest.base
import designate.conf
import designate.rpc
from designate.sink import service
import designate.tests
from designate.tests import fixtures
from designate.tests import base_fixtures
import designate.tests.functional
class TestSinkService(designate.tests.TestCase):
CONF = designate.conf.CONF
class TestSinkService(oslotest.base.BaseTestCase):
@mock.patch('designate.policy.init', mock.Mock())
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.useFixture(cfg_fixture.Config(CONF))
self.CONF.set_override('enabled_notification_handlers', ['fake'],
'service:sink')
CONF.set_override(
'enabled_notification_handlers', ['fake'], 'service:sink'
)
self.service = service.Service()

@ -22,7 +22,7 @@ import tooz.coordination
import designate.conf
from designate import coordination
from designate.tests import fixtures
from designate.tests import base_fixtures
CONF = designate.conf.CONF
@ -160,7 +160,9 @@ class TestPartitioner(oslotest.base.BaseTestCase):
super().setUp()
def _get_partitioner(self, partitions, host=b'a'):
fixture = self.useFixture(fixtures.CoordinatorFixture('zake://', host))
fixture = self.useFixture(base_fixtures.CoordinatorFixture(
'zake://', host)
)
group = 'group'
fixture.coordinator.create_group(group)
fixture.coordinator.join_group(group)

@ -21,7 +21,7 @@ import oslotest.base
import designate.conf
from designate import heartbeat_emitter
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
CONF = designate.conf.CONF
@ -30,7 +30,7 @@ CONF = designate.conf.CONF
class HeartbeatEmitterTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.useFixture(cfg_fixture.Config(CONF))

@ -20,7 +20,7 @@ import oslotest.base
from designate import notifications
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
LOG = logging.getLogger(__name__)
@ -54,7 +54,7 @@ class AuditNotificationTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.driver = notifications.Audit()

@ -26,7 +26,7 @@ from designate.mdns import handler
from designate import policy
from designate import rpc
from designate import service as designate_service
from designate.tests import fixtures
from designate.tests import base_fixtures
from designate import utils
@ -38,7 +38,7 @@ class BaseServiceTest(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
@mock.patch.object(service, 'launch')
@ -259,7 +259,7 @@ class TestDNSService(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.useFixture(cfg_fixture.Config(CONF))
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.tg = mock.Mock()

@ -19,7 +19,7 @@ import oslotest.base
import designate.conf
from designate import exceptions
from designate.tests import fixtures
from designate.tests import base_fixtures
from designate import utils
@ -29,7 +29,7 @@ CONF = designate.conf.CONF
class TestUtils(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(cfg_fixture.Config(CONF))
self.useFixture(self.stdlog)

@ -16,21 +16,21 @@
from unittest import mock
from oslo_config import fixture as cfg_fixture
import oslotest.base
import designate.conf
from designate import exceptions
from designate.tests import fixtures
from designate.tests import TestCase
from designate.tests import base_fixtures
from designate.worker import processing
CONF = designate.conf.CONF
class TestProcessingExecutor(TestCase):
class TestProcessingExecutor(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(cfg_fixture.Config(CONF))
self.useFixture(self.stdlog)

@ -31,7 +31,7 @@ from designate import rpc
import designate.service
from designate import storage
import designate.tests
from designate.tests import fixtures
from designate.tests import base_fixtures
from designate.worker import processing
from designate.worker import service
@ -50,7 +50,7 @@ class WorkerServiceTest(oslotest.base.BaseTestCase):
mock_rpc_initialized.return_value = False
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.useFixture(cfg_fixture.Config(CONF))

@ -23,7 +23,7 @@ import designate.conf
from designate import dnsutils
from designate import exceptions
from designate import objects
from designate.tests import fixtures
from designate.tests import base_fixtures
from designate.worker.tasks import zone as worker_zone
@ -33,7 +33,7 @@ CONF = designate.conf.CONF
class TestXfr(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.stdlog = fixtures.StandardLogging()
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.useFixture(cfg_fixture.Config(CONF))
self.context = mock.Mock()