Unify Storage vs Rest of World fixture creation

Change-Id: I1b8d602d1baf3ebf5f75c35f3e04efadaa0aa876
This commit is contained in:
Kiall Mac Innes 2014-06-21 12:37:17 +01:00
parent cb6c46397b
commit e45995b3f5
11 changed files with 148 additions and 214 deletions

View File

@ -283,12 +283,13 @@ class TestCase(base.BaseTestCase):
managed_resource_tenant_id='managing_tenant', managed_resource_tenant_id='managing_tenant',
group='service:central') group='service:central')
# "Read" Configuration
self.CONF([], project='designate') self.CONF([], project='designate')
self.useFixture(PolicyFixture()) self.useFixture(PolicyFixture())
self.network_api = NetworkAPIFixture() self.network_api = NetworkAPIFixture()
self.useFixture(self.network_api) self.useFixture(self.network_api)
self.central_service = self.start_service('central')
self.admin_context = self.get_admin_context() self.admin_context = self.get_admin_context()
@ -418,20 +419,13 @@ class TestCase(base.BaseTestCase):
_values.update(values) _values.update(values)
return _values return _values
def create_quota(self, **kwargs):
context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
values = self.get_quota_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_quota(context, values=values)
def create_server(self, **kwargs): def create_server(self, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0) fixture = kwargs.pop('fixture', 0)
values = self.get_server_fixture(fixture=fixture, values=kwargs) values = self.get_server_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_server( return self.central_service.create_server(
context, server=objects.Server(**values)) context, objects.Server(**values))
def create_tld(self, **kwargs): def create_tld(self, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
@ -462,14 +456,14 @@ class TestCase(base.BaseTestCase):
values = self.get_tsigkey_fixture(fixture=fixture, values=kwargs) values = self.get_tsigkey_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_tsigkey( return self.central_service.create_tsigkey(
context, tsigkey=objects.TsigKey(**values)) context, objects.TsigKey(**values))
def create_domain(self, **kwargs): def create_domain(self, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0) fixture = kwargs.pop('fixture', 0)
# We always need a server to create a domain..
try: try:
# We always need a server to create a domain..
self.create_server() self.create_server()
except exceptions.DuplicateServer: except exceptions.DuplicateServer:
pass pass
@ -479,8 +473,8 @@ class TestCase(base.BaseTestCase):
if 'tenant_id' not in values: if 'tenant_id' not in values:
values['tenant_id'] = context.tenant values['tenant_id'] = context.tenant
domain = objects.Domain(**values) return self.central_service.create_domain(
return self.central_service.create_domain(context, domain=domain) context, objects.Domain(**values))
def create_recordset(self, domain, type='A', **kwargs): def create_recordset(self, domain, type='A', **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)

View File

@ -24,10 +24,6 @@ class ApiServiceTest(ApiTestCase):
# Use a random port for the API # Use a random port for the API
self.config(api_port=0, group='service:api') self.config(api_port=0, group='service:api')
# Bring up the Central service as if not the rpc will go into
# AssertError since TRANSPORT is None
self.start_service('central')
self.service = service.Service() self.service = service.Service()
def test_start_and_stop(self): def test_start_and_stop(self):

View File

@ -30,8 +30,6 @@ class ApiV1Test(ApiTestCase):
# Ensure the v1 API is enabled # Ensure the v1 API is enabled
self.config(enable_api_v1=True, group='service:api') self.config(enable_api_v1=True, group='service:api')
self.central_service = self.start_service('central')
# Create the application # Create the application
self.app = api_v1.factory({}) self.app = api_v1.factory({})

View File

@ -40,9 +40,6 @@ class ApiV2TestCase(ApiTestCase):
# Ensure the v2 API is enabled # Ensure the v2 API is enabled
self.config(enable_api_v2=True, group='service:api') self.config(enable_api_v2=True, group='service:api')
# Create and start an instance of the central service
self.central_service = self.start_service('central')
# Create the application # Create the application
self.app = api_v2.factory({}) self.app = api_v2.factory({})

View File

@ -20,9 +20,8 @@ from designate import backend
class BackendTestMixin(object): class BackendTestMixin(object):
def get_backend_driver(self): def get_backend_driver(self):
central_service = self.start_service('central')
return backend.get_backend(cfg.CONF['service:agent'].backend_driver, return backend.get_backend(cfg.CONF['service:agent'].backend_driver,
central_service=central_service) central_service=self.central_service)
def test_constructor(self): def test_constructor(self):
self.get_backend_driver() self.get_backend_driver()

View File

@ -127,7 +127,6 @@ class IPABackendTestCase(tests.TestCase, BackendTestMixin):
self.CONF['backend:ipa'].ipa_auth_driver_class = \ self.CONF['backend:ipa'].ipa_auth_driver_class = \
"designate.tests.test_backend.test_ipa.MockIPAAuth" "designate.tests.test_backend.test_ipa.MockIPAAuth"
self.backend.start() self.backend.start()
self.central_service = self.start_service('central')
# Since some CRUD methods in impl_ipa call central's find_servers # Since some CRUD methods in impl_ipa call central's find_servers
# and find_records method, mock it up to return our fixture. # and find_records method, mock it up to return our fixture.

View File

@ -73,7 +73,6 @@ class PowerDNSBackendTestCase(tests.TestCase, BackendTestMixin):
group='backend:powerdns') group='backend:powerdns')
self.backend = self.get_backend_driver() self.backend = self.get_backend_driver()
self.backend.start() self.backend.start()
self.central_service = self.start_service('central')
# Since some CRUD methods in impl_powerdns call central's find_servers # Since some CRUD methods in impl_powerdns call central's find_servers
# method, mock it up to return our fixture. # method, mock it up to return our fixture.
self.backend.central_service.find_servers = MagicMock( self.backend.central_service.find_servers = MagicMock(

View File

@ -28,10 +28,6 @@ LOG = logging.getLogger(__name__)
class CentralServiceTest(CentralTestCase): class CentralServiceTest(CentralTestCase):
def setUp(self):
super(CentralServiceTest, self).setUp()
self.central_service = self.start_service('central')
def test_stop(self): def test_stop(self):
# Test stopping the service # Test stopping the service
self.central_service.stop() self.central_service.stop()

View File

@ -26,8 +26,6 @@ class NeutronFloatingHandlerTest(TestCase, NotificationHandlerMixin):
def setUp(self): def setUp(self):
super(NeutronFloatingHandlerTest, self).setUp() super(NeutronFloatingHandlerTest, self).setUp()
self.central_service = self.start_service('central')
domain = self.create_domain() domain = self.create_domain()
self.domain_id = domain['id'] self.domain_id = domain['id']
self.config(domain_id=domain['id'], group='handler:neutron_floatingip') self.config(domain_id=domain['id'], group='handler:neutron_floatingip')

View File

@ -26,8 +26,6 @@ class NovaFixedHandlerTest(TestCase, NotificationHandlerMixin):
def setUp(self): def setUp(self):
super(NovaFixedHandlerTest, self).setUp() super(NovaFixedHandlerTest, self).setUp()
self.central_service = self.start_service('central')
domain = self.create_domain() domain = self.create_domain()
self.domain_id = domain['id'] self.domain_id = domain['id']
self.config(domain_id=domain['id'], group='handler:nova_fixed') self.config(domain_id=domain['id'], group='handler:nova_fixed')

View File

@ -27,63 +27,21 @@ LOG = logging.getLogger(__name__)
class StorageTestCase(object): class StorageTestCase(object):
def create_quota(self, fixture=0, values=None, context=None): def create_quota(self, **kwargs):
values = values or {} """
context = context or self.admin_context This create method has been kept in the StorageTestCase class as quotas
are treated differently to other resources in Central.
"""
fixture = self.get_quota_fixture(fixture, values) context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0)
if 'tenant_id' not in fixture: values = self.get_quota_fixture(fixture=fixture, values=kwargs)
fixture['tenant_id'] = context.tenant
return fixture, self.storage.create_quota(context, fixture) if 'tenant_id' not in values:
values['tenant_id'] = context.tenant
def create_server(self, fixture=0, values=None, context=None): return self.storage.create_quota(context, values)
values = values or {}
context = context or self.admin_context
fixture = self.get_server_fixture(fixture, values)
return fixture, self.storage.create_server(
context, objects.Server(**fixture))
def create_tsigkey(self, fixture=0, values=None, context=None):
values = values or {}
context = context or self.admin_context
fixture = self.get_tsigkey_fixture(fixture, values)
return fixture, self.storage.create_tsigkey(
context, objects.TsigKey(**fixture))
def create_domain(self, fixture=0, values=None, context=None):
values = values or {}
context = context or self.admin_context
fixture = self.get_domain_fixture(fixture, values)
if 'tenant_id' not in fixture:
fixture['tenant_id'] = context.tenant
return fixture, self.storage.create_domain(
context, objects.Domain(**fixture))
def create_recordset(self, domain, type='A', fixture=0, values=None,
context=None):
values = values or {}
context = context or self.admin_context
fixture = self.get_recordset_fixture(domain['name'], type, fixture,
values)
return fixture, self.storage.create_recordset(
context, domain['id'], objects.RecordSet(**fixture))
def create_record(self, domain, recordset, fixture=0, values=None,
context=None):
values = values or {}
context = context or self.admin_context
fixture = self.get_record_fixture(recordset['type'], fixture, values)
return fixture, self.storage.create_record(
context, domain['id'], recordset['id'], objects.Record(**fixture))
# Paging Tests # Paging Tests
def _ensure_paging(self, data, method): def _ensure_paging(self, data, method):
@ -157,7 +115,7 @@ class StorageTestCase(object):
self.assertEqual(actual, []) self.assertEqual(actual, [])
# Create a single quota # Create a single quota
_, quota_one = self.create_quota() quota_one = self.create_quota()
actual = self.storage.find_quotas(self.admin_context) actual = self.storage.find_quotas(self.admin_context)
self.assertEqual(len(actual), 1) self.assertEqual(len(actual), 1)
@ -167,7 +125,7 @@ class StorageTestCase(object):
self.assertEqual(actual[0]['hard_limit'], quota_one['hard_limit']) self.assertEqual(actual[0]['hard_limit'], quota_one['hard_limit'])
# Create a second quota # Create a second quota
_, quota_two = self.create_quota(fixture=1) quota_two = self.create_quota(fixture=1)
actual = self.storage.find_quotas(self.admin_context) actual = self.storage.find_quotas(self.admin_context)
self.assertEqual(len(actual), 2) self.assertEqual(len(actual), 2)
@ -177,8 +135,8 @@ class StorageTestCase(object):
self.assertEqual(actual[1]['hard_limit'], quota_two['hard_limit']) self.assertEqual(actual[1]['hard_limit'], quota_two['hard_limit'])
def test_find_quotas_criterion(self): def test_find_quotas_criterion(self):
_, quota_one = self.create_quota(0) quota_one = self.create_quota()
_, quota_two = self.create_quota(1) quota_two = self.create_quota(fixture=1)
criterion = dict( criterion = dict(
tenant_id=quota_one['tenant_id'], tenant_id=quota_one['tenant_id'],
@ -208,7 +166,7 @@ class StorageTestCase(object):
def test_get_quota(self): def test_get_quota(self):
# Create a quota # Create a quota
_, expected = self.create_quota() expected = self.create_quota()
actual = self.storage.get_quota(self.admin_context, expected['id']) actual = self.storage.get_quota(self.admin_context, expected['id'])
self.assertEqual(actual['tenant_id'], expected['tenant_id']) self.assertEqual(actual['tenant_id'], expected['tenant_id'])
@ -221,8 +179,8 @@ class StorageTestCase(object):
self.storage.get_quota(self.admin_context, uuid) self.storage.get_quota(self.admin_context, uuid)
def test_find_quota_criterion(self): def test_find_quota_criterion(self):
_, quota_one = self.create_quota(0) quota_one = self.create_quota()
_, quota_two = self.create_quota(1) quota_two = self.create_quota(fixture=1)
criterion = dict( criterion = dict(
tenant_id=quota_one['tenant_id'], tenant_id=quota_one['tenant_id'],
@ -247,7 +205,7 @@ class StorageTestCase(object):
self.assertEqual(result['hard_limit'], quota_two['hard_limit']) self.assertEqual(result['hard_limit'], quota_two['hard_limit'])
def test_find_quota_criterion_missing(self): def test_find_quota_criterion_missing(self):
_, expected = self.create_quota(0) expected = self.create_quota()
criterion = dict( criterion = dict(
tenant_id=expected['tenant_id'] + "NOT FOUND" tenant_id=expected['tenant_id'] + "NOT FOUND"
@ -258,12 +216,12 @@ class StorageTestCase(object):
def test_update_quota(self): def test_update_quota(self):
# Create a quota # Create a quota
fixture, quota = self.create_quota() fixture = self.get_quota_fixture()
quota = self.create_quota(fixture=1)
updated = self.storage.update_quota(self.admin_context, quota['id'], updated = self.storage.update_quota(self.admin_context, quota['id'],
fixture) fixture)
self.assertEqual(updated['tenant_id'], fixture['tenant_id'])
self.assertEqual(updated['resource'], fixture['resource']) self.assertEqual(updated['resource'], fixture['resource'])
self.assertEqual(updated['hard_limit'], fixture['hard_limit']) self.assertEqual(updated['hard_limit'], fixture['hard_limit'])
@ -272,8 +230,8 @@ class StorageTestCase(object):
context.all_tenants = True context.all_tenants = True
# Create two quotas # Create two quotas
self.create_quota(fixture=0, values={'tenant_id': '1'}) self.create_quota(fixture=0, tenant_id='1')
_, quota = self.create_quota(fixture=0, values={'tenant_id': '2'}) quota = self.create_quota(fixture=0, tenant_id='2')
with testtools.ExpectedException(exceptions.DuplicateQuota): with testtools.ExpectedException(exceptions.DuplicateQuota):
self.storage.update_quota(context, quota['id'], self.storage.update_quota(context, quota['id'],
@ -285,7 +243,7 @@ class StorageTestCase(object):
self.storage.update_quota(self.admin_context, uuid, {}) self.storage.update_quota(self.admin_context, uuid, {})
def test_delete_quota(self): def test_delete_quota(self):
quota_fixture, quota = self.create_quota() quota = self.create_quota()
self.storage.delete_quota(self.admin_context, quota['id']) self.storage.delete_quota(self.admin_context, quota['id'])
@ -324,7 +282,7 @@ class StorageTestCase(object):
self.assertEqual(actual, []) self.assertEqual(actual, [])
# Create a single server # Create a single server
_, server = self.create_server() server = self.create_server()
actual = self.storage.find_servers(self.admin_context) actual = self.storage.find_servers(self.admin_context)
self.assertEqual(len(actual), 1) self.assertEqual(len(actual), 1)
@ -333,15 +291,14 @@ class StorageTestCase(object):
# Order of found items later will be reverse of the order they are # Order of found items later will be reverse of the order they are
# created # created
created = [self.create_server( created = [self.create_server(
values={'name': 'ns%s.example.org.' % i})[1] name='ns%s.example.org.' % i) for i in xrange(10, 20)]
for i in xrange(10, 20)]
created.insert(0, server) created.insert(0, server)
self._ensure_paging(created, self.storage.find_servers) self._ensure_paging(created, self.storage.find_servers)
def test_find_servers_criterion(self): def test_find_servers_criterion(self):
_, server_one = self.create_server(0) server_one = self.create_server(fixture=0)
_, server_two = self.create_server(1) server_two = self.create_server(fixture=1)
criterion = dict( criterion = dict(
name=server_one['name'] name=server_one['name']
@ -365,7 +322,7 @@ class StorageTestCase(object):
def test_get_server(self): def test_get_server(self):
# Create a server # Create a server
_, expected = self.create_server() expected = self.create_server()
actual = self.storage.get_server(self.admin_context, expected['id']) actual = self.storage.get_server(self.admin_context, expected['id'])
self.assertEqual(str(actual['name']), str(expected['name'])) self.assertEqual(str(actual['name']), str(expected['name']))
@ -377,7 +334,8 @@ class StorageTestCase(object):
def test_update_server(self): def test_update_server(self):
# Create a server # Create a server
fixture, server = self.create_server() fixture = self.get_server_fixture()
server = self.create_server(**fixture)
updated = self.storage.update_server(self.admin_context, server['id'], updated = self.storage.update_server(self.admin_context, server['id'],
fixture) fixture)
@ -387,7 +345,7 @@ class StorageTestCase(object):
def test_update_server_duplicate(self): def test_update_server_duplicate(self):
# Create two servers # Create two servers
self.create_server(fixture=0) self.create_server(fixture=0)
_, server = self.create_server(fixture=1) server = self.create_server(fixture=1)
values = self.server_fixtures[0] values = self.server_fixtures[0]
@ -401,7 +359,7 @@ class StorageTestCase(object):
self.storage.update_server(self.admin_context, uuid, {}) self.storage.update_server(self.admin_context, uuid, {})
def test_delete_server(self): def test_delete_server(self):
server_fixture, server = self.create_server() server = self.create_server()
self.storage.delete_server(self.admin_context, server['id']) self.storage.delete_server(self.admin_context, server['id'])
@ -430,20 +388,20 @@ class StorageTestCase(object):
def test_create_tsigkey_duplicate(self): def test_create_tsigkey_duplicate(self):
# Create the Initial TsigKey # Create the Initial TsigKey
_, tsigkey_one = self.create_tsigkey() tsigkey_one = self.create_tsigkey()
values = self.get_tsigkey_fixture(1) values = self.get_tsigkey_fixture(1)
values['name'] = tsigkey_one['name'] values['name'] = tsigkey_one['name']
with testtools.ExpectedException(exceptions.DuplicateTsigKey): with testtools.ExpectedException(exceptions.DuplicateTsigKey):
self.create_tsigkey(values=values) self.create_tsigkey(**values)
def test_find_tsigkeys(self): def test_find_tsigkeys(self):
actual = self.storage.find_tsigkeys(self.admin_context) actual = self.storage.find_tsigkeys(self.admin_context)
self.assertEqual(actual, []) self.assertEqual(actual, [])
# Create a single tsigkey # Create a single tsigkey
_, tsig = self.create_tsigkey() tsig = self.create_tsigkey()
actual = self.storage.find_tsigkeys(self.admin_context) actual = self.storage.find_tsigkeys(self.admin_context)
self.assertEqual(len(actual), 1) self.assertEqual(len(actual), 1)
@ -454,15 +412,15 @@ class StorageTestCase(object):
# Order of found items later will be reverse of the order they are # Order of found items later will be reverse of the order they are
# created # created
created = [self.create_tsigkey(values={'name': 'tsig%s.' % i})[1] created = [self.create_tsigkey(name='tsig%s.' % i)
for i in xrange(10, 20)] for i in xrange(10, 20)]
created.insert(0, tsig) created.insert(0, tsig)
self._ensure_paging(created, self.storage.find_tsigkeys) self._ensure_paging(created, self.storage.find_tsigkeys)
def test_find_tsigkeys_criterion(self): def test_find_tsigkeys_criterion(self):
_, tsigkey_one = self.create_tsigkey(fixture=0) tsigkey_one = self.create_tsigkey(fixture=0)
_, tsigkey_two = self.create_tsigkey(fixture=1) tsigkey_two = self.create_tsigkey(fixture=1)
criterion = dict( criterion = dict(
name=tsigkey_one['name'] name=tsigkey_one['name']
@ -486,7 +444,7 @@ class StorageTestCase(object):
def test_get_tsigkey(self): def test_get_tsigkey(self):
# Create a tsigkey # Create a tsigkey
_, expected = self.create_tsigkey() expected = self.create_tsigkey()
actual = self.storage.get_tsigkey(self.admin_context, expected['id']) actual = self.storage.get_tsigkey(self.admin_context, expected['id'])
@ -501,7 +459,8 @@ class StorageTestCase(object):
def test_update_tsigkey(self): def test_update_tsigkey(self):
# Create a tsigkey # Create a tsigkey
fixture, tsigkey = self.create_tsigkey() fixture = self.get_tsigkey_fixture()
tsigkey = self.create_tsigkey(**fixture)
updated = self.storage.update_tsigkey(self.admin_context, updated = self.storage.update_tsigkey(self.admin_context,
tsigkey['id'], tsigkey['id'],
@ -514,7 +473,7 @@ class StorageTestCase(object):
def test_update_tsigkey_duplicate(self): def test_update_tsigkey_duplicate(self):
# Create two tsigkeys # Create two tsigkeys
self.create_tsigkey(fixture=0) self.create_tsigkey(fixture=0)
_, tsigkey = self.create_tsigkey(fixture=1) tsigkey = self.create_tsigkey(fixture=1)
values = self.tsigkey_fixtures[0] values = self.tsigkey_fixtures[0]
@ -528,7 +487,7 @@ class StorageTestCase(object):
self.storage.update_tsigkey(self.admin_context, uuid, {}) self.storage.update_tsigkey(self.admin_context, uuid, {})
def test_delete_tsigkey(self): def test_delete_tsigkey(self):
tsigkey_fixture, tsigkey = self.create_tsigkey() tsigkey = self.create_tsigkey()
self.storage.delete_tsigkey(self.admin_context, tsigkey['id']) self.storage.delete_tsigkey(self.admin_context, tsigkey['id'])
@ -546,9 +505,9 @@ class StorageTestCase(object):
context.all_tenants = True context.all_tenants = True
# create 3 domains in 2 tenants # create 3 domains in 2 tenants
self.create_domain(fixture=0, values={'tenant_id': 'One'}) self.create_domain(fixture=0, tenant_id='One')
_, domain = self.create_domain(fixture=1, values={'tenant_id': 'One'}) domain = self.create_domain(fixture=1, tenant_id='One')
self.create_domain(fixture=2, values={'tenant_id': 'Two'}) self.create_domain(fixture=2, tenant_id='Two')
# Delete one of the domains. # Delete one of the domains.
self.storage.delete_domain(context, domain['id']) self.storage.delete_domain(context, domain['id'])
@ -572,9 +531,9 @@ class StorageTestCase(object):
context.all_tenants = True context.all_tenants = True
# create 2 domains in a tenant # create 2 domains in a tenant
_, domain_1 = self.create_domain(fixture=0, values={'tenant_id': 1}) domain_1 = self.create_domain(fixture=0, tenant_id=1)
_, domain_2 = self.create_domain(fixture=1, values={'tenant_id': 1}) domain_2 = self.create_domain(fixture=1, tenant_id=1)
_, domain_3 = self.create_domain(fixture=2, values={'tenant_id': 1}) domain_3 = self.create_domain(fixture=2, tenant_id=1)
# Delete one of the domains. # Delete one of the domains.
self.storage.delete_domain(context, domain_3['id']) self.storage.delete_domain(context, domain_3['id'])
@ -595,9 +554,9 @@ class StorageTestCase(object):
self.assertEqual(tenants, 0) self.assertEqual(tenants, 0)
# create 2 domains with 2 tenants # create 2 domains with 2 tenants
self.create_domain(fixture=0, values={'tenant_id': 1}) self.create_domain(fixture=0, tenant_id=1)
self.create_domain(fixture=1, values={'tenant_id': 2}) self.create_domain(fixture=1, tenant_id=2)
_, domain = self.create_domain(fixture=2, values={'tenant_id': 2}) domain = self.create_domain(fixture=2, tenant_id=2)
# Delete one of the domains. # Delete one of the domains.
self.storage.delete_domain(context, domain['id']) self.storage.delete_domain(context, domain['id'])
@ -633,11 +592,13 @@ class StorageTestCase(object):
self.create_domain() self.create_domain()
def test_find_domains(self): def test_find_domains(self):
self.config(quota_domains=20)
actual = self.storage.find_domains(self.admin_context) actual = self.storage.find_domains(self.admin_context)
self.assertEqual(actual, []) self.assertEqual(actual, [])
# Create a single domain # Create a single domain
fixture_one, domain = self.create_domain() domain = self.create_domain()
actual = self.storage.find_domains(self.admin_context) actual = self.storage.find_domains(self.admin_context)
self.assertEqual(len(actual), 1) self.assertEqual(len(actual), 1)
@ -646,16 +607,16 @@ class StorageTestCase(object):
self.assertEqual(actual[0]['email'], domain['email']) self.assertEqual(actual[0]['email'], domain['email'])
# Order of found items later will be reverse of the order they are # Order of found items later will be reverse of the order they are
# created # created XXXX
created = [self.create_domain(values={'name': 'x%s.org.' % i})[1] created = [self.create_domain(name='x%s.org.' % i)
for i in xrange(10, 20)] for i in xrange(10, 20)]
created.insert(0, domain) created.insert(0, domain)
self._ensure_paging(created, self.storage.find_domains) self._ensure_paging(created, self.storage.find_domains)
def test_find_domains_criterion(self): def test_find_domains_criterion(self):
_, domain_one = self.create_domain(0) domain_one = self.create_domain()
_, domain_two = self.create_domain(1) domain_two = self.create_domain(fixture=1)
criterion = dict( criterion = dict(
name=domain_one['name'] name=domain_one['name']
@ -715,7 +676,7 @@ class StorageTestCase(object):
def test_get_domain(self): def test_get_domain(self):
# Create a domain # Create a domain
fixture, expected = self.create_domain() expected = self.create_domain()
actual = self.storage.get_domain(self.admin_context, expected['id']) actual = self.storage.get_domain(self.admin_context, expected['id'])
self.assertEqual(actual['name'], expected['name']) self.assertEqual(actual['name'], expected['name'])
@ -731,14 +692,14 @@ class StorageTestCase(object):
context = self.get_admin_context() context = self.get_admin_context()
context.show_deleted = True context.show_deleted = True
_, domain = self.create_domain(context=context) domain = self.create_domain(context=context)
self.storage.delete_domain(context, domain['id']) self.storage.delete_domain(context, domain['id'])
self.storage.get_domain(context, domain['id']) self.storage.get_domain(context, domain['id'])
def test_find_domain_criterion(self): def test_find_domain_criterion(self):
_, domain_one = self.create_domain(0) domain_one = self.create_domain()
_, domain_two = self.create_domain(1) domain_two = self.create_domain(fixture=1)
criterion = dict( criterion = dict(
name=domain_one['name'] name=domain_one['name']
@ -762,7 +723,7 @@ class StorageTestCase(object):
self.assertIn('status', domain_two) self.assertIn('status', domain_two)
def test_find_domain_criterion_missing(self): def test_find_domain_criterion_missing(self):
_, expected = self.create_domain(0) expected = self.create_domain()
criterion = dict( criterion = dict(
name=expected['name'] + "NOT FOUND" name=expected['name'] + "NOT FOUND"
@ -773,7 +734,8 @@ class StorageTestCase(object):
def test_update_domain(self): def test_update_domain(self):
# Create a domain # Create a domain
fixture, domain = self.create_domain() fixture = self.get_domain_fixture()
domain = self.create_domain(**fixture)
updated = self.storage.update_domain(self.admin_context, domain['id'], updated = self.storage.update_domain(self.admin_context, domain['id'],
fixture) fixture)
@ -784,12 +746,13 @@ class StorageTestCase(object):
def test_update_domain_duplicate(self): def test_update_domain_duplicate(self):
# Create two domains # Create two domains
fixture_one, domain_one = self.create_domain(fixture=0) fixture = self.get_domain_fixture(fixture=0)
_, domain_two = self.create_domain(fixture=1) self.create_domain(**fixture)
domain_two = self.create_domain(fixture=1)
with testtools.ExpectedException(exceptions.DuplicateDomain): with testtools.ExpectedException(exceptions.DuplicateDomain):
self.storage.update_domain(self.admin_context, domain_two['id'], self.storage.update_domain(self.admin_context, domain_two['id'],
fixture_one) fixture)
def test_update_domain_missing(self): def test_update_domain_missing(self):
with testtools.ExpectedException(exceptions.DomainNotFound): with testtools.ExpectedException(exceptions.DomainNotFound):
@ -797,7 +760,7 @@ class StorageTestCase(object):
self.storage.update_domain(self.admin_context, uuid, {}) self.storage.update_domain(self.admin_context, uuid, {})
def test_delete_domain(self): def test_delete_domain(self):
domain_fixture, domain = self.create_domain() domain = self.create_domain()
self.storage.delete_domain(self.admin_context, domain['id']) self.storage.delete_domain(self.admin_context, domain['id'])
@ -824,7 +787,7 @@ class StorageTestCase(object):
self.assertEqual(domains, 1) self.assertEqual(domains, 1)
def test_create_recordset(self): def test_create_recordset(self):
domain_fixture, domain = self.create_domain() domain = self.create_domain()
values = { values = {
'name': 'www.%s' % domain['name'], 'name': 'www.%s' % domain['name'],
@ -844,7 +807,7 @@ class StorageTestCase(object):
self.assertEqual(result['type'], values['type']) self.assertEqual(result['type'], values['type'])
def test_create_recordset_duplicate(self): def test_create_recordset_duplicate(self):
_, domain = self.create_domain() domain = self.create_domain()
# Create the First RecordSet # Create the First RecordSet
self.create_recordset(domain) self.create_recordset(domain)
@ -854,7 +817,7 @@ class StorageTestCase(object):
self.create_recordset(domain) self.create_recordset(domain)
def test_find_recordsets(self): def test_find_recordsets(self):
_, domain = self.create_domain() domain = self.create_domain()
criterion = {'domain_id': domain['id']} criterion = {'domain_id': domain['id']}
@ -862,7 +825,7 @@ class StorageTestCase(object):
self.assertEqual(actual, []) self.assertEqual(actual, [])
# Create a single recordset # Create a single recordset
_, recordset_one = self.create_recordset(domain, fixture=0) recordset_one = self.create_recordset(domain)
actual = self.storage.find_recordsets(self.admin_context, criterion) actual = self.storage.find_recordsets(self.admin_context, criterion)
self.assertEqual(len(actual), 1) self.assertEqual(len(actual), 1)
@ -873,16 +836,15 @@ class StorageTestCase(object):
# Order of found items later will be reverse of the order they are # Order of found items later will be reverse of the order they are
# created # created
created = [self.create_recordset( created = [self.create_recordset(
domain, values={'name': 'test%s' % i + '.%s'})[1] domain, name='test%s' % i + '.%s') for i in xrange(10, 20)]
for i in xrange(10, 20)]
created.insert(0, recordset_one) created.insert(0, recordset_one)
self._ensure_paging(created, self.storage.find_recordsets) self._ensure_paging(created, self.storage.find_recordsets)
def test_find_recordsets_criterion(self): def test_find_recordsets_criterion(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset_one = self.create_recordset(domain, type='A', fixture=0) recordset_one = self.create_recordset(domain, type='A', fixture=0)
self.create_recordset(domain, fixture=1) self.create_recordset(domain, fixture=1)
criterion = dict( criterion = dict(
@ -906,11 +868,11 @@ class StorageTestCase(object):
self.assertEqual(len(results), 2) self.assertEqual(len(results), 2)
def test_find_recordsets_criterion_wildcard(self): def test_find_recordsets_criterion_wildcard(self):
_, domain = self.create_domain() domain = self.create_domain()
values = {'name': 'one.%s' % domain['name']} values = {'name': 'one.%s' % domain['name']}
self.create_recordset(domain, fixture=0, values=values) self.create_recordset(domain, **values)
criterion = dict( criterion = dict(
domain_id=domain['id'], domain_id=domain['id'],
@ -922,8 +884,8 @@ class StorageTestCase(object):
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
def test_get_recordset(self): def test_get_recordset(self):
_, domain = self.create_domain() domain = self.create_domain()
_, expected = self.create_recordset(domain) expected = self.create_recordset(domain)
actual = self.storage.get_recordset(self.admin_context, expected['id']) actual = self.storage.get_recordset(self.admin_context, expected['id'])
@ -936,8 +898,8 @@ class StorageTestCase(object):
self.storage.get_recordset(self.admin_context, uuid) self.storage.get_recordset(self.admin_context, uuid)
def test_find_recordset_criterion(self): def test_find_recordset_criterion(self):
_, domain = self.create_domain(0) domain = self.create_domain()
_, expected = self.create_recordset(domain) expected = self.create_recordset(domain)
criterion = dict( criterion = dict(
domain_id=domain['id'], domain_id=domain['id'],
@ -950,8 +912,8 @@ class StorageTestCase(object):
self.assertEqual(actual['type'], expected['type']) self.assertEqual(actual['type'], expected['type'])
def test_find_recordset_criterion_missing(self): def test_find_recordset_criterion_missing(self):
_, domain = self.create_domain(0) domain = self.create_domain()
_, expected = self.create_recordset(domain) expected = self.create_recordset(domain)
criterion = dict( criterion = dict(
name=expected['name'] + "NOT FOUND" name=expected['name'] + "NOT FOUND"
@ -961,10 +923,10 @@ class StorageTestCase(object):
self.storage.find_recordset(self.admin_context, criterion) self.storage.find_recordset(self.admin_context, criterion)
def test_update_recordset(self): def test_update_recordset(self):
domain_fixture, domain = self.create_domain() domain = self.create_domain()
# Create a recordset # Create a recordset
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
# Get some different values to test the update with # Get some different values to test the update with
recordset_fixture = self.get_recordset_fixture(domain['name'], recordset_fixture = self.get_recordset_fixture(domain['name'],
@ -981,11 +943,12 @@ class StorageTestCase(object):
self.assertEqual(updated['type'], recordset_fixture['type']) self.assertEqual(updated['type'], recordset_fixture['type'])
def test_update_recordset_duplicate(self): def test_update_recordset_duplicate(self):
_, domain = self.create_domain() domain = self.create_domain()
# Create the first two recordsets # Create the first two recordsets
recordset_one_fixture, _ = self.create_recordset(domain, fixture=0) recordset_one_fixture = self.get_recordset_fixture(domain['name'])
_, recordset_two = self.create_recordset(domain, fixture=1) self.create_recordset(domain, **recordset_one_fixture)
recordset_two = self.create_recordset(domain, fixture=1)
with testtools.ExpectedException(exceptions.DuplicateRecordSet): with testtools.ExpectedException(exceptions.DuplicateRecordSet):
# Attempt to update the second recordset, making it a duplicate # Attempt to update the second recordset, making it a duplicate
@ -1000,10 +963,10 @@ class StorageTestCase(object):
self.storage.update_recordset(self.admin_context, uuid, {}) self.storage.update_recordset(self.admin_context, uuid, {})
def test_delete_recordset(self): def test_delete_recordset(self):
_, domain = self.create_domain() domain = self.create_domain()
# Create a recordset # Create a recordset
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
self.storage.delete_recordset(self.admin_context, recordset['id']) self.storage.delete_recordset(self.admin_context, recordset['id'])
@ -1021,7 +984,7 @@ class StorageTestCase(object):
self.assertEqual(recordsets, 0) self.assertEqual(recordsets, 0)
# Create a single domain & recordset # Create a single domain & recordset
_, domain = self.create_domain() domain = self.create_domain()
self.create_recordset(domain) self.create_recordset(domain)
# we should have 1 recordsets now # we should have 1 recordsets now
@ -1029,8 +992,8 @@ class StorageTestCase(object):
self.assertEqual(recordsets, 1) self.assertEqual(recordsets, 1)
def test_create_record(self): def test_create_record(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain, type='A') recordset = self.create_recordset(domain, type='A')
values = { values = {
'data': '192.0.2.1', 'data': '192.0.2.1',
@ -1051,8 +1014,8 @@ class StorageTestCase(object):
self.assertIn('status', result) self.assertIn('status', result)
def test_create_record_duplicate(self): def test_create_record_duplicate(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
# Create the First Record # Create the First Record
self.create_record(domain, recordset) self.create_record(domain, recordset)
@ -1062,8 +1025,8 @@ class StorageTestCase(object):
self.create_record(domain, recordset) self.create_record(domain, recordset)
def test_find_records(self): def test_find_records(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
criterion = { criterion = {
'domain_id': domain['id'], 'domain_id': domain['id'],
@ -1074,7 +1037,7 @@ class StorageTestCase(object):
self.assertEqual(actual, []) self.assertEqual(actual, [])
# Create a single record # Create a single record
_, record = self.create_record(domain, recordset, fixture=0) record = self.create_record(domain, recordset)
actual = self.storage.find_records(self.admin_context, criterion) actual = self.storage.find_records(self.admin_context, criterion)
self.assertEqual(len(actual), 1) self.assertEqual(len(actual), 1)
@ -1085,18 +1048,17 @@ class StorageTestCase(object):
# Order of found items later will be reverse of the order they are # Order of found items later will be reverse of the order they are
# created # created
created = [self.create_record( created = [self.create_record(
domain, recordset, domain, recordset, data='192.0.0.%s' % i)
values={'data': '192.0.0.%s' % i})[1]
for i in xrange(10, 20)] for i in xrange(10, 20)]
created.insert(0, record) created.insert(0, record)
self._ensure_paging(created, self.storage.find_records) self._ensure_paging(created, self.storage.find_records)
def test_find_records_criterion(self): def test_find_records_criterion(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain, type='A') recordset = self.create_recordset(domain, type='A')
_, record_one = self.create_record(domain, recordset, fixture=0) record_one = self.create_record(domain, recordset)
self.create_record(domain, recordset, fixture=1) self.create_record(domain, recordset, fixture=1)
criterion = dict( criterion = dict(
@ -1118,12 +1080,12 @@ class StorageTestCase(object):
self.assertEqual(len(results), 2) self.assertEqual(len(results), 2)
def test_find_records_criterion_wildcard(self): def test_find_records_criterion_wildcard(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain, type='A') recordset = self.create_recordset(domain, type='A')
values = {'data': '127.0.0.1'} values = {'data': '127.0.0.1'}
self.create_record(domain, recordset, fixture=0, values=values) self.create_record(domain, recordset, **values)
criterion = dict( criterion = dict(
domain_id=domain['id'], domain_id=domain['id'],
@ -1148,18 +1110,16 @@ class StorageTestCase(object):
at_context.all_tenants = True at_context.all_tenants = True
# Create two domains in different tenants, and 1 record in each # Create two domains in different tenants, and 1 record in each
_, domain_one = self.create_domain(fixture=0, context=one_context) domain_one = self.create_domain(fixture=0, context=one_context)
_, recordset_one = self.create_recordset(domain_one, fixture=0, recordset_one = self.create_recordset(domain_one, fixture=0,
context=one_context)
self.create_record(domain_one, recordset_one, fixture=0,
context=one_context) context=one_context)
self.create_record(domain_one, recordset_one, context=one_context)
_, domain_two = self.create_domain(fixture=1, context=two_context) domain_two = self.create_domain(fixture=1, context=two_context)
_, recordset_one = self.create_recordset(domain_two, fixture=1, recordset_one = self.create_recordset(domain_two, fixture=1,
context=two_context) context=two_context)
self.create_record(domain_two, recordset_one, fixture=0, self.create_record(domain_two, recordset_one, context=two_context)
context=two_context)
# Ensure the all_tenants context see's two records # Ensure the all_tenants context see's two records
results = self.storage.find_records(at_context) results = self.storage.find_records(at_context)
@ -1178,10 +1138,10 @@ class StorageTestCase(object):
self.assertEqual(len(results), 1) self.assertEqual(len(results), 1)
def test_get_record(self): def test_get_record(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
_, expected = self.create_record(domain, recordset) expected = self.create_record(domain, recordset)
actual = self.storage.get_record(self.admin_context, expected['id']) actual = self.storage.get_record(self.admin_context, expected['id'])
@ -1194,10 +1154,10 @@ class StorageTestCase(object):
self.storage.get_record(self.admin_context, uuid) self.storage.get_record(self.admin_context, uuid)
def test_find_record_criterion(self): def test_find_record_criterion(self):
_, domain = self.create_domain(0) domain = self.create_domain()
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
_, expected = self.create_record(domain, recordset) expected = self.create_record(domain, recordset)
criterion = dict( criterion = dict(
domain_id=domain['id'], domain_id=domain['id'],
@ -1211,10 +1171,10 @@ class StorageTestCase(object):
self.assertIn('status', actual) self.assertIn('status', actual)
def test_find_record_criterion_missing(self): def test_find_record_criterion_missing(self):
_, domain = self.create_domain(0) domain = self.create_domain()
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
_, expected = self.create_record(domain, recordset) expected = self.create_record(domain, recordset)
criterion = dict( criterion = dict(
domain_id=domain['id'], domain_id=domain['id'],
@ -1225,11 +1185,11 @@ class StorageTestCase(object):
self.storage.find_record(self.admin_context, criterion) self.storage.find_record(self.admin_context, criterion)
def test_update_record(self): def test_update_record(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
# Create a record # Create a record
_, record = self.create_record(domain, recordset) record = self.create_record(domain, recordset)
# Get some different values to test the update with # Get some different values to test the update with
record_fixture = self.get_record_fixture(recordset['type'], fixture=1) record_fixture = self.get_record_fixture(recordset['type'], fixture=1)
@ -1245,13 +1205,13 @@ class StorageTestCase(object):
self.assertIn('status', updated) self.assertIn('status', updated)
def test_update_record_duplicate(self): def test_update_record_duplicate(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
# Create the first two records # Create the first two records
record_one_fixture, _ = self.create_record(domain, recordset, record_one_fixture = self.get_record_fixture(recordset['type'])
fixture=0) self.create_record(domain, recordset, **record_one_fixture)
_, record_two = self.create_record(domain, recordset, fixture=1) record_two = self.create_record(domain, recordset, fixture=1)
with testtools.ExpectedException(exceptions.DuplicateRecord): with testtools.ExpectedException(exceptions.DuplicateRecord):
# Attempt to update the second record, making it a duplicate record # Attempt to update the second record, making it a duplicate record
@ -1264,11 +1224,11 @@ class StorageTestCase(object):
self.storage.update_record(self.admin_context, uuid, {}) self.storage.update_record(self.admin_context, uuid, {})
def test_delete_record(self): def test_delete_record(self):
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
# Create a record # Create a record
_, record = self.create_record(domain, recordset) record = self.create_record(domain, recordset)
self.storage.delete_record(self.admin_context, record['id']) self.storage.delete_record(self.admin_context, record['id'])
@ -1286,8 +1246,8 @@ class StorageTestCase(object):
self.assertEqual(records, 0) self.assertEqual(records, 0)
# Create a single domain & record # Create a single domain & record
_, domain = self.create_domain() domain = self.create_domain()
_, recordset = self.create_recordset(domain) recordset = self.create_recordset(domain)
self.create_record(domain, recordset) self.create_record(domain, recordset)
# we should have 1 record now # we should have 1 record now