Multiple testing fixes

* Changes NotificationHandler base to be a object and the tests to inherit it
  like a mixin, test_invalid method fails if not due to duplicate test id.
* Changes backend tests to use scenarios and special NSD4 tests puts the setUp
  logic into a fixture which is loaded for the NSD4 backend tests.
* Remove any get_*_service call into start_service(name) which uses a fixture
  ServiceFixture that automatically cleans up the service upon teardown (no
  more sprinkling tearDowns all over the tests to stop a service).
* Use fixture for database. Removes storage.setup_schema and teardown_schema.
* Fix cleanup of notifications.
* Use fixture for Policy instead of in teardown().

bug #1256728

Change-Id: I72a53eed9e81d3f014015aa443582de21a4e66ee
This commit is contained in:
Endre Karlson 2013-12-02 01:25:49 +01:00
parent a109782c8a
commit eb164336c5
23 changed files with 269 additions and 360 deletions

@ -24,17 +24,3 @@ def get_storage():
""" Return the engine class from the provided engine name """
storage_driver = cfg.CONF['service:central'].storage_driver
return Storage.get_plugin(storage_driver, invoke_on_load=True)
def setup_schema():
""" Create the DB - Used for testing purposes """
LOG.debug("Setting up Schema")
storage = get_storage()
storage.setup_schema()
def teardown_schema():
""" Reset the DB to default - Used for testing purposes """
LOG.debug("Tearing down Schema")
storage = get_storage()
storage.teardown_schema()

