Merge "Update tests to use Object.from_dict() method"

This commit is contained in:
Jenkins 2015-03-05 16:58:38 +00:00 committed by Gerrit Code Review
commit 3bfb6904db
9 changed files with 118 additions and 142 deletions

View File

@ -49,7 +49,7 @@ cfg.CONF.import_opt('cache_driver', 'designate.pool_manager',
cfg.CONF.import_opt('connection', cfg.CONF.import_opt('connection',
'designate.pool_manager.cache.impl_sqlalchemy', 'designate.pool_manager.cache.impl_sqlalchemy',
group='pool_manager_cache:sqlalchemy') group='pool_manager_cache:sqlalchemy')
pool_id = cfg.CONF['service:central'].default_pool_id default_pool_id = cfg.CONF['service:central'].default_pool_id
class TestCase(base.BaseTestCase): class TestCase(base.BaseTestCase):
@ -163,19 +163,19 @@ class TestCase(base.BaseTestCase):
}] }]
pool_attributes_fixtures = [ pool_attributes_fixtures = [
{'pool_id': pool_id, {'pool_id': default_pool_id,
'key': 'name_server', 'key': 'name_server',
'value': 'ns1.example.com.'}, 'value': 'ns1.example.com.'},
{'pool_id': pool_id, {'pool_id': default_pool_id,
'key': 'scope', 'key': 'scope',
'value': 'public'} 'value': 'public'}
] ]
pool_attribute_nameserver_fixtures = [ pool_attribute_nameserver_fixtures = [
{'pool_id': pool_id, {'pool_id': default_pool_id,
'key': 'name_server', 'key': 'name_server',
'value': 'ns1.example.org'}, 'value': 'ns1.example.org'},
{'pool_id': pool_id, {'pool_id': default_pool_id,
'key': 'name_server', 'key': 'name_server',
'value': 'ns2.example.org'}, 'value': 'ns2.example.org'},
] ]
@ -193,10 +193,15 @@ class TestCase(base.BaseTestCase):
}] }]
pool_fixtures = [ pool_fixtures = [
{'name': 'test1', {'name': 'Pool-One',
'description': 'default description1'}, 'description': 'Pool-One description',
{'name': 'test2', 'attributes': [{'key': 'scope', 'value': 'public'}],
'description': 'default description2'} 'nameservers': [{'key': 'name_server', 'value': 'ns1.example.org.'}]},
{'name': 'Pool-Two',
'description': 'Pool-Two description',
'attributes': [{'key': 'scope', 'value': 'public'}],
'nameservers': [{'key': 'name_server', 'value': 'ns1.example.org.'}]},
] ]
pool_attribute_fixtures = [ pool_attribute_fixtures = [
@ -449,18 +454,6 @@ class TestCase(base.BaseTestCase):
_values = copy.copy(self.pool_fixtures[fixture]) _values = copy.copy(self.pool_fixtures[fixture])
_values.update(values) _values.update(values)
_attribute_values = self.get_pool_attribute_fixture(
fixture=fixture, values=None)
_values['attributes'] = objects.PoolAttributeList(
objects=[objects.PoolAttribute(key=r, value=_attribute_values[r])
for r in _attribute_values])
_nameserver_values = self.get_nameserver_fixture(
fixture=fixture, values=None)
_values['nameservers'] = objects.NameServerList(
objects=[objects.NameServer(key='name_server', value=r)
for r in _nameserver_values])
return _values return _values
def get_pool_attribute_fixture(self, fixture=0, values=None): def get_pool_attribute_fixture(self, fixture=0, values=None):
@ -498,10 +491,11 @@ class TestCase(base.BaseTestCase):
values = self.get_pool_attribute_nameserver_fixtures(fixture=fixture, values = self.get_pool_attribute_nameserver_fixtures(fixture=fixture,
values=kwargs) values=kwargs)
nameserver = objects.PoolAttribute(**values) nameserver = objects.PoolAttribute.from_dict(values)
# Get the default pool # Get the default pool
pool = self.central_service.get_pool(self.admin_context, pool_id) pool = self.central_service.get_pool(
self.admin_context, default_pool_id)
# Add the new PoolAttribute to the pool as a nameserver # Add the new PoolAttribute to the pool as a nameserver
pool.nameservers.append(nameserver) pool.nameservers.append(nameserver)
@ -522,16 +516,18 @@ class TestCase(base.BaseTestCase):
fixture = kwargs.pop('fixture', 0) fixture = kwargs.pop('fixture', 0)
values = self.get_tld_fixture(fixture=fixture, values=kwargs) values = self.get_tld_fixture(fixture=fixture, values=kwargs)
tld = objects.Tld(**values)
return self.central_service.create_tld(context, tld=tld) return self.central_service.create_tld(
context, objects.Tld.from_dict(values))
def create_default_tld(self, **kwargs): def create_default_tld(self, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0) fixture = kwargs.pop('fixture', 0)
values = self.get_default_tld_fixture(fixture=fixture, values=kwargs) values = self.get_default_tld_fixture(fixture=fixture, values=kwargs)
tld = objects.Tld(**values)
return self.central_service.create_tld(context, tld=tld) return self.central_service.create_tld(
context, objects.Tld.from_dict(values))
def create_default_tlds(self): def create_default_tlds(self):
for index in range(len(self.default_tld_fixtures)): for index in range(len(self.default_tld_fixtures)):
@ -545,8 +541,9 @@ class TestCase(base.BaseTestCase):
fixture = kwargs.pop('fixture', 0) fixture = kwargs.pop('fixture', 0)
values = self.get_tsigkey_fixture(fixture=fixture, values=kwargs) values = self.get_tsigkey_fixture(fixture=fixture, values=kwargs)
return self.central_service.create_tsigkey( return self.central_service.create_tsigkey(
context, objects.TsigKey(**values)) context, objects.TsigKey.from_dict(values))
def create_domain(self, **kwargs): def create_domain(self, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
@ -556,7 +553,7 @@ class TestCase(base.BaseTestCase):
# We always need a server to create a server # We always need a server to create a server
nameservers = self.storage.find_pool_attributes( nameservers = self.storage.find_pool_attributes(
context=self.admin_context, context=self.admin_context,
criterion={'pool_id': pool_id, criterion={'pool_id': default_pool_id,
'key': 'name_server'} 'key': 'name_server'}
) )
if len(nameservers) == 0: if len(nameservers) == 0:
@ -570,7 +567,7 @@ class TestCase(base.BaseTestCase):
values['tenant_id'] = context.tenant values['tenant_id'] = context.tenant
return self.central_service.create_domain( return self.central_service.create_domain(
context, objects.Domain(**values)) context, objects.Domain.from_dict(values))
def create_recordset(self, domain, type='A', **kwargs): def create_recordset(self, domain, type='A', **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
@ -579,8 +576,9 @@ class TestCase(base.BaseTestCase):
values = self.get_recordset_fixture(domain['name'], type=type, values = self.get_recordset_fixture(domain['name'], type=type,
fixture=fixture, fixture=fixture,
values=kwargs) values=kwargs)
return self.central_service.create_recordset( return self.central_service.create_recordset(
context, domain['id'], recordset=objects.RecordSet(**values)) context, domain['id'], objects.RecordSet.from_dict(values))
def create_record(self, domain, recordset, **kwargs): def create_record(self, domain, recordset, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
@ -588,20 +586,19 @@ class TestCase(base.BaseTestCase):
values = self.get_record_fixture(recordset['type'], fixture=fixture, values = self.get_record_fixture(recordset['type'], fixture=fixture,
values=kwargs) values=kwargs)
return self.central_service.create_record( return self.central_service.create_record(
context, context, domain['id'], recordset['id'],
domain['id'], objects.Record.from_dict(values))
recordset['id'],
record=objects.Record(**values))
def create_blacklist(self, **kwargs): def create_blacklist(self, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
fixture = kwargs.pop('fixture', 0) fixture = kwargs.pop('fixture', 0)
values = self.get_blacklist_fixture(fixture=fixture, values=kwargs) values = self.get_blacklist_fixture(fixture=fixture, values=kwargs)
blacklist = objects.Blacklist(**values)
return self.central_service.create_blacklist( return self.central_service.create_blacklist(
context, blacklist=blacklist) context, objects.Blacklist.from_dict(values))
def create_pool(self, **kwargs): def create_pool(self, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
@ -613,7 +610,7 @@ class TestCase(base.BaseTestCase):
values['tenant_id'] = context.tenant values['tenant_id'] = context.tenant
return self.central_service.create_pool( return self.central_service.create_pool(
context, objects.Pool(**values)) context, objects.Pool.from_dict(values))
def create_zone_transfer_request(self, domain, **kwargs): def create_zone_transfer_request(self, domain, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
@ -625,10 +622,8 @@ class TestCase(base.BaseTestCase):
if 'domain_id' not in values: if 'domain_id' not in values:
values['domain_id'] = domain.id values['domain_id'] = domain.id
zone_transfer_request = objects.ZoneTransferRequest(**values)
return self.central_service.create_zone_transfer_request( return self.central_service.create_zone_transfer_request(
context, zone_transfer_request=zone_transfer_request) context, objects.ZoneTransferRequest.from_dict(values))
def create_zone_transfer_accept(self, zone_transfer_request, **kwargs): def create_zone_transfer_accept(self, zone_transfer_request, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
@ -647,10 +642,8 @@ class TestCase(base.BaseTestCase):
if 'key' not in values: if 'key' not in values:
values['key'] = zone_transfer_request.key values['key'] = zone_transfer_request.key
zone_transfer_accept = objects.ZoneTransferAccept(**values)
return self.central_service.create_zone_transfer_accept( return self.central_service.create_zone_transfer_accept(
context, zone_transfer_accept) context, objects.ZoneTransferAccept.from_dict(values))
def create_pool_attribute(self, **kwargs): def create_pool_attribute(self, **kwargs):
context = kwargs.pop('context', self.admin_context) context = kwargs.pop('context', self.admin_context)
@ -658,12 +651,11 @@ class TestCase(base.BaseTestCase):
values = self.get_pool_attributes_fixture(fixture=fixture, values = self.get_pool_attributes_fixture(fixture=fixture,
values=kwargs) values=kwargs)
pool_attribute = objects.PoolAttribute(**values)
# TODO(kiall): We shouldn't be assuming the default_pool_id here
return self.storage.create_pool_attribute( return self.storage.create_pool_attribute(
context, context, default_pool_id,
pool_attribute.pool_id, objects.PoolAttribute.from_dict(values))
pool_attribute
)
def _ensure_interface(self, interface, implementation): def _ensure_interface(self, interface, implementation):
for name in interface.__abstractmethods__: for name in interface.__abstractmethods__:

View File

@ -129,7 +129,7 @@ class ApiV1ServersTest(ApiV1Test):
'value': fixture['name'], 'value': fixture['name'],
'id': '2fdadfb1-cf96-4259-ac6b-bb7b6d2ff980' 'id': '2fdadfb1-cf96-4259-ac6b-bb7b6d2ff980'
} }
nameserver = objects.PoolAttribute(**values) nameserver = objects.PoolAttribute.from_dict(values)
self.get('servers/%s' % nameserver['id'], status_code=504) self.get('servers/%s' % nameserver['id'], status_code=504)

View File

@ -307,7 +307,7 @@ class CentralServiceTest(CentralTestCase):
# Create a tsigkey # Create a tsigkey
tsigkey = self.central_service.create_tsigkey( tsigkey = self.central_service.create_tsigkey(
self.admin_context, tsigkey=objects.TsigKey(**values)) self.admin_context, tsigkey=objects.TsigKey.from_dict(values))
# Ensure all values have been set correctly # Ensure all values have been set correctly
self.assertIsNotNone(tsigkey['id']) self.assertIsNotNone(tsigkey['id'])
@ -414,7 +414,7 @@ class CentralServiceTest(CentralTestCase):
# Create a domain # Create a domain
domain = self.central_service.create_domain( domain = self.central_service.create_domain(
self.admin_context, domain=objects.Domain(**values)) self.admin_context, domain=objects.Domain.from_dict(values))
# Ensure all values have been set correctly # Ensure all values have been set correctly
self.assertIsNotNone(domain['id']) self.assertIsNotNone(domain['id'])
@ -452,9 +452,9 @@ class CentralServiceTest(CentralTestCase):
name='xn--3e0b707e' name='xn--3e0b707e'
) )
tld = objects.Tld(**values)
# Create the appropriate TLD # Create the appropriate TLD
self.central_service.create_tld(self.admin_context, tld=tld) self.central_service.create_tld(
self.admin_context, objects.Tld.from_dict(values))
# Test creation of a domain in 한국 (kr) # Test creation of a domain in 한국 (kr)
values = dict( values = dict(
@ -481,7 +481,7 @@ class CentralServiceTest(CentralTestCase):
# Create the subdomain # Create the subdomain
domain = self.central_service.create_domain( domain = self.central_service.create_domain(
self.admin_context, domain=objects.Domain(**values)) self.admin_context, objects.Domain.from_dict(values))
# Ensure all values have been set correctly # Ensure all values have been set correctly
self.assertIsNotNone(domain['id']) self.assertIsNotNone(domain['id'])
@ -504,8 +504,7 @@ class CentralServiceTest(CentralTestCase):
# Create the Parent Domain using fixture 1 # Create the Parent Domain using fixture 1
parent_domain = self.central_service.create_domain( parent_domain = self.central_service.create_domain(
self.admin_context, domain=objects.Domain(**domain_values) self.admin_context, objects.Domain.from_dict(domain_values))
)
# Get updated subdomain values # Get updated subdomain values
subdomain = self.central_service.get_domain(self.admin_context, subdomain = self.central_service.get_domain(self.admin_context,
@ -536,7 +535,7 @@ class CentralServiceTest(CentralTestCase):
# Attempt to create the subdomain # Attempt to create the subdomain
with testtools.ExpectedException(exceptions.Forbidden): with testtools.ExpectedException(exceptions.Forbidden):
self.central_service.create_domain( self.central_service.create_domain(
context, domain=objects.Domain(**values)) context, objects.Domain.from_dict(values))
def test_create_superdomain_failure(self): def test_create_superdomain_failure(self):
context = self.get_admin_context() context = self.get_admin_context()
@ -563,7 +562,7 @@ class CentralServiceTest(CentralTestCase):
# Attempt to create the domain # Attempt to create the domain
with testtools.ExpectedException(exceptions.Forbidden): with testtools.ExpectedException(exceptions.Forbidden):
self.central_service.create_domain( self.central_service.create_domain(
context, domain=objects.Domain(**domain_values)) context, objects.Domain.from_dict(domain_values))
def test_create_blacklisted_domain_success(self): def test_create_blacklisted_domain_success(self):
# Create blacklisted zone using default values # Create blacklisted zone using default values
@ -582,7 +581,7 @@ class CentralServiceTest(CentralTestCase):
# Create a zone that is blacklisted # Create a zone that is blacklisted
domain = self.central_service.create_domain( domain = self.central_service.create_domain(
self.admin_context, domain=objects.Domain(**values)) self.admin_context, objects.Domain.from_dict(values))
# Ensure all values have been set correctly # Ensure all values have been set correctly
self.assertIsNotNone(domain['id']) self.assertIsNotNone(domain['id'])
@ -603,14 +602,14 @@ class CentralServiceTest(CentralTestCase):
with testtools.ExpectedException(exceptions.InvalidDomainName): with testtools.ExpectedException(exceptions.InvalidDomainName):
# Create a domain # Create a domain
self.central_service.create_domain( self.central_service.create_domain(
self.admin_context, domain=objects.Domain(**values)) self.admin_context, objects.Domain.from_dict(values))
def _test_create_domain_fail(self, values, exception): def _test_create_domain_fail(self, values, exception):
with testtools.ExpectedException(exception): with testtools.ExpectedException(exception):
# Create an invalid domain # Create an invalid domain
self.central_service.create_domain( self.central_service.create_domain(
self.admin_context, domain=objects.Domain(**values)) self.admin_context, objects.Domain.from_dict(values))
def test_create_domain_invalid_tld_fail(self): def test_create_domain_invalid_tld_fail(self):
# Create a server # Create a server
@ -626,7 +625,7 @@ class CentralServiceTest(CentralTestCase):
# Create a valid domain # Create a valid domain
self.central_service.create_domain( self.central_service.create_domain(
self.admin_context, domain=objects.Domain(**values)) self.admin_context, objects.Domain.from_dict(values))
values = dict( values = dict(
name='example.net.', name='example.net.',
@ -637,7 +636,7 @@ class CentralServiceTest(CentralTestCase):
with testtools.ExpectedException(exceptions.InvalidDomainName): with testtools.ExpectedException(exceptions.InvalidDomainName):
# Create an invalid domain # Create an invalid domain
self.central_service.create_domain( self.central_service.create_domain(
self.admin_context, domain=objects.Domain(**values)) self.admin_context, objects.Domain.from_dict(values))
def test_create_domain_invalid_ttl_fail(self): def test_create_domain_invalid_ttl_fail(self):
self.policy({'use_low_ttl': '!'}) self.policy({'use_low_ttl': '!'})
@ -653,7 +652,7 @@ class CentralServiceTest(CentralTestCase):
with testtools.ExpectedException(exceptions.InvalidTTL): with testtools.ExpectedException(exceptions.InvalidTTL):
self.central_service.create_domain( self.central_service.create_domain(
context, domain=objects.Domain(**values)) context, objects.Domain.from_dict(values))
def test_create_domain_no_min_ttl(self): def test_create_domain_no_min_ttl(self):
self.policy({'use_low_ttl': '!'}) self.policy({'use_low_ttl': '!'})
@ -667,7 +666,7 @@ class CentralServiceTest(CentralTestCase):
# Create domain with random TTL # Create domain with random TTL
domain = self.central_service.create_domain( domain = self.central_service.create_domain(
self.admin_context, domain=objects.Domain(**values)) self.admin_context, objects.Domain.from_dict(values))
# Ensure all values have been set correctly # Ensure all values have been set correctly
self.assertEqual(domain['ttl'], values['ttl']) self.assertEqual(domain['ttl'], values['ttl'])
@ -1066,7 +1065,7 @@ class CentralServiceTest(CentralTestCase):
self.central_service.create_recordset( self.central_service.create_recordset(
self.admin_context, self.admin_context,
domain['id'], domain['id'],
recordset=objects.RecordSet(**values)) recordset=objects.RecordSet.from_dict(values))
def test_create_invalid_recordset_location_cname_sharing(self): def test_create_invalid_recordset_location_cname_sharing(self):
domain = self.create_domain() domain = self.create_domain()
@ -1082,7 +1081,7 @@ class CentralServiceTest(CentralTestCase):
self.central_service.create_recordset( self.central_service.create_recordset(
self.admin_context, self.admin_context,
domain['id'], domain['id'],
recordset=objects.RecordSet(**values)) recordset=objects.RecordSet.from_dict(values))
def test_create_invalid_recordset_location_wrong_domain(self): def test_create_invalid_recordset_location_wrong_domain(self):
domain = self.create_domain() domain = self.create_domain()
@ -1098,7 +1097,7 @@ class CentralServiceTest(CentralTestCase):
self.central_service.create_recordset( self.central_service.create_recordset(
self.admin_context, self.admin_context,
domain['id'], domain['id'],
recordset=objects.RecordSet(**values)) recordset=objects.RecordSet.from_dict(values))
def test_create_invalid_recordset_ttl(self): def test_create_invalid_recordset_ttl(self):
self.policy({'use_low_ttl': '!'}) self.policy({'use_low_ttl': '!'})
@ -1117,7 +1116,7 @@ class CentralServiceTest(CentralTestCase):
self.central_service.create_recordset( self.central_service.create_recordset(
self.admin_context, self.admin_context,
domain['id'], domain['id'],
recordset=objects.RecordSet(**values)) recordset=objects.RecordSet.from_dict(values))
def test_create_recordset_no_min_ttl(self): def test_create_recordset_no_min_ttl(self):
self.policy({'use_low_ttl': '!'}) self.policy({'use_low_ttl': '!'})
@ -1134,7 +1133,7 @@ class CentralServiceTest(CentralTestCase):
recordset = self.central_service.create_recordset( recordset = self.central_service.create_recordset(
self.admin_context, self.admin_context,
domain['id'], domain['id'],
recordset=objects.RecordSet(**values)) recordset=objects.RecordSet.from_dict(values))
self.assertEqual(recordset['ttl'], values['ttl']) self.assertEqual(recordset['ttl'], values['ttl'])
def test_get_recordset(self): def test_get_recordset(self):
@ -1567,10 +1566,8 @@ class CentralServiceTest(CentralTestCase):
# Create a record # Create a record
record = self.central_service.create_record( record = self.central_service.create_record(
self.admin_context, self.admin_context, domain['id'], recordset['id'],
domain['id'], objects.Record.from_dict(values))
recordset['id'],
record=objects.Record(**values))
# Ensure all values have been set correctly # Ensure all values have been set correctly
self.assertIsNotNone(record['id']) self.assertIsNotNone(record['id'])
@ -1599,10 +1596,8 @@ class CentralServiceTest(CentralTestCase):
# Create a record # Create a record
self.central_service.create_record( self.central_service.create_record(
self.admin_context, self.admin_context, domain['id'], recordset['id'],
domain['id'], objects.Record.from_dict(values),
recordset['id'],
record=objects.Record(**values),
increment_serial=False) increment_serial=False)
# Ensure the domains serial number was not updated # Ensure the domains serial number was not updated
@ -2448,8 +2443,7 @@ class CentralServiceTest(CentralTestCase):
values = self.get_pool_fixture(fixture=0) values = self.get_pool_fixture(fixture=0)
# Create the pool using the values # Create the pool using the values
pool = self.central_service.create_pool( pool = self.central_service.create_pool(
context=self.admin_context, self.admin_context, objects.Pool.from_dict(values))
pool=objects.Pool(**values))
# Verify that all the values were set correctly # Verify that all the values were set correctly
self.assertIsNotNone(pool['id']) self.assertIsNotNone(pool['id'])
@ -2465,19 +2459,14 @@ class CentralServiceTest(CentralTestCase):
# Compare the actual values of attributes and nameservers # Compare the actual values of attributes and nameservers
for k in range(0, len(values['attributes'])): for k in range(0, len(values['attributes'])):
self.assertDictContainsSubset( self.assertDictContainsSubset(
values['attributes'][k].to_primitive() values['attributes'][k],
['designate_object.data'], pool['attributes'][k].to_primitive()['designate_object.data']
pool['attributes'][k].to_primitive()
['designate_object.data']
) )
for k in range(0, len(values['nameservers'])): for k in range(0, len(values['nameservers'])):
self.assertDictContainsSubset( self.assertDictContainsSubset(
values['nameservers'][k].to_primitive() values['nameservers'][k],
['designate_object.data'], pool['nameservers'][k].to_primitive()['designate_object.data'])
pool['nameservers'][k].to_primitive()
['designate_object.data']
)
def test_get_pool(self): def test_get_pool(self):
# Create a server pool # Create a server pool
@ -2523,15 +2512,13 @@ class CentralServiceTest(CentralTestCase):
self.assertEqual(pools[1]['name'], values['name']) self.assertEqual(pools[1]['name'], values['name'])
# Compare the actual values of attributes and nameservers # Compare the actual values of attributes and nameservers
expected_attributes = \ expected_attributes = values['attributes'][0]
values['attributes'][0].to_primitive()['designate_object.data']
actual_attributes = \ actual_attributes = \
pools[1]['attributes'][0].to_primitive()['designate_object.data'] pools[1]['attributes'][0].to_primitive()['designate_object.data']
for k in expected_attributes: for k in expected_attributes:
self.assertEqual(actual_attributes[k], expected_attributes[k]) self.assertEqual(actual_attributes[k], expected_attributes[k])
expected_nameservers = \ expected_nameservers = values['nameservers'][0]
values['nameservers'][0].to_primitive()['designate_object.data']
actual_nameservers = \ actual_nameservers = \
pools[1]['nameservers'][0].to_primitive()['designate_object.data'] pools[1]['nameservers'][0].to_primitive()['designate_object.data']
for k in expected_nameservers: for k in expected_nameservers:

View File

@ -42,7 +42,7 @@ class MdnsNotifyTest(MdnsTestCase):
'port': 65255, 'port': 65255,
'backend': 'fake' 'backend': 'fake'
} }
self.server = objects.PoolServer(**server_values) self.server = objects.PoolServer.from_dict(server_values)
def test_send_notify_message(self): def test_send_notify_message(self):
# id 10001 # id 10001
@ -60,8 +60,8 @@ class MdnsNotifyTest(MdnsTestCase):
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire( with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
binascii.a2b_hex(expected_notify_response))): binascii.a2b_hex(expected_notify_response))):
response, retry = self.notify.notify_zone_changed( response, retry = self.notify.notify_zone_changed(
context, objects.Domain(**self.test_domain), self.server, context, objects.Domain.from_dict(self.test_domain),
0, 0, 2, 0) self.server, 0, 0, 2, 0)
self.assertEqual(response, dns.message.from_wire( self.assertEqual(response, dns.message.from_wire(
binascii.a2b_hex(expected_notify_response))) binascii.a2b_hex(expected_notify_response)))
self.assertEqual(retry, 1) self.assertEqual(retry, 1)
@ -82,8 +82,8 @@ class MdnsNotifyTest(MdnsTestCase):
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire( with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
binascii.a2b_hex(non_auth_notify_response))): binascii.a2b_hex(non_auth_notify_response))):
response, retry = self.notify.notify_zone_changed( response, retry = self.notify.notify_zone_changed(
context, objects.Domain(**self.test_domain), self.server, context, objects.Domain.from_dict(self.test_domain),
0, 0, 2, 0) self.server, 0, 0, 2, 0)
self.assertEqual(response, None) self.assertEqual(response, None)
self.assertEqual(retry, 1) self.assertEqual(retry, 1)
@ -91,8 +91,8 @@ class MdnsNotifyTest(MdnsTestCase):
def test_send_notify_message_timeout(self, _): def test_send_notify_message_timeout(self, _):
context = self.get_context() context = self.get_context()
response, retry = self.notify.notify_zone_changed( response, retry = self.notify.notify_zone_changed(
context, objects.Domain(**self.test_domain), self.server, 0, context, objects.Domain.from_dict(self.test_domain), self.server,
0, 2, 0) 0, 0, 2, 0)
self.assertEqual(response, None) self.assertEqual(response, None)
self.assertEqual(retry, 2) self.assertEqual(retry, 2)
@ -100,8 +100,8 @@ class MdnsNotifyTest(MdnsTestCase):
def test_send_notify_message_bad_response(self, _): def test_send_notify_message_bad_response(self, _):
context = self.get_context() context = self.get_context()
response, retry = self.notify.notify_zone_changed( response, retry = self.notify.notify_zone_changed(
context, objects.Domain(**self.test_domain), self.server, 0, context, objects.Domain.from_dict(self.test_domain), self.server,
0, 2, 0) 0, 0, 2, 0)
self.assertEqual(response, None) self.assertEqual(response, None)
self.assertEqual(retry, 1) self.assertEqual(retry, 1)
@ -125,8 +125,8 @@ class MdnsNotifyTest(MdnsTestCase):
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire( with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
binascii.a2b_hex(poll_response))): binascii.a2b_hex(poll_response))):
status, serial, retries = self.notify.get_serial_number( status, serial, retries = self.notify.get_serial_number(
context, objects.Domain(**self.test_domain), self.server, context, objects.Domain.from_dict(self.test_domain),
0, 0, 2, 0) self.server, 0, 0, 2, 0)
self.assertEqual(status, 'SUCCESS') self.assertEqual(status, 'SUCCESS')
self.assertEqual(serial, self.test_domain['serial']) self.assertEqual(serial, self.test_domain['serial'])
self.assertEqual(retries, 2) self.assertEqual(retries, 2)
@ -151,8 +151,8 @@ class MdnsNotifyTest(MdnsTestCase):
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire( with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
binascii.a2b_hex(poll_response))): binascii.a2b_hex(poll_response))):
status, serial, retries = self.notify.get_serial_number( status, serial, retries = self.notify.get_serial_number(
context, objects.Domain(**self.test_domain), self.server, context, objects.Domain.from_dict(self.test_domain),
0, 0, 2, 0) self.server, 0, 0, 2, 0)
self.assertEqual(status, 'ERROR') self.assertEqual(status, 'ERROR')
self.assertEqual(serial, 99) self.assertEqual(serial, 99)
self.assertEqual(retries, 0) self.assertEqual(retries, 0)
@ -177,8 +177,8 @@ class MdnsNotifyTest(MdnsTestCase):
with patch.object(dns.query, 'udp', return_value=dns.message.from_wire( with patch.object(dns.query, 'udp', return_value=dns.message.from_wire(
binascii.a2b_hex(poll_response))): binascii.a2b_hex(poll_response))):
status, serial, retries = self.notify.get_serial_number( status, serial, retries = self.notify.get_serial_number(
context, objects.Domain(**self.test_domain), self.server, context, objects.Domain.from_dict(self.test_domain),
0, 0, 2, 0) self.server, 0, 0, 2, 0)
self.assertEqual(status, 'SUCCESS') self.assertEqual(status, 'SUCCESS')
self.assertEqual(serial, 101) self.assertEqual(serial, 101)
self.assertEqual(retries, 2) self.assertEqual(retries, 2)
@ -187,7 +187,7 @@ class MdnsNotifyTest(MdnsTestCase):
def test_poll_for_serial_number_timeout(self, _): def test_poll_for_serial_number_timeout(self, _):
context = self.get_context() context = self.get_context()
status, serial, retries = self.notify.get_serial_number( status, serial, retries = self.notify.get_serial_number(
context, objects.Domain(**self.test_domain), self.server, context, objects.Domain.from_dict(self.test_domain), self.server,
0, 0, 2, 0) 0, 0, 2, 0)
self.assertEqual(status, 'ERROR') self.assertEqual(status, 'ERROR')
self.assertEqual(serial, None) self.assertEqual(serial, None)
@ -201,7 +201,7 @@ class MdnsNotifyTest(MdnsTestCase):
group='service:mdns' group='service:mdns'
) )
context = self.get_context() context = self.get_context()
test_domain = objects.Domain(**self.test_domain) test_domain = objects.Domain.from_dict(self.test_domain)
status, serial, retries = self.notify.get_serial_number( status, serial, retries = self.notify.get_serial_number(
context, test_domain, self.server, 0, 0, 2, 0) context, test_domain, self.server, 0, 0, 2, 0)
response, retry = self.notify.notify_zone_changed( response, retry = self.notify.notify_zone_changed(

View File

@ -29,7 +29,7 @@ class PoolManagerCacheTestCase(object):
'status': 'SUCCESS', 'status': 'SUCCESS',
'serial_number': 1 'serial_number': 1
} }
return objects.PoolManagerStatus(**values) return objects.PoolManagerStatus.from_dict(values)
def test_interface(self): def test_interface(self):
self._ensure_interface(PoolManagerCache, self.cache.__class__) self._ensure_interface(PoolManagerCache, self.cache.__class__)

