diff --git a/designate/tests/test_central/test_service.py b/designate/tests/test_central/test_service.py index bd9d45665..204e1d219 100644 --- a/designate/tests/test_central/test_service.py +++ b/designate/tests/test_central/test_service.py @@ -262,38 +262,38 @@ class CentralServiceTest(CentralTestCase): # Ensure all values have been set correctly self.assertIsNotNone(tld['id']) - self.assertEqual(tld['name'], self.get_tld_fixture(fixture=0)['name']) + self.assertEqual(self.get_tld_fixture(fixture=0)['name'], tld['name']) # Create a TLD with more than one label tld = self.create_tld(fixture=1) # Ensure all values have been set correctly self.assertIsNotNone(tld['id']) - self.assertEqual(tld['name'], self.get_tld_fixture(fixture=1)['name']) + self.assertEqual(self.get_tld_fixture(fixture=1)['name'], tld['name']) def test_find_tlds(self): # Ensure we have no tlds to start with. tlds = self.central_service.find_tlds(self.admin_context) - self.assertEqual(len(tlds), 0) + self.assertEqual(0, len(tlds)) # Create a single tld self.create_tld(fixture=0) # Ensure we can retrieve the newly created tld tlds = self.central_service.find_tlds(self.admin_context) - self.assertEqual(len(tlds), 1) - self.assertEqual(tlds[0]['name'], - self.get_tld_fixture(fixture=0)['name']) + self.assertEqual(1, len(tlds)) + self.assertEqual(self.get_tld_fixture(fixture=0)['name'], + tlds[0]['name']) # Create a second tld self.create_tld(fixture=1) # Ensure we can retrieve both tlds tlds = self.central_service.find_tlds(self.admin_context) - self.assertEqual(len(tlds), 2) - self.assertEqual(tlds[0]['name'], - self.get_tld_fixture(fixture=0)['name']) - self.assertEqual(tlds[1]['name'], - self.get_tld_fixture(fixture=1)['name']) + self.assertEqual(2, len(tlds)) + self.assertEqual(self.get_tld_fixture(fixture=0)['name'], + tlds[0]['name']) + self.assertEqual(self.get_tld_fixture(fixture=1)['name'], + tlds[1]['name']) def test_get_tld(self): # Create a tld @@ -304,8 +304,8 @@ class CentralServiceTest(CentralTestCase): tld = self.central_service.get_tld( self.admin_context, expected_tld['id']) - self.assertEqual(tld['id'], expected_tld['id']) - self.assertEqual(tld['name'], expected_tld['name']) + self.assertEqual(expected_tld['id'], tld['id']) + self.assertEqual(expected_tld['name'], tld['name']) def test_update_tld(self): # Create a tld @@ -345,31 +345,31 @@ class CentralServiceTest(CentralTestCase): # Ensure all values have been set correctly self.assertIsNotNone(tsigkey['id']) - self.assertEqual(tsigkey['name'], values['name']) - self.assertEqual(tsigkey['algorithm'], values['algorithm']) - self.assertEqual(tsigkey['secret'], values['secret']) + self.assertEqual(values['name'], tsigkey['name']) + self.assertEqual(values['algorithm'], tsigkey['algorithm']) + self.assertEqual(values['secret'], tsigkey['secret']) def test_find_tsigkeys(self): # Ensure we have no tsigkeys to start with. tsigkeys = self.central_service.find_tsigkeys(self.admin_context) - self.assertEqual(len(tsigkeys), 0) + self.assertEqual(0, len(tsigkeys)) # Create a single tsigkey (using default values) tsigkey_one = self.create_tsigkey() # Ensure we can retrieve the newly created tsigkey tsigkeys = self.central_service.find_tsigkeys(self.admin_context) - self.assertEqual(len(tsigkeys), 1) - self.assertEqual(tsigkeys[0]['name'], tsigkey_one['name']) + self.assertEqual(1, len(tsigkeys)) + self.assertEqual(tsigkey_one['name'], tsigkeys[0]['name']) # Create a second tsigkey tsigkey_two = self.create_tsigkey(fixture=1) # Ensure we can retrieve both tsigkeys tsigkeys = self.central_service.find_tsigkeys(self.admin_context) - self.assertEqual(len(tsigkeys), 2) - self.assertEqual(tsigkeys[0]['name'], tsigkey_one['name']) - self.assertEqual(tsigkeys[1]['name'], tsigkey_two['name']) + self.assertEqual(2, len(tsigkeys)) + self.assertEqual(tsigkey_one['name'], tsigkeys[0]['name']) + self.assertEqual(tsigkey_two['name'], tsigkeys[1]['name']) def test_get_tsigkey(self): # Create a tsigkey @@ -379,10 +379,10 @@ class CentralServiceTest(CentralTestCase): tsigkey = self.central_service.get_tsigkey( self.admin_context, expected['id']) - self.assertEqual(tsigkey['id'], expected['id']) - self.assertEqual(tsigkey['name'], expected['name']) - self.assertEqual(tsigkey['algorithm'], expected['algorithm']) - self.assertEqual(tsigkey['secret'], expected['secret']) + self.assertEqual(expected['id'], tsigkey['id']) + self.assertEqual(expected['name'], tsigkey['name']) + self.assertEqual(expected['algorithm'], tsigkey['algorithm']) + self.assertEqual(expected['secret'], tsigkey['secret']) def test_update_tsigkey(self): # Create a tsigkey @@ -422,14 +422,14 @@ class CentralServiceTest(CentralTestCase): # in the beginning, there should be nothing tenants = self.central_service.count_tenants(admin_context) - self.assertEqual(tenants, 0) + self.assertEqual(0, tenants) # Explicitly set a tenant_id self.create_domain(fixture=0, context=tenant_one_context) self.create_domain(fixture=1, context=tenant_two_context) tenants = self.central_service.count_tenants(admin_context) - self.assertEqual(tenants, 2) + self.assertEqual(2, tenants) def test_count_tenants_policy_check(self): # Set the policy to reject the authz @@ -450,11 +450,11 @@ class CentralServiceTest(CentralTestCase): # Ensure all values have been set correctly self.assertIsNotNone(domain['id']) - self.assertEqual(domain['name'], values['name']) - self.assertEqual(domain['email'], values['email']) + self.assertEqual(values['name'], domain['name']) + self.assertEqual(values['email'], domain['email']) self.assertIn('status', domain) - self.assertEqual(mock_notifier.call_count, 1) + self.assertEqual(1, mock_notifier.call_count) # Ensure the correct NS Records are in place pool = self.central_service.get_pool( @@ -465,7 +465,7 @@ class CentralServiceTest(CentralTestCase): criterion={'domain_id': domain.id, 'type': "NS"}) self.assertIsNotNone(ns_recordset.id) - self.assertEqual(ns_recordset.type, 'NS') + self.assertEqual('NS', ns_recordset.type) self.assertIsNotNone(ns_recordset.records) self.assertEqual(set([n.hostname for n in pool.ns_records]), set([n.data for n in ns_recordset.records])) @@ -533,7 +533,7 @@ class CentralServiceTest(CentralTestCase): # Ensure all values have been set correctly self.assertIsNotNone(domain['id']) - self.assertEqual(domain['parent_domain_id'], parent_domain['id']) + self.assertEqual(parent_domain['id'], domain['parent_domain_id']) def test_create_subdomain_different_pools(self): fixture = self.get_domain_fixture() @@ -574,7 +574,7 @@ class CentralServiceTest(CentralTestCase): # Ensure all values have been set correctly self.assertIsNotNone(parent_domain['id']) - self.assertEqual(subdomain['parent_domain_id'], parent_domain['id']) + self.assertEqual(parent_domain['id'], subdomain['parent_domain_id']) def test_create_subdomain_failure(self): context = self.get_admin_context() @@ -719,29 +719,29 @@ class CentralServiceTest(CentralTestCase): self.admin_context, objects.Domain.from_dict(values)) # Ensure all values have been set correctly - self.assertEqual(domain['ttl'], values['ttl']) + self.assertEqual(values['ttl'], domain['ttl']) def test_find_domains(self): # Ensure we have no domains to start with. domains = self.central_service.find_domains(self.admin_context) - self.assertEqual(len(domains), 0) + self.assertEqual(0, len(domains)) # Create a single domain (using default values) self.create_domain() # Ensure we can retrieve the newly created domain domains = self.central_service.find_domains(self.admin_context) - self.assertEqual(len(domains), 1) - self.assertEqual(domains[0]['name'], 'example.com.') + self.assertEqual(1, len(domains)) + self.assertEqual('example.com.', domains[0]['name']) # Create a second domain self.create_domain(name='example.net.') # Ensure we can retrieve both domain domains = self.central_service.find_domains(self.admin_context) - self.assertEqual(len(domains), 2) - self.assertEqual(domains[0]['name'], 'example.com.') - self.assertEqual(domains[1]['name'], 'example.net.') + self.assertEqual(2, len(domains)) + self.assertEqual('example.com.', domains[0]['name']) + self.assertEqual('example.net.', domains[1]['name']) def test_find_domains_criteria(self): # Create a domain @@ -754,9 +754,9 @@ class CentralServiceTest(CentralTestCase): domains = self.central_service.find_domains( self.admin_context, criterion) - self.assertEqual(domains[0]['id'], expected_domain['id']) - self.assertEqual(domains[0]['name'], expected_domain['name']) - self.assertEqual(domains[0]['email'], expected_domain['email']) + self.assertEqual(expected_domain['id'], domains[0]['id']) + self.assertEqual(expected_domain['name'], domains[0]['name']) + self.assertEqual(expected_domain['email'], domains[0]['email']) def test_find_domains_tenant_restrictions(self): admin_context = self.get_admin_context() @@ -767,24 +767,24 @@ class CentralServiceTest(CentralTestCase): # Ensure we have no domains to start with. domains = self.central_service.find_domains(admin_context) - self.assertEqual(len(domains), 0) + self.assertEqual(0, len(domains)) # Create a single domain (using default values) domain = self.create_domain(context=tenant_one_context) # Ensure admins can retrieve the newly created domain domains = self.central_service.find_domains(admin_context) - self.assertEqual(len(domains), 1) - self.assertEqual(domains[0]['name'], domain['name']) + self.assertEqual(1, len(domains)) + self.assertEqual(domain['name'], domains[0]['name']) # Ensure tenant=1 can retrieve the newly created domain domains = self.central_service.find_domains(tenant_one_context) - self.assertEqual(len(domains), 1) - self.assertEqual(domains[0]['name'], domain['name']) + self.assertEqual(1, len(domains)) + self.assertEqual(domain['name'], domains[0]['name']) # Ensure tenant=2 can NOT retrieve the newly created domain domains = self.central_service.find_domains(tenant_two_context) - self.assertEqual(len(domains), 0) + self.assertEqual(0, len(domains)) def test_get_domain(self): # Create a domain @@ -795,9 +795,9 @@ class CentralServiceTest(CentralTestCase): domain = self.central_service.get_domain( self.admin_context, expected_domain['id']) - self.assertEqual(domain['id'], expected_domain['id']) - self.assertEqual(domain['name'], expected_domain['name']) - self.assertEqual(domain['email'], expected_domain['email']) + self.assertEqual(expected_domain['id'], domain['id']) + self.assertEqual(expected_domain['name'], domain['name']) + self.assertEqual(expected_domain['email'], domain['email']) def test_get_domain_servers(self): # Create a domain @@ -820,9 +820,9 @@ class CentralServiceTest(CentralTestCase): domain = self.central_service.find_domain( self.admin_context, criterion) - self.assertEqual(domain['id'], expected_domain['id']) - self.assertEqual(domain['name'], expected_domain['name']) - self.assertEqual(domain['email'], expected_domain['email']) + self.assertEqual(expected_domain['id'], domain['id']) + self.assertEqual(expected_domain['name'], domain['name']) + self.assertEqual(expected_domain['email'], domain['email']) self.assertIn('status', domain) @mock.patch.object(notifier.Notifier, "info") @@ -848,7 +848,7 @@ class CentralServiceTest(CentralTestCase): self.assertTrue(domain.serial > original_serial) self.assertEqual('info@example.net', domain.email) - self.assertEqual(mock_notifier.call_count, 1) + self.assertEqual(1, mock_notifier.call_count) # Check that the object used in the notify is a Domain and the id # matches up @@ -948,18 +948,18 @@ class CentralServiceTest(CentralTestCase): self.admin_context, domain['id']) # Ensure the domain is marked for deletion - self.assertEqual(deleted_domain.id, domain.id) - self.assertEqual(deleted_domain.name, domain.name) - self.assertEqual(deleted_domain.email, domain.email) - self.assertEqual(deleted_domain.status, 'PENDING') - self.assertEqual(deleted_domain.tenant_id, domain.tenant_id) - self.assertEqual(deleted_domain.parent_domain_id, - domain.parent_domain_id) - self.assertEqual(deleted_domain.action, 'DELETE') - self.assertEqual(deleted_domain.serial, domain.serial) - self.assertEqual(deleted_domain.pool_id, domain.pool_id) + self.assertEqual(domain.id, deleted_domain.id) + self.assertEqual(domain.name, deleted_domain.name) + self.assertEqual(domain.email, deleted_domain.email) + self.assertEqual('PENDING', deleted_domain.status) + self.assertEqual(domain.tenant_id, deleted_domain.tenant_id) + self.assertEqual(domain.parent_domain_id, + deleted_domain.parent_domain_id) + self.assertEqual('DELETE', deleted_domain.action) + self.assertEqual(domain.serial, deleted_domain.serial) + self.assertEqual(domain.pool_id, deleted_domain.pool_id) - self.assertEqual(mock_notifier.call_count, 1) + self.assertEqual(1, mock_notifier.call_count) # Check that the object used in the notify is a Domain and the id # matches up @@ -985,7 +985,7 @@ class CentralServiceTest(CentralTestCase): def test_count_domains(self): # in the beginning, there should be nothing domains = self.central_service.count_domains(self.admin_context) - self.assertEqual(domains, 0) + self.assertEqual(0, domains) # Create a single domain self.create_domain() @@ -994,7 +994,7 @@ class CentralServiceTest(CentralTestCase): domains = self.central_service.count_domains(self.admin_context) # well, did we get 1? - self.assertEqual(domains, 1) + self.assertEqual(1, domains) def test_count_domains_policy_check(self): # Set the policy to reject the authz @@ -1051,7 +1051,7 @@ class CentralServiceTest(CentralTestCase): ) pxy = self.central_service.storage.session.execute(query) - self.assertEqual(pxy.rowcount, 1) + self.assertEqual(1, pxy.rowcount) return domain @mock.patch.object(notifier.Notifier, "info") @@ -1112,7 +1112,7 @@ class CentralServiceTest(CentralTestCase): limit=100, ) self._assert_count_all_domains(2) - self.assertEqual(purge_cnt, 1) + self.assertEqual(1, purge_cnt) @mock.patch.object(notifier.Notifier, "info") def test_purge_domains_without_time_threshold(self, mock_notifier): @@ -1132,7 +1132,7 @@ class CentralServiceTest(CentralTestCase): limit=100, ) self._assert_count_all_domains(1) - self.assertEqual(purge_cnt, 2) + self.assertEqual(2, purge_cnt) @mock.patch.object(notifier.Notifier, "info") def test_purge_domains_without_deleted_criterion(self, mock_notifier): @@ -1154,7 +1154,7 @@ class CentralServiceTest(CentralTestCase): limit=100, ) self._assert_count_all_domains(3) - self.assertEqual(purge_cnt, None) + self.assertEqual(None, purge_cnt) @mock.patch.object(notifier.Notifier, "info") def test_purge_domains_by_name(self, mock_notifier): @@ -1169,7 +1169,7 @@ class CentralServiceTest(CentralTestCase): limit=100, ) self._assert_count_all_domains(0) - self.assertEqual(purge_cnt, 1) + self.assertEqual(1, purge_cnt) @mock.patch.object(notifier.Notifier, "info") def test_purge_domains_without_any_criterion(self, mock_notifier): @@ -1197,7 +1197,7 @@ class CentralServiceTest(CentralTestCase): limit=100, ) n_zones = self.central_service.count_domains(self.admin_context) - self.assertEqual(n_zones, 1) + self.assertEqual(1, n_zones) # purge domains in a shard that contains the domain created above self.central_service.purge_domains( @@ -1210,7 +1210,7 @@ class CentralServiceTest(CentralTestCase): limit=100, ) n_zones = self.central_service.count_domains(self.admin_context) - self.assertEqual(n_zones, 0) + self.assertEqual(0, n_zones) def test_purge_domains_walk_up_domains(self): Zone = namedtuple('Zone', 'id parent_domain_id') @@ -1219,11 +1219,11 @@ class CentralServiceTest(CentralTestCase): zones_by_id = {z.id: z for z in zones} sid = self.central_service.storage._walk_up_domains( zones[0], zones_by_id) - self.assertEqual(sid, 1234) + self.assertEqual(1234, sid) sid = self.central_service.storage._walk_up_domains( zones[-1], zones_by_id) - self.assertEqual(sid, 1234) + self.assertEqual(1234, sid) def test_purge_domains_walk_up_domains_loop(self): Zone = namedtuple('Zone', 'id parent_domain_id') @@ -1249,10 +1249,10 @@ class CentralServiceTest(CentralTestCase): self._delete_domain(z3, old) self._delete_domain(z4, old) - self.assertEqual(z2['parent_domain_id'], z1.id) - self.assertEqual(z3['parent_domain_id'], z2.id) - self.assertEqual(z4['parent_domain_id'], z3.id) - self.assertEqual(z5['parent_domain_id'], z4.id) + self.assertEqual(z1.id, z2['parent_domain_id']) + self.assertEqual(z2.id, z3['parent_domain_id']) + self.assertEqual(z3.id, z4['parent_domain_id']) + self.assertEqual(z4.id, z5['parent_domain_id']) self._assert_count_all_domains(5) mock_notifier.reset_mock() @@ -1272,11 +1272,11 @@ class CentralServiceTest(CentralTestCase): self._log_all_domains(zones) for z in zones: if z.name == 'alive.org.': - self.assertEqual(z.parent_domain_id, None) + self.assertEqual(None, z.parent_domain_id) elif z.name == 'alive2.del3.del2.deleted.alive.org.': # alive2.del2.deleted.alive.org is to be reparented under # alive.org - self.assertEqual(z.parent_domain_id, z1.id) + self.assertEqual(z1.id, z.parent_domain_id) else: raise Exception("Unexpected zone %r" % z) @@ -1379,7 +1379,7 @@ class CentralServiceTest(CentralTestCase): self.assertIsNotNone(recordset.records) # The serial number does not get updated is there are no records # in the recordset - self.assertEqual(new_serial, original_serial) + self.assertEqual(original_serial, new_serial) def test_create_recordset_with_records(self): domain = self.create_domain() @@ -1504,7 +1504,7 @@ class CentralServiceTest(CentralTestCase): self.admin_context, domain['id'], recordset=objects.RecordSet.from_dict(values)) - self.assertEqual(recordset['ttl'], values['ttl']) + self.assertEqual(values['ttl'], recordset['ttl']) def test_get_recordset(self): domain = self.create_domain() @@ -1516,9 +1516,9 @@ class CentralServiceTest(CentralTestCase): recordset = self.central_service.get_recordset( self.admin_context, domain['id'], expected['id']) - self.assertEqual(recordset['id'], expected['id']) - self.assertEqual(recordset['name'], expected['name']) - self.assertEqual(recordset['type'], expected['type']) + self.assertEqual(expected['id'], recordset['id']) + self.assertEqual(expected['name'], recordset['name']) + self.assertEqual(expected['type'], recordset['type']) def test_get_recordset_with_records(self): domain = self.create_domain() @@ -1558,7 +1558,7 @@ class CentralServiceTest(CentralTestCase): recordsets = self.central_service.find_recordsets( self.admin_context, criterion) - self.assertEqual(len(recordsets), 2) + self.assertEqual(2, len(recordsets)) # Create a single recordset (using default values) self.create_recordset(domain, name='www.%s' % domain['name']) @@ -1567,8 +1567,8 @@ class CentralServiceTest(CentralTestCase): recordsets = self.central_service.find_recordsets( self.admin_context, criterion) - self.assertEqual(len(recordsets), 3) - self.assertEqual(recordsets[2]['name'], 'www.%s' % domain['name']) + self.assertEqual(3, len(recordsets)) + self.assertEqual('www.%s' % domain['name'], recordsets[2]['name']) # Create a second recordset self.create_recordset(domain, name='mail.%s' % domain['name']) @@ -1577,9 +1577,9 @@ class CentralServiceTest(CentralTestCase): recordsets = self.central_service.find_recordsets( self.admin_context, criterion) - self.assertEqual(len(recordsets), 4) - self.assertEqual(recordsets[2]['name'], 'www.%s' % domain['name']) - self.assertEqual(recordsets[3]['name'], 'mail.%s' % domain['name']) + self.assertEqual(4, len(recordsets)) + self.assertEqual('www.%s' % domain['name'], recordsets[2]['name']) + self.assertEqual('mail.%s' % domain['name'], recordsets[3]['name']) def test_find_recordset(self): domain = self.create_domain() @@ -1593,8 +1593,8 @@ class CentralServiceTest(CentralTestCase): recordset = self.central_service.find_recordset( self.admin_context, criterion) - self.assertEqual(recordset['id'], expected['id']) - self.assertEqual(recordset['name'], expected['name']) + self.assertEqual(expected['id'], recordset['id'], expected['id']) + self.assertEqual(expected['name'], recordset['name']) def test_find_recordset_with_records(self): domain = self.create_domain() @@ -1638,7 +1638,7 @@ class CentralServiceTest(CentralTestCase): self.admin_context, recordset.domain_id, recordset.id) # Ensure the new value took - self.assertEqual(recordset.ttl, 1800) + self.assertEqual(1800, recordset.ttl) self.assertThat(new_serial, GreaterThan(original_serial)) def test_update_recordset_deadlock_retry(self): @@ -1788,7 +1788,7 @@ class CentralServiceTest(CentralTestCase): self.admin_context, recordset.domain_id, recordset.id) # Ensure the recordset was updated correctly - self.assertEqual(recordset.ttl, 1800) + self.assertEqual(1800, recordset.ttl) # Ensure the domains serial number was not updated domain_after = self.central_service.get_domain( @@ -1906,7 +1906,7 @@ class CentralServiceTest(CentralTestCase): def test_count_recordsets(self): # in the beginning, there should be nothing recordsets = self.central_service.count_recordsets(self.admin_context) - self.assertEqual(recordsets, 0) + self.assertEqual(0, recordsets) # Create a domain to put our recordset in domain = self.create_domain() @@ -1916,7 +1916,7 @@ class CentralServiceTest(CentralTestCase): # We should have 1 recordset now, plus SOA & NS recordsets recordsets = self.central_service.count_recordsets(self.admin_context) - self.assertEqual(recordsets, 3) + self.assertEqual(3, recordsets) def test_count_recordsets_policy_check(self): # Set the policy to reject the authz @@ -1941,7 +1941,7 @@ class CentralServiceTest(CentralTestCase): # Ensure all values have been set correctly self.assertIsNotNone(record['id']) - self.assertEqual(record['data'], values['data']) + self.assertEqual(values['data'], record['data']) self.assertIn('status', record) def test_create_record_over_domain_quota(self): @@ -2000,8 +2000,8 @@ class CentralServiceTest(CentralTestCase): record = self.central_service.get_record( self.admin_context, domain['id'], recordset['id'], expected['id']) - self.assertEqual(record['id'], expected['id']) - self.assertEqual(record['data'], expected['data']) + self.assertEqual(expected['id'], record['id']) + self.assertEqual(expected['data'], record['data']) self.assertIn('status', record) def test_get_record_incorrect_domain_id(self): @@ -2045,7 +2045,7 @@ class CentralServiceTest(CentralTestCase): records = self.central_service.find_records( self.admin_context, criterion) - self.assertEqual(len(records), 0) + self.assertEqual(0, len(records)) # Create a single record (using default values) expected_one = self.create_record(domain, recordset) @@ -2054,8 +2054,8 @@ class CentralServiceTest(CentralTestCase): records = self.central_service.find_records( self.admin_context, criterion) - self.assertEqual(len(records), 1) - self.assertEqual(records[0]['data'], expected_one['data']) + self.assertEqual(1, len(records)) + self.assertEqual(expected_one['data'], records[0]['data']) # Create a second record expected_two = self.create_record(domain, recordset, fixture=1) @@ -2064,9 +2064,9 @@ class CentralServiceTest(CentralTestCase): records = self.central_service.find_records( self.admin_context, criterion) - self.assertEqual(len(records), 2) - self.assertEqual(records[0]['data'], expected_one['data']) - self.assertEqual(records[1]['data'], expected_two['data']) + self.assertEqual(2, len(records)) + self.assertEqual(expected_one['data'], records[0]['data']) + self.assertEqual(expected_two['data'], records[1]['data']) def test_find_record(self): domain = self.create_domain() @@ -2085,8 +2085,8 @@ class CentralServiceTest(CentralTestCase): record = self.central_service.find_record( self.admin_context, criterion) - self.assertEqual(record['id'], expected['id']) - self.assertEqual(record['data'], expected['data']) + self.assertEqual(expected['id'], record['id']) + self.assertEqual(expected['data'], record['data']) self.assertIn('status', record) def test_update_record(self): @@ -2198,14 +2198,14 @@ class CentralServiceTest(CentralTestCase): record['id']) # Ensure the record is marked for deletion - self.assertEqual(deleted_record.id, record.id) - self.assertEqual(deleted_record.data, record.data) - self.assertEqual(deleted_record.domain_id, record.domain_id) - self.assertEqual(deleted_record.status, 'PENDING') - self.assertEqual(deleted_record.tenant_id, record.tenant_id) - self.assertEqual(deleted_record.recordset_id, record.recordset_id) - self.assertEqual(deleted_record.action, 'DELETE') - self.assertEqual(deleted_record.serial, new_domain_serial) + self.assertEqual(record.id, deleted_record.id) + self.assertEqual(record.data, deleted_record.data) + self.assertEqual(record.domain_id, deleted_record.domain_id) + self.assertEqual('PENDING', deleted_record.status) + self.assertEqual(record.tenant_id, deleted_record.tenant_id) + self.assertEqual(record.recordset_id, deleted_record.recordset_id) + self.assertEqual('DELETE', deleted_record.action) + self.assertEqual(new_domain_serial, deleted_record.serial) def test_delete_record_without_incrementing_serial(self): domain = self.create_domain() @@ -2226,7 +2226,7 @@ class CentralServiceTest(CentralTestCase): # Ensure the domains serial number was not updated new_domain_serial = self.central_service.get_domain( self.admin_context, domain['id'])['serial'] - self.assertEqual(new_domain_serial, domain_serial) + self.assertEqual(domain_serial, new_domain_serial) # Fetch the record deleted_record = self.central_service.get_record( @@ -2234,14 +2234,14 @@ class CentralServiceTest(CentralTestCase): record['id']) # Ensure the record is marked for deletion - self.assertEqual(deleted_record.id, record.id) - self.assertEqual(deleted_record.data, record.data) - self.assertEqual(deleted_record.domain_id, record.domain_id) - self.assertEqual(deleted_record.status, 'PENDING') - self.assertEqual(deleted_record.tenant_id, record.tenant_id) - self.assertEqual(deleted_record.recordset_id, record.recordset_id) - self.assertEqual(deleted_record.action, 'DELETE') - self.assertEqual(deleted_record.serial, new_domain_serial) + self.assertEqual(record.id, deleted_record.id) + self.assertEqual(record.data, deleted_record.data) + self.assertEqual(record.domain_id, deleted_record.domain_id) + self.assertEqual('PENDING', deleted_record.status) + self.assertEqual(record.tenant_id, deleted_record.tenant_id) + self.assertEqual(record.recordset_id, deleted_record.recordset_id) + self.assertEqual('DELETE', deleted_record.action) + self.assertEqual(new_domain_serial, deleted_record.serial) def test_delete_record_incorrect_domain_id(self): domain = self.create_domain() @@ -2274,7 +2274,7 @@ class CentralServiceTest(CentralTestCase): def test_count_records(self): # in the beginning, there should be nothing records = self.central_service.count_records(self.admin_context) - self.assertEqual(records, 0) + self.assertEqual(0, records) # Create a domain and recordset to put our record in domain = self.create_domain() @@ -2285,7 +2285,7 @@ class CentralServiceTest(CentralTestCase): # we should have 1 record now, plus SOA & NS records records = self.central_service.count_records(self.admin_context) - self.assertEqual(records, 3) + self.assertEqual(3, records) def test_count_records_policy_check(self): # Set the policy to reject the authz @@ -2621,8 +2621,8 @@ class CentralServiceTest(CentralTestCase): # Verify all values have been set correctly self.assertIsNotNone(blacklist['id']) - self.assertEqual(blacklist['pattern'], values['pattern']) - self.assertEqual(blacklist['description'], values['description']) + self.assertEqual(values['pattern'], blacklist['pattern']) + self.assertEqual(values['description'], blacklist['description']) def test_get_blacklist(self): # Create a blacklisted zone @@ -2632,16 +2632,16 @@ class CentralServiceTest(CentralTestCase): blacklist = self.central_service.get_blacklist( self.admin_context, expected['id']) - self.assertEqual(blacklist['id'], expected['id']) - self.assertEqual(blacklist['pattern'], expected['pattern']) - self.assertEqual(blacklist['description'], expected['description']) + self.assertEqual(expected['id'], blacklist['id']) + self.assertEqual(expected['pattern'], blacklist['pattern']) + self.assertEqual(expected['description'], blacklist['description']) def test_find_blacklists(self): # Verify there are no blacklisted zones to start with blacklists = self.central_service.find_blacklists( self.admin_context) - self.assertEqual(len(blacklists), 0) + self.assertEqual(0, len(blacklists)) # Create a single blacklisted zone self.create_blacklist() @@ -2651,8 +2651,8 @@ class CentralServiceTest(CentralTestCase): self.admin_context) values1 = self.get_blacklist_fixture(fixture=0) - self.assertEqual(len(blacklists), 1) - self.assertEqual(blacklists[0]['pattern'], values1['pattern']) + self.assertEqual(1, len(blacklists)) + self.assertEqual(values1['pattern'], blacklists[0]['pattern']) # Create a second blacklisted zone self.create_blacklist(fixture=1) @@ -2663,9 +2663,9 @@ class CentralServiceTest(CentralTestCase): values2 = self.get_blacklist_fixture(fixture=1) - self.assertEqual(len(blacklists), 2) - self.assertEqual(blacklists[0]['pattern'], values1['pattern']) - self.assertEqual(blacklists[1]['pattern'], values2['pattern']) + self.assertEqual(2, len(blacklists)) + self.assertEqual(values1['pattern'], blacklists[0]['pattern']) + self.assertEqual(values2['pattern'], blacklists[1]['pattern']) def test_find_blacklist(self): # Create a blacklisted zone @@ -2675,8 +2675,8 @@ class CentralServiceTest(CentralTestCase): blacklist = self.central_service.find_blacklist( self.admin_context, {'id': expected['id']}) - self.assertEqual(blacklist['pattern'], expected['pattern']) - self.assertEqual(blacklist['description'], expected['description']) + self.assertEqual(expected['pattern'], blacklist['pattern']) + self.assertEqual(expected['description'], blacklist['description']) def test_update_blacklist(self): # Create a blacklisted zone @@ -2731,12 +2731,12 @@ class CentralServiceTest(CentralTestCase): self.assertIsNotNone(soa.id) self.assertEqual('SOA', soa.type) self.assertIsNotNone(soa.records) - self.assertEqual(int(soa_record_values[2]), zone['serial']) - self.assertEqual(soa_record_values[1], zone_email) - self.assertEqual(int(soa_record_values[3]), zone['refresh']) - self.assertEqual(int(soa_record_values[4]), zone['retry']) - self.assertEqual(int(soa_record_values[5]), zone['expire']) - self.assertEqual(int(soa_record_values[6]), zone['minimum']) + self.assertEqual(zone['serial'], int(soa_record_values[2])) + self.assertEqual(zone_email, soa_record_values[1]) + self.assertEqual(zone['refresh'], int(soa_record_values[3])) + self.assertEqual(zone['retry'], int(soa_record_values[4])) + self.assertEqual(zone['expire'], int(soa_record_values[5])) + self.assertEqual(zone['minimum'], int(soa_record_values[6])) def test_update_soa(self): # Anytime the zone's serial number is incremented @@ -2762,7 +2762,7 @@ class CentralServiceTest(CentralTestCase): # Split out the various soa values soa_record_values = soa.records[0].data.split() - self.assertEqual(int(soa_record_values[2]), updated_zone['serial']) + self.assertEqual(updated_zone['serial'], int(soa_record_values[2])) # Pool Tests def test_create_pool(self): @@ -2781,7 +2781,7 @@ class CentralServiceTest(CentralTestCase): self.assertIsNotNone(pool['attributes']) self.assertIsNotNone(pool['ns_records']) - self.assertEqual(pool['name'], values['name']) + self.assertEqual(values['name'], pool['name']) # Compare the actual values of attributes and ns_records for k in range(0, len(values['attributes'])): @@ -2803,30 +2803,30 @@ class CentralServiceTest(CentralTestCase): pool = self.central_service.get_pool(self.admin_context, expected['id']) - self.assertEqual(pool['id'], expected['id']) - self.assertEqual(pool['created_at'], expected['created_at']) - self.assertEqual(pool['version'], expected['version']) - self.assertEqual(pool['tenant_id'], expected['tenant_id']) - self.assertEqual(pool['name'], expected['name']) + self.assertEqual(expected['id'], pool['id']) + self.assertEqual(expected['created_at'], pool['created_at']) + self.assertEqual(expected['version'], pool['version']) + self.assertEqual(expected['tenant_id'], pool['tenant_id']) + self.assertEqual(expected['name'], pool['name']) # Compare the actual values of attributes and ns_records for k in range(0, len(expected['attributes'])): self.assertEqual( - pool['attributes'][k].to_primitive()['designate_object.data'], expected['attributes'][k].to_primitive() - ['designate_object.data']) + ['designate_object.data'], + pool['attributes'][k].to_primitive()['designate_object.data']) for k in range(0, len(expected['ns_records'])): self.assertEqual( - pool['ns_records'][k].to_primitive()['designate_object.data'], expected['ns_records'][k].to_primitive() - ['designate_object.data']) + ['designate_object.data'], + pool['ns_records'][k].to_primitive()['designate_object.data']) def test_find_pools(self): # Verify no pools exist, except for default pool pools = self.central_service.find_pools(self.admin_context) - self.assertEqual(len(pools), 1) + self.assertEqual(1, len(pools)) # Create a pool self.create_pool(fixture=0) @@ -2835,21 +2835,21 @@ class CentralServiceTest(CentralTestCase): pools = self.central_service.find_pools(self.admin_context) values = self.get_pool_fixture(fixture=0) - self.assertEqual(len(pools), 2) - self.assertEqual(pools[1]['name'], values['name']) + self.assertEqual(2, len(pools)) + self.assertEqual(values['name'], pools[1]['name']) # Compare the actual values of attributes and ns_records expected_attributes = values['attributes'][0] actual_attributes = \ pools[1]['attributes'][0].to_primitive()['designate_object.data'] for k in expected_attributes: - self.assertEqual(actual_attributes[k], expected_attributes[k]) + self.assertEqual(expected_attributes[k], actual_attributes[k]) expected_ns_records = values['ns_records'][0] actual_ns_records = \ pools[1]['ns_records'][0].to_primitive()['designate_object.data'] for k in expected_ns_records: - self.assertEqual(actual_ns_records[k], expected_ns_records[k]) + self.assertEqual(expected_ns_records[k], actual_ns_records[k]) def test_find_pool(self): # Create a server pool @@ -2859,20 +2859,20 @@ class CentralServiceTest(CentralTestCase): pool = self.central_service.find_pool(self.admin_context, {'id': expected['id']}) - self.assertEqual(pool['name'], expected['name']) + self.assertEqual(expected['name'], pool['name']) # Compare the actual values of attributes and ns_records for k in range(0, len(expected['attributes'])): self.assertEqual( - pool['attributes'][k].to_primitive()['designate_object.data'], expected['attributes'][k].to_primitive() - ['designate_object.data']) + ['designate_object.data'], + pool['attributes'][k].to_primitive()['designate_object.data']) for k in range(0, len(expected['ns_records'])): self.assertEqual( - pool['ns_records'][k].to_primitive()['designate_object.data'], expected['ns_records'][k].to_primitive() - ['designate_object.data']) + ['designate_object.data'], + pool['ns_records'][k].to_primitive()['designate_object.data']) def test_update_pool(self): # Create a server pool @@ -3031,7 +3031,7 @@ class CentralServiceTest(CentralTestCase): new_domain_serial = self.central_service.get_domain( self.admin_context, domain['id']).serial - self.assertEqual(new_domain_serial, domain_serial) + self.assertEqual(domain_serial, new_domain_serial) def test_create_zone_transfer_request(self): domain = self.create_domain() @@ -3041,7 +3041,7 @@ class CentralServiceTest(CentralTestCase): self.assertIsNotNone(zone_transfer_request.id) self.assertIsNotNone(zone_transfer_request.tenant_id) self.assertIsNotNone(zone_transfer_request.key) - self.assertEqual(zone_transfer_request.domain_id, domain.id) + self.assertEqual(domain.id, zone_transfer_request.domain_id) def test_create_zone_transfer_request_duplicate(self): domain = self.create_domain() @@ -3059,11 +3059,11 @@ class CentralServiceTest(CentralTestCase): # Verify all values have been set correctly self.assertIsNotNone(zone_transfer_request.id) self.assertIsNotNone(zone_transfer_request.tenant_id) - self.assertEqual(zone_transfer_request.domain_id, domain.id) + self.assertEqual(domain.id, zone_transfer_request.domain_id) self.assertIsNotNone(zone_transfer_request.key) self.assertEqual( - zone_transfer_request.target_tenant_id, - values['target_tenant_id']) + values['target_tenant_id'], + zone_transfer_request.target_tenant_id) def test_get_zone_transfer_request(self): domain = self.create_domain() @@ -3107,7 +3107,7 @@ class CentralServiceTest(CentralTestCase): self.assertIsNotNone(zone_transfer_request.id) self.assertIsNotNone(zone_transfer_request.tenant_id) self.assertIsNotNone(zone_transfer_request.key) - self.assertEqual(zone_transfer_request.description, 'TEST') + self.assertEqual('TEST', zone_transfer_request.description) def test_delete_zone_transfer_request(self): domain = self.create_domain() @@ -3163,15 +3163,15 @@ class CentralServiceTest(CentralTestCase): admin_context, zone_transfer_request.id) self.assertEqual( - result['domain'].tenant_id, str(tenant_2_context.tenant)) + str(tenant_2_context.tenant), result['domain'].tenant_id) self.assertEqual( - result['recordset'].tenant_id, str(tenant_2_context.tenant)) + str(tenant_2_context.tenant), result['recordset'].tenant_id) self.assertEqual( - result['record'].tenant_id, str(tenant_2_context.tenant)) + str(tenant_2_context.tenant), result['record'].tenant_id) self.assertEqual( - result['zt_accept'].status, 'COMPLETE') + 'COMPLETE', result['zt_accept'].status) self.assertEqual( - result['zt_request'].status, 'COMPLETE') + 'COMPLETE', result['zt_request'].status) def test_create_zone_transfer_accept_scoped(self): tenant_1_context = self.get_context(tenant=1) @@ -3216,15 +3216,15 @@ class CentralServiceTest(CentralTestCase): admin_context, zone_transfer_request.id) self.assertEqual( - result['domain'].tenant_id, str(tenant_2_context.tenant)) + str(tenant_2_context.tenant), result['domain'].tenant_id) self.assertEqual( - result['recordset'].tenant_id, str(tenant_2_context.tenant)) + str(tenant_2_context.tenant), result['recordset'].tenant_id) self.assertEqual( - result['record'].tenant_id, str(tenant_2_context.tenant)) + str(tenant_2_context.tenant), result['record'].tenant_id) self.assertEqual( - result['zt_accept'].status, 'COMPLETE') + 'COMPLETE', result['zt_accept'].status) self.assertEqual( - result['zt_request'].status, 'COMPLETE') + 'COMPLETE', result['zt_request'].status) def test_create_zone_transfer_accept_failed_key(self): tenant_1_context = self.get_context(tenant=1) @@ -3286,9 +3286,9 @@ class CentralServiceTest(CentralTestCase): # Ensure all values have been set correctly self.assertIsNotNone(zone_import['id']) - self.assertEqual(zone_import.status, 'PENDING') - self.assertEqual(zone_import.message, None) - self.assertEqual(zone_import.domain_id, None) + self.assertEqual('PENDING', zone_import.status) + self.assertEqual(None, zone_import.message) + self.assertEqual(None, zone_import.domain_id) self.wait_for_import(zone_import.id) @@ -3298,7 +3298,7 @@ class CentralServiceTest(CentralTestCase): # Ensure we have no zone_imports to start with. zone_imports = self.central_service.find_zone_imports( self.admin_context) - self.assertEqual(len(zone_imports), 0) + self.assertEqual(0, len(zone_imports)) # Create a single zone_import request_body = self.get_zonefile_fixture() @@ -3311,7 +3311,7 @@ class CentralServiceTest(CentralTestCase): # Ensure we can retrieve the newly created zone_import zone_imports = self.central_service.find_zone_imports( self.admin_context) - self.assertEqual(len(zone_imports), 1) + self.assertEqual(1, len(zone_imports)) # Create a second zone_import request_body = self.get_zonefile_fixture(variant="two") @@ -3324,9 +3324,9 @@ class CentralServiceTest(CentralTestCase): # Ensure we can retrieve both zone_imports zone_imports = self.central_service.find_zone_imports( self.admin_context) - self.assertEqual(len(zone_imports), 2) - self.assertEqual(zone_imports[0].status, 'COMPLETE') - self.assertEqual(zone_imports[1].status, 'COMPLETE') + self.assertEqual(2, len(zone_imports)) + self.assertEqual('COMPLETE', zone_imports[0].status) + self.assertEqual('COMPLETE', zone_imports[1].status) def test_get_zone_import(self): # Create a Zone Import @@ -3342,8 +3342,8 @@ class CentralServiceTest(CentralTestCase): zone_import = self.central_service.get_zone_import( self.admin_context, zone_import.id) - self.assertEqual(zone_import['id'], zone_import.id) - self.assertEqual(zone_import['status'], zone_import.status) + self.assertEqual(zone_import.id, zone_import['id']) + self.assertEqual(zone_import.status, zone_import['status']) self.assertEqual('COMPLETE', zone_import.status) def test_update_zone_import(self):