@ -22,15 +22,12 @@ from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.openstack.common.notifier import test_notifier
from designate.openstack.common.fixture import config
from designate.openstack.common import importutils
from designate.openstack.common import policy
from designate.openstack.common import test
from designate.context import DesignateContext
from designate import storage
from designate import exceptions
from designate.agent import service as agent_service
from designate.api import service as api_service
from designate.central import service as central_service
from designate.sink import service as sink_service
LOG = logging.getLogger(__name__)
@ -42,11 +39,24 @@ cfg.CONF.import_opt('auth_strategy', 'designate.api',
group='service:api')
cfg.CONF.import_opt('database_connection', 'designate.storage.impl_sqlalchemy',
group='storage:sqlalchemy')
# NOTE: Since we're importing service classes in start_service this breaks
# if not here.
cfg.CONF.import_opt(
'notification_driver', 'designate.openstack.common.notifier.api')
class StorageFixture(fixtures.Fixture):
def setUp(self):
super(StorageFixture, self).setUp()
self.storage = storage.get_storage()
self.storage.setup_schema()
self.addCleanup(self.storage.teardown_schema)
class NotifierFixture(fixtures.Fixture):
def tearDown(self):
self.clear()
def setUp(self):
super(NotifierFixture, self).setUp()
self.addCleanup(self.clear)
def get(self):
return test_notifier.NOTIFICATIONS
@ -55,6 +65,30 @@ class NotifierFixture(fixtures.Fixture):
test_notifier.NOTIFICATIONS = []
class ServiceFixture(fixtures.Fixture):
def __init__(self, svc_name, *args, **kw):
cls = importutils.import_class(
'designate.%s.service.Service' % svc_name)
self.svc = cls(*args, **kw)
def setUp(self):
super(ServiceFixture, self).setUp()
self.svc.start()
self.addCleanup(self.kill)
def kill(self):
try:
self.svc.kill()
except Exception:
pass
class PolicyFixture(fixtures.Fixture):
def setUp(self):
super(PolicyFixture, self).setUp()
self.addCleanup(policy.reset)
class TestCase(test.BaseTestCase):
quota_fixtures = [{
'tenant_id': '12345',
@ -138,16 +172,13 @@ class TestCase(test.BaseTestCase):
self.notifications = NotifierFixture()
self.useFixture(self.notifications)
storage.setup_schema()
storage_fixture = StorageFixture()
self.useFixture(storage_fixture)
self.useFixture(PolicyFixture())
self.admin_context = self.get_admin_context()
def tearDown(self):
self.reset_notifications()
policy.reset()
storage.teardown_schema()
super(TestCase, self).tearDown()
# Config Methods
def config(self, **kwargs):
group = kwargs.pop('group', None)
@ -174,18 +205,13 @@ class TestCase(test.BaseTestCase):
def reset_notifications(self):
self.notifications.clear()
# Service Methods
def get_agent_service(self):
return agent_service.Service()
def get_api_service(self):
return api_service.Service()
def get_central_service(self):
return central_service.Service()
def get_sink_service(self):
return sink_service.Service()
def start_service(self, svc_name, *args, **kw):
"""
Convenience method for starting a service!
"""
fixture = ServiceFixture(svc_name, *args, **kw)
self.useFixture(fixture)
return fixture.svc
# Context Methods
def get_context(self, **kwargs):

@ -19,9 +19,8 @@ from designate.tests.test_agent import AgentTestCase
class AgentServiceTest(AgentTestCase):
def setUp(self):
super(AgentServiceTest, self).setUp()
self.service = self.get_agent_service()
self.service = self.start_service('agent')
def test_start_and_stop(self):
# Ensures the start/stop actions don't raise
self.service.start()
def test_stop(self):
# NOTE: Start is already done by the fixture in start_service()
self.service.stop()

@ -23,9 +23,8 @@ class ApiServiceTest(ApiTestCase):
# Use a random port for the API
self.config(api_port=0, group='service:api')
self.service = self.get_api_service()
self.service = self.start_service('api')
def test_start_and_stop(self):
# Ensures the start/stop actions don't raise
self.service.start()
# NOTE: Start is already done by the fixture in start_service()
self.service.stop()

@ -44,13 +44,7 @@ class ApiV1Test(ApiTestCase):
# Obtain a test client
self.client = self.app.test_client()
# Create and start an instance of the central service
self.central_service = self.get_central_service()
self.central_service.start()
def tearDown(self):
self.central_service.stop()
super(ApiV1Test, self).tearDown()
self.central_service = self.start_service('central')
def get(self, path, **kw):
expected_status_code = kw.pop('status_code', 200)

@ -43,13 +43,10 @@ class ApiV2TestCase(ApiTestCase):
self.client = TestApp(self.app)
# Create and start an instance of the central service
self.central_service = self.get_central_service()
self.central_service.start()
self.central_service = self.start_service('central')
def tearDown(self):
self.app = None
self.client = None
self.central_service.stop()
super(ApiV2TestCase, self).tearDown()

@ -1,6 +1,6 @@
# Copyright 2012 Managed I.T.
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -13,19 +13,3 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.tests import TestCase
from designate import backend
LOG = logging.getLogger(__name__)
class BackendTestCase(TestCase):
def get_backend_driver(self):
central_service = self.get_central_service()
return backend.get_backend(cfg.CONF['service:agent'].backend_driver,
central_service=central_service)
def test_constructor(self):
self.get_backend_driver()

@ -0,0 +1,51 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testscenarios import load_tests_apply_scenarios as load_tests # noqa
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import backend
from designate import tests
from designate.tests.test_backend.test_nsd4slave import NSD4Fixture
LOG = logging.getLogger(__name__)
class BackendTestCase(tests.TestCase):
scenarios = [
('bind9', dict(backend_driver='bind9', group='service:agent')),
('dnsmasq', dict(backend_driver='dnsmasq', group='service:agent')),
('fake', dict(backend_driver='fake', group='service:agent')),
('mysqlbind9', dict(backend_driver='mysqlbind9',
group='service:agent')),
('nsd4slave', dict(backend_driver='nsd4slave', group='service:agent',
server_fixture=NSD4Fixture)),
('powerdns', dict(backend_driver='powerdns', group='service:agent'))
]
def setUp(self):
super(BackendTestCase, self).setUp()
if hasattr(self, 'server_fixture'):
self.useFixture(self.server_fixture())
self.config(backend_driver=self.backend_driver, group=self.group)
def get_backend_driver(self):
central_service = self.start_service('central')
return backend.get_backend(cfg.CONF['service:agent'].backend_driver,
central_service=central_service)
def test_constructor(self):
self.get_backend_driver()

@ -1,26 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.tests.test_backend import BackendTestCase
LOG = logging.getLogger(__name__)
class Bind9BackendDriverTestCase(BackendTestCase):
def setUp(self):
super(Bind9BackendDriverTestCase, self).setUp()
self.config(backend_driver='bind9', group='service:agent')

@ -1,26 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.tests.test_backend import BackendTestCase
LOG = logging.getLogger(__name__)
class DnsmasqBackendDriverTestCase(BackendTestCase):
def setUp(self):
super(DnsmasqBackendDriverTestCase, self).setUp()
self.config(backend_driver='dnsmasq', group='service:agent')

@ -1,26 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.tests.test_backend import BackendTestCase
LOG = logging.getLogger(__name__)
class FakeBackendDriverTestCase(BackendTestCase):
def setUp(self):
super(FakeBackendDriverTestCase, self).setUp()
self.config(backend_driver='fake', group='service:agent')

@ -1,26 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.tests.test_backend import BackendTestCase
LOG = logging.getLogger(__name__)
class MySQLBind9BackendDriverTestCase(BackendTestCase):
def setUp(self):
super(MySQLBind9BackendDriverTestCase, self).setUp()
self.config(backend_driver='mysqlbind9', group='service:agent')

@ -15,12 +15,16 @@
# under the License.
import eventlet
import fixtures
from mock import MagicMock
import os
import socket
import ssl
from oslo.config import cfg
from designate import backend
from designate import exceptions
from designate.tests.test_backend import BackendTestCase
from mock import MagicMock
from designate import tests
# impl_nsd4slave needs to register its options before being instanciated.
# Import it and pretend to use it to avoid flake8 unused import errors.
from designate.backend import impl_nsd4slave
@ -58,36 +62,46 @@ class NSD4ServerStub:
eventlet.StopServe()
class NSD4SlaveBackendTestCase(BackendTestCase):
__test__ = True
class NSD4Fixture(fixtures.Fixture):
def setUp(self):
super(NSD4SlaveBackendTestCase, self).setUp()
super(NSD4Fixture, self).setUp()
self.servers = [NSD4ServerStub(), NSD4ServerStub()]
[server.start() for server in self.servers]
impl_nsd4slave.DEFAULT_PORT = self.servers[0].port
self.config(backend_driver='nsd4slave', group='service:agent')
self.config(
servers=['127.0.0.1', '127.0.0.1:%d' % self.servers[1].port],
group='backend:nsd4slave')
cfg.CONF.set_override('backend_driver', 'nsd4slave', 'service:agent')
cfg.CONF.set_override(
'servers', ['127.0.0.1', '127.0.0.1:%d' % self.servers[1].port],
'backend:nsd4slave')
keyfile = os.path.join(os.path.dirname(__file__), 'nsd_control.key')
certfile = os.path.join(os.path.dirname(__file__), 'nsd_control.pem')
self.config(keyfile=keyfile, group='backend:nsd4slave')
self.config(certfile=certfile, group='backend:nsd4slave')
self.config(pattern='test-pattern', group='backend:nsd4slave')
self.nsd4 = self.get_backend_driver()
cfg.CONF.set_override('keyfile', keyfile, 'backend:nsd4slave')
cfg.CONF.set_override('certfile', certfile, 'backend:nsd4slave')
cfg.CONF.set_override('pattern', 'test-pattern', 'backend:nsd4slave')
self.addCleanup(self.tearDown)
def tearDown(self):
super(NSD4SlaveBackendTestCase, self).tearDown()
[server.stop() for server in self.servers]
# NOTE: We'll only test the specifics to the nsd4 backend here.
# Rest is handled via scenarios
class NSD4SlaveBackendTestCase(tests.TestCase):
def setUp(self):
super(NSD4SlaveBackendTestCase, self).setUp()
self.server_fixture = NSD4Fixture()
self.useFixture(self.server_fixture)
central_service = self.start_service('central')
self.nsd4 = backend.get_backend(
cfg.CONF['service:agent'].backend_driver,
central_service=central_service)
def test_create_domain(self):
context = self.get_context()
domain = self.get_domain_fixture()
self.nsd4.create_domain(context, domain)
command = 'NSDCT1 addzone %s test-pattern\n' % domain['name']
[self.assertEqual(server.recved_command, command)
for server in self.servers]
for server in self.server_fixture.servers]
def test_delete_domain(self):
context = self.get_context()
@ -95,10 +109,10 @@ class NSD4SlaveBackendTestCase(BackendTestCase):
self.nsd4.delete_domain(context, domain)
command = 'NSDCT1 delzone %s\n' % domain['name']
[self.assertEqual(server.recved_command, command)
for server in self.servers]
for server in self.server_fixture.servers]
def test_server_not_ok(self):
self.servers[0].response = 'goat'
self.server_fixture.servers[0].response = 'goat'
context = self.get_context()
domain = self.get_domain_fixture()
self.assertRaises(exceptions.NSD4SlaveBackendError,

@ -1,26 +0,0 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.tests.test_backend import BackendTestCase
LOG = logging.getLogger(__name__)
class PowerDNSBackendDriverTestCase(BackendTestCase):
def setUp(self):
super(PowerDNSBackendDriverTestCase, self).setUp()
self.config(backend_driver='powerdns', group='service:agent')

@ -26,11 +26,10 @@ LOG = logging.getLogger(__name__)
class CentralServiceTest(CentralTestCase):
def setUp(self):
super(CentralServiceTest, self).setUp()
self.central_service = self.get_central_service()
self.central_service = self.start_service('central')
def test_start_and_stop(self):
# Ensures the start/stop actions don't raise
self.central_service.start()
def test_stop(self):
# Test stopping the service
self.central_service.stop()
def test_is_valid_domain_name(self):

@ -15,31 +15,14 @@
# under the License.
import json
import os
import six
import testtools
from designate.notification_handler.base import Handler
from designate.tests import TestCase
from designate.tests import SkipNotImplementedMeta
FIXTURES_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__),
'..',
'sample_notifications'))
@six.add_metaclass(SkipNotImplementedMeta)
class NotificationHandlerTestCase(TestCase):
__plugin_base__ = Handler
def setUp(self):
super(NotificationHandlerTestCase, self).setUp()
self.central_service = self.get_central_service()
self.central_service.start()
def tearDown(self):
self.central_service.stop()
super(NotificationHandlerTestCase, self).tearDown()
class NotificationHandlerMixin(object):
def get_notification_fixture(self, service, name):
filename = os.path.join(FIXTURES_PATH, service, '%s.json' % name)