View File

@ -34,7 +34,7 @@ class PoolManagerAPITest(PoolManagerTestCase):
'name': 'example.org.', 'name': 'example.org.',
'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842' 'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842'
} }
domain = objects.Domain(**values) domain = objects.Domain.from_dict(values)
PoolManagerAPI.get_instance().create_domain(self.admin_context, domain) PoolManagerAPI.get_instance().create_domain(self.admin_context, domain)
mock_prepare.assert_called_once_with( mock_prepare.assert_called_once_with(
@ -52,7 +52,7 @@ class PoolManagerAPITest(PoolManagerTestCase):
'name': 'example.org.', 'name': 'example.org.',
'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842' 'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842'
} }
domain = objects.Domain(**values) domain = objects.Domain.from_dict(values)
PoolManagerAPI.get_instance().delete_domain(self.admin_context, domain) PoolManagerAPI.get_instance().delete_domain(self.admin_context, domain)
mock_prepare.assert_called_once_with( mock_prepare.assert_called_once_with(
@ -70,7 +70,7 @@ class PoolManagerAPITest(PoolManagerTestCase):
'name': 'example.org.', 'name': 'example.org.',
'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842' 'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842'
} }
domain = objects.Domain(**values) domain = objects.Domain.from_dict(values)
PoolManagerAPI.get_instance().update_domain(self.admin_context, domain) PoolManagerAPI.get_instance().update_domain(self.admin_context, domain)
mock_prepare.assert_called_once_with( mock_prepare.assert_called_once_with(
@ -88,12 +88,12 @@ class PoolManagerAPITest(PoolManagerTestCase):
'name': 'example.org.', 'name': 'example.org.',
'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842' 'pool_id': '794ccc2c-d751-44fe-b57f-8894c9f5c842'
} }
domain = objects.Domain(**values) domain = objects.Domain.from_dict(values)
values = { values = {
'host': '127.0.0.1', 'host': '127.0.0.1',
'port': '53' 'port': '53'
} }
server = objects.PoolServer(**values) server = objects.PoolServer.from_dict(values)
PoolManagerAPI.get_instance().update_status( PoolManagerAPI.get_instance().update_status(
self.admin_context, domain, server, 'SUCCESS', 1) self.admin_context, domain, server, 'SUCCESS', 1)