@ -14,17 +14,20 @@
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.tests import TestCase
from designate.notification_handler.nova import NovaFixedHandler
from designate.tests.test_notification_handler import \
NotificationHandlerTestCase
NotificationHandlerMixin
LOG = logging.getLogger(__name__)
class NovaFixedHandlerTest(NotificationHandlerTestCase):
class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
def setUp(self):
super(NovaFixedHandlerTest, self).setUp()
self.central_service = self.start_service('central')
domain = self.create_domain()
self.domain_id = domain['id']
self.config(domain_id=domain['id'], group='handler:nova_fixed')

@ -14,17 +14,20 @@
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.tests import TestCase
from designate.notification_handler.quantum import QuantumFloatingHandler
from designate.tests.test_notification_handler import \
NotificationHandlerTestCase
NotificationHandlerMixin
LOG = logging.getLogger(__name__)
class QuantumFloatingHandlerTest(NotificationHandlerTestCase):
class QuantumFloatingHandlerTest(TestCase, NotificationHandlerMixin):
def setUp(self):
super(QuantumFloatingHandlerTest, self).setUp()
self.central_service = self.start_service('central')
domain = self.create_domain()
self.domain_id = domain['id']
self.config(domain_id=domain['id'], group='handler:quantum_floatingip')

@ -13,86 +13,3 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate.tests import TestCase
from designate import quota
from designate import exceptions
LOG = logging.getLogger(__name__)
class QuotaTestCase(TestCase):
def setUp(self):
super(QuotaTestCase, self).setUp()
self.quota = quota.get_quota()
def test_get_quotas(self):
context = self.get_admin_context()
quotas = self.quota.get_quotas(context, 'DefaultQuotaTenant')
self.assertIsNotNone(quotas)
self.assertEqual(quotas, {
'domains': cfg.CONF.quota_domains,
'domain_records': cfg.CONF.quota_domain_records
})
def test_limit_check_unknown(self):
context = self.get_admin_context()
with testtools.ExpectedException(exceptions.QuotaResourceUnknown):
self.quota.limit_check(context, 'tenant_id', unknown=0)
with testtools.ExpectedException(exceptions.QuotaResourceUnknown):
self.quota.limit_check(context, 'tenant_id', unknown=0, domains=0)
def test_limit_check_under(self):
context = self.get_admin_context()
self.quota.limit_check(context, 'tenant_id', domains=0)
self.quota.limit_check(context, 'tenant_id', domain_records=0)
self.quota.limit_check(context, 'tenant_id', domains=0,
domain_records=0)
self.quota.limit_check(context, 'tenant_id',
domains=(cfg.CONF.quota_domains - 1))
self.quota.limit_check(
context,
'tenant_id',
domain_records=(cfg.CONF.quota_domain_records - 1))
def test_limit_check_at(self):
context = self.get_admin_context()
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id',
domains=cfg.CONF.quota_domains)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(
context,
'tenant_id',
domain_records=cfg.CONF.quota_domain_records)
def test_limit_check_over(self):
context = self.get_admin_context()
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domains=99999)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domain_records=99999)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domains=99999,
domain_records=99999)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domains=99999,
domain_records=0)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domains=0,
domain_records=99999)

@ -1,25 +0,0 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate.openstack.common import log as logging
from designate.tests.test_quota import QuotaTestCase
LOG = logging.getLogger(__name__)
class NoopQuotaTest(QuotaTestCase):
def setUp(self):
self.config(quota_driver='noop')
super(NoopQuotaTest, self).setUp()

@ -0,0 +1,106 @@
# Copyright 2013 Hewlett-Packard Development Company, L.P.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from testscenarios import load_tests_apply_scenarios as load_tests # noqa
import testtools
from oslo.config import cfg
from designate.openstack.common import log as logging
from designate import quota
from designate import tests
from designate import exceptions
LOG = logging.getLogger(__name__)
class QuotaTestCase(tests.TestCase):
scenarios = [
('noop', dict(quota_driver='noop')),
('storage', dict(quota_driver='storage'))
]
def setUp(self):
super(QuotaTestCase, self).setUp()
self.config(quota_driver=self.quota_driver)
self.quota = quota.get_quota()
def test_get_quotas(self):
context = self.get_admin_context()
quotas = self.quota.get_quotas(context, 'DefaultQuotaTenant')
self.assertIsNotNone(quotas)
self.assertEqual(quotas, {
'domains': cfg.CONF.quota_domains,
'domain_records': cfg.CONF.quota_domain_records
})
def test_limit_check_unknown(self):
context = self.get_admin_context()
with testtools.ExpectedException(exceptions.QuotaResourceUnknown):
self.quota.limit_check(context, 'tenant_id', unknown=0)
with testtools.ExpectedException(exceptions.QuotaResourceUnknown):
self.quota.limit_check(context, 'tenant_id', unknown=0, domains=0)
def test_limit_check_under(self):
context = self.get_admin_context()
self.quota.limit_check(context, 'tenant_id', domains=0)
self.quota.limit_check(context, 'tenant_id', domain_records=0)
self.quota.limit_check(context, 'tenant_id', domains=0,
domain_records=0)
self.quota.limit_check(context, 'tenant_id',
domains=(cfg.CONF.quota_domains - 1))
self.quota.limit_check(
context,
'tenant_id',
domain_records=(cfg.CONF.quota_domain_records - 1))
def test_limit_check_at(self):
context = self.get_admin_context()
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id',
domains=cfg.CONF.quota_domains)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(
context,
'tenant_id',
domain_records=cfg.CONF.quota_domain_records)
def test_limit_check_over(self):
context = self.get_admin_context()
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domains=99999)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domain_records=99999)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domains=99999,
domain_records=99999)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domains=99999,
domain_records=0)
with testtools.ExpectedException(exceptions.OverQuota):
self.quota.limit_check(context, 'tenant_id', domains=0,
domain_records=99999)

@ -13,16 +13,18 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate import quota
from designate import tests
from designate.openstack.common import log as logging
from designate.tests.test_quota import QuotaTestCase
LOG = logging.getLogger(__name__)
class StorageQuotaTest(QuotaTestCase):
class StorageQuotaTest(tests.TestCase):
def setUp(self):
self.config(quota_driver='storage')
super(StorageQuotaTest, self).setUp()
self.config(quota_driver='storage')
self.quota = quota.get_quota()
def test_set_quota_create(self):
quota = self.quota.set_quota(self.admin_context, 'tenant_id',

@ -10,4 +10,5 @@ pyflakes>=0.7.2,<0.7.4
python-subunit
testtools>=0.9.32
testrepository>=0.0.8
testscenarios>=0.4
WebTest>=2.0