View File

@ -74,7 +74,7 @@ class PoolManagerServiceMemcacheTest(PoolManagerTestCase):
'serial': 1422062497, 'serial': 1422062497,
'status': status 'status': status
} }
return objects.Domain(**values) return objects.Domain.from_dict(values)
@patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number') @patch.object(mdns_rpcapi.MdnsAPI, 'poll_for_serial_number')
@patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed') @patch.object(mdns_rpcapi.MdnsAPI, 'notify_zone_changed')

View File

@ -75,7 +75,7 @@ class PoolManagerServiceTest(PoolManagerTestCase):
'serial': 1422062497, 'serial': 1422062497,
'status': status 'status': status
} }
return objects.Domain(**values) return objects.Domain.from_dict(values)
def test_stop(self): def test_stop(self):
# NOTE: Start is already done by the fixture in start_service() # NOTE: Start is already done by the fixture in start_service()

View File

@ -280,7 +280,7 @@ class StorageTestCase(object):
values = self.get_tsigkey_fixture() values = self.get_tsigkey_fixture()
result = self.storage.create_tsigkey( result = self.storage.create_tsigkey(
self.admin_context, tsigkey=objects.TsigKey(**values)) self.admin_context, tsigkey=objects.TsigKey.from_dict(values))
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])
@ -503,7 +503,7 @@ class StorageTestCase(object):
} }
result = self.storage.create_domain( result = self.storage.create_domain(
self.admin_context, domain=objects.Domain(**values)) self.admin_context, domain=objects.Domain.from_dict(values))
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])
@ -785,7 +785,7 @@ class StorageTestCase(object):
result = self.storage.create_recordset( result = self.storage.create_recordset(
self.admin_context, self.admin_context,
domain['id'], domain['id'],
recordset=objects.RecordSet(**values)) recordset=objects.RecordSet.from_dict(values))
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])
@ -1201,10 +1201,9 @@ class StorageTestCase(object):
'data': '192.0.2.1', 'data': '192.0.2.1',
} }
result = self.storage.create_record(self.admin_context, result = self.storage.create_record(
domain['id'], self.admin_context, domain['id'], recordset['id'],
recordset['id'], objects.Record.from_dict(values))
record=objects.Record(**values))
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])
@ -1503,8 +1502,9 @@ class StorageTestCase(object):
'description': 'This is a comment.' 'description': 'This is a comment.'
} }
result = self.storage.create_tld(self.admin_context, result = self.storage.create_tld(
objects.Tld(**values)) self.admin_context, objects.Tld.from_dict(values))
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])
self.assertIsNone(result['updated_at']) self.assertIsNone(result['updated_at'])
@ -1658,8 +1658,7 @@ class StorageTestCase(object):
} }
result = self.storage.create_blacklist( result = self.storage.create_blacklist(
self.admin_context, blacklist=objects.Blacklist(**values) self.admin_context, objects.Blacklist.from_dict(values))
)
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])
@ -1807,8 +1806,8 @@ class StorageTestCase(object):
'provisioner': 'UNMANAGED' 'provisioner': 'UNMANAGED'
} }
result = self.storage.create_pool(self.admin_context, result = self.storage.create_pool(
pool=objects.Pool(**values)) self.admin_context, objects.Pool.from_dict(values))
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])
@ -1972,7 +1971,7 @@ class StorageTestCase(object):
} }
result = self.storage.create_zone_transfer_request( result = self.storage.create_zone_transfer_request(
self.admin_context, objects.ZoneTransferRequest(**values)) self.admin_context, objects.ZoneTransferRequest.from_dict(values))
self.assertEqual(result['tenant_id'], self.admin_context.tenant) self.assertEqual(result['tenant_id'], self.admin_context.tenant)
self.assertIn('status', result) self.assertIn('status', result)
@ -1990,7 +1989,7 @@ class StorageTestCase(object):
} }
result = self.storage.create_zone_transfer_request( result = self.storage.create_zone_transfer_request(
self.admin_context, objects.ZoneTransferRequest(**values)) self.admin_context, objects.ZoneTransferRequest.from_dict(values))
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])
@ -2021,7 +2020,7 @@ class StorageTestCase(object):
} }
self.storage.create_zone_transfer_request( self.storage.create_zone_transfer_request(
self.admin_context, objects.ZoneTransferRequest(**values)) self.admin_context, objects.ZoneTransferRequest.from_dict(values))
requests = self.storage.find_zone_transfer_requests( requests = self.storage.find_zone_transfer_requests(
self.admin_context, {"tenant_id": self.admin_context.tenant}) self.admin_context, {"tenant_id": self.admin_context.tenant})
@ -2068,7 +2067,7 @@ class StorageTestCase(object):
} }
result = self.storage.create_zone_transfer_accept( result = self.storage.create_zone_transfer_accept(
self.admin_context, objects.ZoneTransferAccept(**values)) self.admin_context, objects.ZoneTransferAccept.from_dict(values))
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])
@ -2088,7 +2087,7 @@ class StorageTestCase(object):
} }
self.storage.create_zone_transfer_accept( self.storage.create_zone_transfer_accept(
self.admin_context, objects.ZoneTransferAccept(**values)) self.admin_context, objects.ZoneTransferAccept.from_dict(values))
accepts = self.storage.find_zone_transfer_accepts( accepts = self.storage.find_zone_transfer_accepts(
self.admin_context, {"tenant_id": self.admin_context.tenant}) self.admin_context, {"tenant_id": self.admin_context.tenant})
@ -2105,7 +2104,7 @@ class StorageTestCase(object):
} }
result = self.storage.create_zone_transfer_accept( result = self.storage.create_zone_transfer_accept(
self.admin_context, objects.ZoneTransferAccept(**values)) self.admin_context, objects.ZoneTransferAccept.from_dict(values))
accept = self.storage.find_zone_transfer_accept( accept = self.storage.find_zone_transfer_accept(
self.admin_context, {"id": result.id}) self.admin_context, {"id": result.id})
@ -2182,10 +2181,8 @@ class StorageTestCase(object):
} }
result = self.storage.create_pool_attribute( result = self.storage.create_pool_attribute(
self.admin_context, self.admin_context, values['pool_id'],
pool_id=values['pool_id'], objects.PoolAttribute.from_dict(values))
pool_attribute=objects.PoolAttribute(**values)
)
self.assertIsNotNone(result['id']) self.assertIsNotNone(result['id'])
self.assertIsNotNone(result['created_at']) self.assertIsNotNone(result['created_at'])