Fix argument order in assertEqual to (expect, obs)
assertEqual expects that the arguments provided to it should be (expected, observed). If a particular order is kept as a convention, then it helps to provide a cleaner message to the developer if Unit Tests fail. There are several Unit Test files where the arguments for assertEqual have been swapped. Change-Id: I6fdedcde67f76883257d35d1b812ab8a0e3e0733 Related-Bug: #1259292
This commit is contained in:
parent
9fe583fcb2
commit
63d1b2a590
@ -714,7 +714,7 @@ class TestCase(base.BaseTestCase):
|
||||
attempts += 1
|
||||
|
||||
if not errorok:
|
||||
self.assertEqual(zone_import.status, 'COMPLETE')
|
||||
self.assertEqual('COMPLETE', zone_import.status)
|
||||
|
||||
def _ensure_interface(self, interface, implementation):
|
||||
for name in interface.__abstractmethods__:
|
||||
|
@ -84,7 +84,7 @@ class KeystoneContextMiddlewareTest(ApiTestCase):
|
||||
# Process the request
|
||||
response = app(request)
|
||||
|
||||
self.assertEqual(response.status_code, 401)
|
||||
self.assertEqual(401, response.status_code)
|
||||
|
||||
def test_process_unscoped_token(self):
|
||||
app = middleware.KeystoneContextMiddleware({})
|
||||
@ -101,7 +101,7 @@ class KeystoneContextMiddlewareTest(ApiTestCase):
|
||||
# Process the request
|
||||
response = app(request)
|
||||
|
||||
self.assertEqual(response.status_code, 401)
|
||||
self.assertEqual(401, response.status_code)
|
||||
|
||||
|
||||
class NoAuthContextMiddlewareTest(ApiTestCase):
|
||||
@ -149,7 +149,7 @@ class MaintenanceMiddlewareTest(ApiTestCase):
|
||||
response = app(request)
|
||||
|
||||
# Ensure request was blocked
|
||||
self.assertEqual(response.status_code, 503)
|
||||
self.assertEqual(503, response.status_code)
|
||||
|
||||
def test_process_request_enabled_reject_no_roles(self):
|
||||
self.config(maintenance_mode=True, maintenance_mode_role='admin',
|
||||
@ -164,7 +164,7 @@ class MaintenanceMiddlewareTest(ApiTestCase):
|
||||
response = app(request)
|
||||
|
||||
# Ensure request was blocked
|
||||
self.assertEqual(response.status_code, 503)
|
||||
self.assertEqual(503, response.status_code)
|
||||
|
||||
def test_process_request_enabled_reject_no_context(self):
|
||||
self.config(maintenance_mode=True, maintenance_mode_role='admin',
|
||||
@ -177,7 +177,7 @@ class MaintenanceMiddlewareTest(ApiTestCase):
|
||||
response = app(request)
|
||||
|
||||
# Ensure request was blocked
|
||||
self.assertEqual(response.status_code, 503)
|
||||
self.assertEqual(503, response.status_code)
|
||||
|
||||
def test_process_request_enabled_bypass(self):
|
||||
self.config(maintenance_mode=True, maintenance_mode_role='admin',
|
||||
@ -192,7 +192,7 @@ class MaintenanceMiddlewareTest(ApiTestCase):
|
||||
response = app(request)
|
||||
|
||||
# Ensure request was not blocked
|
||||
self.assertEqual(response, 'FakeResponse')
|
||||
self.assertEqual('FakeResponse', response)
|
||||
|
||||
|
||||
class NormalizeURIMiddlewareTest(ApiTestCase):
|
||||
@ -206,7 +206,7 @@ class NormalizeURIMiddlewareTest(ApiTestCase):
|
||||
app(request)
|
||||
|
||||
# Ensure request's PATH_INFO had the trailing slash removed.
|
||||
self.assertEqual(request.environ['PATH_INFO'], 'resource')
|
||||
self.assertEqual('resource', request.environ['PATH_INFO'])
|
||||
|
||||
def test_strip_trailing_slases_multiple(self):
|
||||
request = FakeRequest()
|
||||
@ -218,7 +218,7 @@ class NormalizeURIMiddlewareTest(ApiTestCase):
|
||||
app(request)
|
||||
|
||||
# Ensure request's PATH_INFO had the trailing slash removed.
|
||||
self.assertEqual(request.environ['PATH_INFO'], 'resource')
|
||||
self.assertEqual('resource', request.environ['PATH_INFO'])
|
||||
|
||||
|
||||
class FaultMiddlewareTest(ApiTestCase):
|
||||
@ -240,7 +240,7 @@ class FaultMiddlewareTest(ApiTestCase):
|
||||
# Process the request
|
||||
app(request)
|
||||
|
||||
self.assertEqual(mock_notifier.call_count, 1)
|
||||
self.assertEqual(1, mock_notifier.call_count)
|
||||
mock_notifier.call_args(
|
||||
ctxt,
|
||||
'dns.api.fault',
|
||||
|
@ -61,7 +61,7 @@ class ApiV1Test(ApiTestCase):
|
||||
|
||||
LOG.debug('Response Body: %r' % resp.data)
|
||||
|
||||
self.assertEqual(resp.status_code, expected_status_code)
|
||||
self.assertEqual(expected_status_code, resp.status_code)
|
||||
|
||||
try:
|
||||
resp.json = json.loads(resp.data)
|
||||
@ -79,7 +79,7 @@ class ApiV1Test(ApiTestCase):
|
||||
|
||||
LOG.debug('Response Body: %r' % resp.data)
|
||||
|
||||
self.assertEqual(resp.status_code, expected_status_code)
|
||||
self.assertEqual(expected_status_code, resp.status_code)
|
||||
|
||||
try:
|
||||
resp.json = json.loads(resp.data)
|
||||
@ -97,7 +97,7 @@ class ApiV1Test(ApiTestCase):
|
||||
|
||||
LOG.debug('Response Body: %r' % resp.data)
|
||||
|
||||
self.assertEqual(resp.status_code, expected_status_code)
|
||||
self.assertEqual(expected_status_code, resp.status_code)
|
||||
|
||||
try:
|
||||
resp.json = json.loads(resp.data)
|
||||
@ -113,6 +113,6 @@ class ApiV1Test(ApiTestCase):
|
||||
|
||||
LOG.debug('Response Body: %r' % resp.data)
|
||||
|
||||
self.assertEqual(resp.status_code, expected_status_code)
|
||||
self.assertEqual(expected_status_code, resp.status_code)
|
||||
|
||||
return resp
|
||||
|
@ -309,7 +309,7 @@ class ApiV1DomainsTest(ApiV1Test):
|
||||
self.assertEqual(response.json['id'], domain['id'])
|
||||
|
||||
self.assertIn('email', response.json)
|
||||
self.assertEqual(response.json['email'], 'prefix-%s' % domain['email'])
|
||||
self.assertEqual('prefix-%s' % domain['email'], response.json['email'])
|
||||
|
||||
def test_update_domain_junk(self):
|
||||
# Create a domain
|
||||
|
@ -388,11 +388,11 @@ class ApiV1RecordsTest(ApiV1Test):
|
||||
data=fixture)
|
||||
|
||||
self.assertIn('id', response.json)
|
||||
self.assertEqual(response.json['type'], fixture['type'])
|
||||
self.assertEqual(response.json['name'], fixture['name'])
|
||||
self.assertEqual(fixture['type'], response.json['type'])
|
||||
self.assertEqual(fixture['name'], response.json['name'])
|
||||
|
||||
self.assertEqual(response.json['priority'], fixture['priority'])
|
||||
self.assertEqual(response.json['data'], fixture['data'])
|
||||
self.assertEqual(fixture['priority'], response.json['priority'])
|
||||
self.assertEqual(fixture['data'], response.json['data'])
|
||||
|
||||
def test_create_invalid_data_srv_record(self):
|
||||
recordset_fixture = self.get_recordset_fixture(
|
||||
@ -559,10 +559,10 @@ class ApiV1RecordsTest(ApiV1Test):
|
||||
data=data)
|
||||
|
||||
self.assertIn('id', response.json)
|
||||
self.assertEqual(response.json['id'], record['id'])
|
||||
self.assertEqual(response.json['data'], record['data'])
|
||||
self.assertEqual(response.json['type'], self.recordset['type'])
|
||||
self.assertEqual(response.json['ttl'], 100)
|
||||
self.assertEqual(record['id'], response.json['id'])
|
||||
self.assertEqual(record['data'], response.json['data'])
|
||||
self.assertEqual(self.recordset['type'], response.json['type'])
|
||||
self.assertEqual(100, response.json['ttl'])
|
||||
|
||||
def test_update_record_junk(self):
|
||||
# Create a record
|
||||
|
@ -57,10 +57,10 @@ class APIV2ZoneImportExportTest(ApiV2TestCase):
|
||||
url = '/zones/tasks/imports/%s' % import_id
|
||||
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.json['status'], 'ERROR')
|
||||
self.assertEqual('ERROR', response.json['status'])
|
||||
origin_msg = ("The $ORIGIN statement is required and must be the"
|
||||
" first statement in the zonefile.")
|
||||
self.assertEqual(response.json['message'], origin_msg)
|
||||
self.assertEqual(origin_msg, response.json['message'])
|
||||
|
||||
def test_missing_soa(self):
|
||||
fixture = self.get_zonefile_fixture(variant='nosoa')
|
||||
@ -74,9 +74,9 @@ class APIV2ZoneImportExportTest(ApiV2TestCase):
|
||||
url = '/zones/tasks/imports/%s' % import_id
|
||||
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.json['status'], 'ERROR')
|
||||
self.assertEqual('ERROR', response.json['status'])
|
||||
origin_msg = ("Malformed zonefile.")
|
||||
self.assertEqual(response.json['message'], origin_msg)
|
||||
self.assertEqual(origin_msg, response.json['message'])
|
||||
|
||||
def test_malformed_zonefile(self):
|
||||
fixture = self.get_zonefile_fixture(variant='malformed')
|
||||
@ -90,9 +90,9 @@ class APIV2ZoneImportExportTest(ApiV2TestCase):
|
||||
url = '/zones/tasks/imports/%s' % import_id
|
||||
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.json['status'], 'ERROR')
|
||||
self.assertEqual('ERROR', response.json['status'])
|
||||
origin_msg = ("Malformed zonefile.")
|
||||
self.assertEqual(response.json['message'], origin_msg)
|
||||
self.assertEqual(origin_msg, response.json['message'])
|
||||
|
||||
def test_import_export(self):
|
||||
# Since v2 doesn't support getting records, import and export the
|
||||
|
@ -59,11 +59,11 @@ class ApiV2PoolsTest(ApiV2TestCase):
|
||||
self.assertIsNone(response.json['updated_at'])
|
||||
self.assertEqual(response.json['name'], fixture['name'])
|
||||
self.assertEqual(
|
||||
response.json['description'], fixture['description'])
|
||||
fixture['description'], response.json['description'])
|
||||
self.assertEqual(
|
||||
response.json['attributes'], fixture['attributes'])
|
||||
fixture['attributes'], response.json['attributes'])
|
||||
self.assertEqual(
|
||||
response.json['ns_records'], fixture['ns_records'])
|
||||
fixture['ns_records'], response.json['ns_records'])
|
||||
|
||||
def test_create_pool_validation(self):
|
||||
# NOTE: The schemas should be tested separatly to the API. So we
|
||||
|
@ -33,7 +33,7 @@ class StorageQuotaTest(tests.TestCase):
|
||||
|
||||
quota = self.quota.set_quota(context, 'tenant_id', 'domains', 1500)
|
||||
|
||||
self.assertEqual(quota, {'domains': 1500})
|
||||
self.assertEqual({'domains': 1500}, quota)
|
||||
|
||||
# Drop into the storage layer directly to ensure the quota was created
|
||||
# successfully
|
||||
@ -44,9 +44,9 @@ class StorageQuotaTest(tests.TestCase):
|
||||
|
||||
quota = self.quota.storage.find_quota(context, criterion)
|
||||
|
||||
self.assertEqual(quota['tenant_id'], 'tenant_id')
|
||||
self.assertEqual(quota['resource'], 'domains')
|
||||
self.assertEqual(quota['hard_limit'], 1500)
|
||||
self.assertEqual('tenant_id', quota['tenant_id'])
|
||||
self.assertEqual('domains', quota['resource'])
|
||||
self.assertEqual(1500, quota['hard_limit'])
|
||||
|
||||
def test_set_quota_update(self):
|
||||
context = self.get_admin_context()
|
||||
@ -67,9 +67,9 @@ class StorageQuotaTest(tests.TestCase):
|
||||
|
||||
quota = self.quota.storage.find_quota(context, criterion)
|
||||
|
||||
self.assertEqual(quota['tenant_id'], 'tenant_id')
|
||||
self.assertEqual(quota['resource'], 'domains')
|
||||
self.assertEqual(quota['hard_limit'], 1234)
|
||||
self.assertEqual('tenant_id', quota['tenant_id'])
|
||||
self.assertEqual('domains', quota['resource'])
|
||||
self.assertEqual(1234, quota['hard_limit'])
|
||||
|
||||
def test_reset_quotas(self):
|
||||
context = self.get_admin_context()
|
||||
|
@ -114,9 +114,9 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['created_at'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['tenant_id'], self.admin_context.tenant)
|
||||
self.assertEqual(result['resource'], values['resource'])
|
||||
self.assertEqual(result['hard_limit'], values['hard_limit'])
|
||||
self.assertEqual(self.admin_context.tenant, result['tenant_id'])
|
||||
self.assertEqual(values['resource'], result['resource'])
|
||||
self.assertEqual(values['hard_limit'], result['hard_limit'])
|
||||
|
||||
def test_create_quota_duplicate(self):
|
||||
# Create the initial quota
|
||||
@ -160,11 +160,11 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_quotas(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['tenant_id'], quota_one['tenant_id'])
|
||||
self.assertEqual(results[0]['resource'], quota_one['resource'])
|
||||
self.assertEqual(results[0]['hard_limit'], quota_one['hard_limit'])
|
||||
self.assertEqual(quota_one['tenant_id'], results[0]['tenant_id'])
|
||||
self.assertEqual(quota_one['resource'], results[0]['resource'])
|
||||
self.assertEqual(quota_one['hard_limit'], results[0]['hard_limit'])
|
||||
|
||||
criterion = dict(
|
||||
tenant_id=quota_two['tenant_id'],
|
||||
@ -173,20 +173,20 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_quotas(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['tenant_id'], quota_two['tenant_id'])
|
||||
self.assertEqual(results[0]['resource'], quota_two['resource'])
|
||||
self.assertEqual(results[0]['hard_limit'], quota_two['hard_limit'])
|
||||
self.assertEqual(quota_two['tenant_id'], results[0]['tenant_id'])
|
||||
self.assertEqual(quota_two['resource'], results[0]['resource'])
|
||||
self.assertEqual(quota_two['hard_limit'], results[0]['hard_limit'])
|
||||
|
||||
def test_get_quota(self):
|
||||
# Create a quota
|
||||
expected = self.create_quota()
|
||||
actual = self.storage.get_quota(self.admin_context, expected['id'])
|
||||
|
||||
self.assertEqual(actual['tenant_id'], expected['tenant_id'])
|
||||
self.assertEqual(actual['resource'], expected['resource'])
|
||||
self.assertEqual(actual['hard_limit'], expected['hard_limit'])
|
||||
self.assertEqual(expected['tenant_id'], actual['tenant_id'])
|
||||
self.assertEqual(expected['resource'], actual['resource'])
|
||||
self.assertEqual(expected['hard_limit'], actual['hard_limit'])
|
||||
|
||||
def test_get_quota_missing(self):
|
||||
with testtools.ExpectedException(exceptions.QuotaNotFound):
|
||||
@ -204,9 +204,9 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.find_quota(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['tenant_id'], quota_one['tenant_id'])
|
||||
self.assertEqual(result['resource'], quota_one['resource'])
|
||||
self.assertEqual(result['hard_limit'], quota_one['hard_limit'])
|
||||
self.assertEqual(quota_one['tenant_id'], result['tenant_id'])
|
||||
self.assertEqual(quota_one['resource'], result['resource'])
|
||||
self.assertEqual(quota_one['hard_limit'], result['hard_limit'])
|
||||
|
||||
criterion = dict(
|
||||
tenant_id=quota_two['tenant_id'],
|
||||
@ -215,9 +215,9 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.find_quota(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['tenant_id'], quota_two['tenant_id'])
|
||||
self.assertEqual(result['resource'], quota_two['resource'])
|
||||
self.assertEqual(result['hard_limit'], quota_two['hard_limit'])
|
||||
self.assertEqual(quota_two['tenant_id'], result['tenant_id'])
|
||||
self.assertEqual(quota_two['resource'], result['resource'])
|
||||
self.assertEqual(quota_two['hard_limit'], result['hard_limit'])
|
||||
|
||||
def test_find_quota_criterion_missing(self):
|
||||
expected = self.create_quota()
|
||||
@ -286,10 +286,10 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['created_at'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['name'], values['name'])
|
||||
self.assertEqual(result['algorithm'], values['algorithm'])
|
||||
self.assertEqual(result['secret'], values['secret'])
|
||||
self.assertEqual(result['scope'], values['scope'])
|
||||
self.assertEqual(values['name'], result['name'])
|
||||
self.assertEqual(values['algorithm'], result['algorithm'])
|
||||
self.assertEqual(values['secret'], result['secret'])
|
||||
self.assertEqual(values['scope'], result['scope'])
|
||||
|
||||
def test_create_tsigkey_duplicate(self):
|
||||
# Create the Initial TsigKey
|
||||
@ -334,9 +334,9 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_tsigkeys(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['name'], tsigkey_one['name'])
|
||||
self.assertEqual(tsigkey_one['name'], results[0]['name'])
|
||||
|
||||
criterion = dict(
|
||||
name=tsigkey_two['name']
|
||||
@ -344,9 +344,9 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_tsigkeys(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['name'], tsigkey_two['name'])
|
||||
self.assertEqual(tsigkey_two['name'], results[0]['name'])
|
||||
|
||||
def test_get_tsigkey(self):
|
||||
# Create a tsigkey
|
||||
@ -354,10 +354,10 @@ class StorageTestCase(object):
|
||||
|
||||
actual = self.storage.get_tsigkey(self.admin_context, expected['id'])
|
||||
|
||||
self.assertEqual(actual['name'], expected['name'])
|
||||
self.assertEqual(actual['algorithm'], expected['algorithm'])
|
||||
self.assertEqual(actual['secret'], expected['secret'])
|
||||
self.assertEqual(actual['scope'], expected['scope'])
|
||||
self.assertEqual(expected['name'], actual['name'])
|
||||
self.assertEqual(expected['algorithm'], actual['algorithm'])
|
||||
self.assertEqual(expected['secret'], actual['secret'])
|
||||
self.assertEqual(expected['scope'], actual['scope'])
|
||||
|
||||
def test_get_tsigkey_missing(self):
|
||||
with testtools.ExpectedException(exceptions.TsigKeyNotFound):
|
||||
@ -458,10 +458,10 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.get_tenant(context, 1)
|
||||
|
||||
self.assertEqual(result['id'], 1)
|
||||
self.assertEqual(result['domain_count'], 2)
|
||||
self.assertEqual(sorted(result['domains']),
|
||||
[domain_1['name'], domain_2['name']])
|
||||
self.assertEqual(1, result['id'])
|
||||
self.assertEqual(2, result['domain_count'])
|
||||
self.assertEqual([domain_1['name'], domain_2['name']],
|
||||
sorted(result['domains']))
|
||||
|
||||
def test_count_tenants(self):
|
||||
context = self.get_admin_context()
|
||||
@ -473,7 +473,7 @@ class StorageTestCase(object):
|
||||
|
||||
# in the beginning, there should be nothing
|
||||
tenants = self.storage.count_tenants(context)
|
||||
self.assertEqual(tenants, 0)
|
||||
self.assertEqual(0, tenants)
|
||||
|
||||
# create 2 domains with 2 tenants
|
||||
self.create_domain(fixture=0, context=one_context, tenant_id=1)
|
||||
@ -485,7 +485,7 @@ class StorageTestCase(object):
|
||||
self.storage.delete_domain(context, domain['id'])
|
||||
|
||||
tenants = self.storage.count_tenants(context)
|
||||
self.assertEqual(tenants, 2)
|
||||
self.assertEqual(2, tenants)
|
||||
|
||||
def test_count_tenants_none_result(self):
|
||||
rp = mock.Mock()
|
||||
@ -493,7 +493,7 @@ class StorageTestCase(object):
|
||||
with mock.patch.object(self.storage.session, 'execute',
|
||||
return_value=rp):
|
||||
tenants = self.storage.count_tenants(self.admin_context)
|
||||
self.assertEqual(tenants, 0)
|
||||
self.assertEqual(0, tenants)
|
||||
|
||||
# Domain Tests
|
||||
def test_create_domain(self):
|
||||
@ -512,10 +512,10 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['created_at'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['tenant_id'], self.admin_context.tenant)
|
||||
self.assertEqual(result['name'], values['name'])
|
||||
self.assertEqual(result['email'], values['email'])
|
||||
self.assertEqual(result['pool_id'], pool_id)
|
||||
self.assertEqual(self.admin_context.tenant, result['tenant_id'])
|
||||
self.assertEqual(values['name'], result['name'])
|
||||
self.assertEqual(values['email'], result['email'])
|
||||
self.assertEqual(pool_id, result['pool_id'])
|
||||
self.assertIn('status', result)
|
||||
|
||||
def test_create_domain_duplicate(self):
|
||||
@ -558,10 +558,10 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_domains(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['name'], domain_one['name'])
|
||||
self.assertEqual(results[0]['email'], domain_one['email'])
|
||||
self.assertEqual(domain_one['name'], results[0]['name'])
|
||||
self.assertEqual(domain_one['email'], results[0]['email'])
|
||||
self.assertIn('status', domain_one)
|
||||
|
||||
criterion = dict(
|
||||
@ -572,8 +572,8 @@ class StorageTestCase(object):
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
|
||||
self.assertEqual(results[0]['name'], domain_two['name'])
|
||||
self.assertEqual(results[0]['email'], domain_two['email'])
|
||||
self.assertEqual(domain_two['name'], results[0]['name'])
|
||||
self.assertEqual(domain_two['email'], results[0]['email'])
|
||||
self.assertIn('status', domain_two)
|
||||
|
||||
def test_find_domains_all_tenants(self):
|
||||
@ -613,8 +613,8 @@ class StorageTestCase(object):
|
||||
expected = self.create_domain()
|
||||
actual = self.storage.get_domain(self.admin_context, expected['id'])
|
||||
|
||||
self.assertEqual(actual['name'], expected['name'])
|
||||
self.assertEqual(actual['email'], expected['email'])
|
||||
self.assertEqual(expected['name'], actual['name'])
|
||||
self.assertEqual(expected['email'], actual['email'])
|
||||
self.assertIn('status', actual)
|
||||
|
||||
def test_get_domain_missing(self):
|
||||
@ -641,8 +641,8 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.find_domain(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['name'], domain_one['name'])
|
||||
self.assertEqual(result['email'], domain_one['email'])
|
||||
self.assertEqual(domain_one['name'], result['name'])
|
||||
self.assertEqual(domain_one['email'], result['email'])
|
||||
self.assertIn('status', domain_one)
|
||||
|
||||
criterion = dict(
|
||||
@ -651,8 +651,8 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.find_domain(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['name'], domain_two['name'])
|
||||
self.assertEqual(result['email'], domain_two['email'])
|
||||
self.assertEqual(domain_two['name'], result['name'])
|
||||
self.assertEqual(domain_two['email'], result['email'])
|
||||
self.assertIn('status', domain_one)
|
||||
self.assertIn('status', domain_two)
|
||||
|
||||
@ -686,7 +686,7 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.find_domain(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['name'], domain['name'])
|
||||
self.assertEqual(domain['name'], result['name'])
|
||||
|
||||
def test_find_domain_criterion_greaterthan(self):
|
||||
domain = self.create_domain()
|
||||
@ -708,7 +708,7 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.find_domain(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['name'], domain['name'])
|
||||
self.assertEqual(domain['name'], result['name'])
|
||||
|
||||
def test_update_domain(self):
|
||||
# Create a domain
|
||||
@ -758,7 +758,7 @@ class StorageTestCase(object):
|
||||
def test_count_domains(self):
|
||||
# in the beginning, there should be nothing
|
||||
domains = self.storage.count_domains(self.admin_context)
|
||||
self.assertEqual(domains, 0)
|
||||
self.assertEqual(0, domains)
|
||||
|
||||
# Create a single domain
|
||||
self.create_domain()
|
||||
@ -767,7 +767,7 @@ class StorageTestCase(object):
|
||||
domains = self.storage.count_domains(self.admin_context)
|
||||
|
||||
# well, did we get 1?
|
||||
self.assertEqual(domains, 1)
|
||||
self.assertEqual(1, domains)
|
||||
|
||||
def test_count_domains_none_result(self):
|
||||
rp = mock.Mock()
|
||||
@ -775,7 +775,7 @@ class StorageTestCase(object):
|
||||
with mock.patch.object(self.storage.session, 'execute',
|
||||
return_value=rp):
|
||||
domains = self.storage.count_domains(self.admin_context)
|
||||
self.assertEqual(domains, 0)
|
||||
self.assertEqual(0, domains)
|
||||
|
||||
def test_create_recordset(self):
|
||||
domain = self.create_domain()
|
||||
@ -794,8 +794,8 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['created_at'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['name'], values['name'])
|
||||
self.assertEqual(result['type'], values['type'])
|
||||
self.assertEqual(values['name'], result['name'])
|
||||
self.assertEqual(values['type'], result['type'])
|
||||
|
||||
def test_create_recordset_duplicate(self):
|
||||
domain = self.create_domain()
|
||||
@ -885,7 +885,7 @@ class StorageTestCase(object):
|
||||
results = self.storage.find_recordsets(self.admin_context,
|
||||
criterion)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
criterion = dict(
|
||||
domain_id=domain['id'],
|
||||
@ -895,7 +895,7 @@ class StorageTestCase(object):
|
||||
results = self.storage.find_recordsets(self.admin_context,
|
||||
criterion)
|
||||
|
||||
self.assertEqual(len(results), 2)
|
||||
self.assertEqual(2, len(results))
|
||||
|
||||
def test_find_recordsets_criterion_wildcard(self):
|
||||
domain = self.create_domain()
|
||||
@ -912,7 +912,7 @@ class StorageTestCase(object):
|
||||
results = self.storage.find_recordsets(self.admin_context, criterion)
|
||||
|
||||
# Should be 3, as SOA and NS recordsets are automiatcally created
|
||||
self.assertEqual(len(results), 3)
|
||||
self.assertEqual(3, len(results))
|
||||
|
||||
def test_find_recordsets_with_records(self):
|
||||
domain = self.create_domain()
|
||||
@ -955,8 +955,8 @@ class StorageTestCase(object):
|
||||
|
||||
actual = self.storage.get_recordset(self.admin_context, expected['id'])
|
||||
|
||||
self.assertEqual(actual['name'], expected['name'])
|
||||
self.assertEqual(actual['type'], expected['type'])
|
||||
self.assertEqual(expected['name'], actual['name'])
|
||||
self.assertEqual(expected['type'], actual['type'])
|
||||
|
||||
def test_get_recordset_with_records(self):
|
||||
domain = self.create_domain()
|
||||
@ -994,8 +994,8 @@ class StorageTestCase(object):
|
||||
|
||||
actual = self.storage.find_recordset(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(actual['name'], expected['name'])
|
||||
self.assertEqual(actual['type'], expected['type'])
|
||||
self.assertEqual(expected['name'], actual['name'])
|
||||
self.assertEqual(expected['type'], actual['type'])
|
||||
|
||||
def test_find_recordset_criterion_missing(self):
|
||||
domain = self.create_domain()
|
||||
@ -1180,7 +1180,7 @@ class StorageTestCase(object):
|
||||
def test_count_recordsets(self):
|
||||
# in the beginning, there should be nothing
|
||||
recordsets = self.storage.count_recordsets(self.admin_context)
|
||||
self.assertEqual(recordsets, 0)
|
||||
self.assertEqual(0, recordsets)
|
||||
|
||||
# Create a single domain & recordset
|
||||
domain = self.create_domain()
|
||||
@ -1188,12 +1188,12 @@ class StorageTestCase(object):
|
||||
|
||||
# we should have 3 recordsets now, including SOA & NS
|
||||
recordsets = self.storage.count_recordsets(self.admin_context)
|
||||
self.assertEqual(recordsets, 3)
|
||||
self.assertEqual(3, recordsets)
|
||||
|
||||
# Delete the domain, we should be back to 0 recordsets
|
||||
self.storage.delete_domain(self.admin_context, domain.id)
|
||||
recordsets = self.storage.count_recordsets(self.admin_context)
|
||||
self.assertEqual(recordsets, 0)
|
||||
self.assertEqual(0, recordsets)
|
||||
|
||||
def test_count_recordsets_none_result(self):
|
||||
rp = mock.Mock()
|
||||
@ -1201,7 +1201,7 @@ class StorageTestCase(object):
|
||||
with mock.patch.object(self.storage.session, 'execute',
|
||||
return_value=rp):
|
||||
recordsets = self.storage.count_recordsets(self.admin_context)
|
||||
self.assertEqual(recordsets, 0)
|
||||
self.assertEqual(0, recordsets)
|
||||
|
||||
def test_create_record(self):
|
||||
domain = self.create_domain()
|
||||
@ -1220,8 +1220,8 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['hash'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['tenant_id'], self.admin_context.tenant)
|
||||
self.assertEqual(result['data'], values['data'])
|
||||
self.assertEqual(self.admin_context.tenant, result['tenant_id'])
|
||||
self.assertEqual(values['data'], result['data'])
|
||||
self.assertIn('status', result)
|
||||
|
||||
def test_create_record_duplicate(self):
|
||||
@ -1292,7 +1292,7 @@ class StorageTestCase(object):
|
||||
)
|
||||
|
||||
results = self.storage.find_records(self.admin_context, criterion)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
criterion = dict(
|
||||
domain_id=domain['id'],
|
||||
@ -1301,7 +1301,7 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_records(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(len(results), 2)
|
||||
self.assertEqual(2, len(results))
|
||||
|
||||
def test_find_records_criterion_wildcard(self):
|
||||
domain = self.create_domain()
|
||||
@ -1319,7 +1319,7 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_records(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
def test_find_records_all_tenants(self):
|
||||
# Create two contexts with different tenant_id's
|
||||
@ -1370,7 +1370,7 @@ class StorageTestCase(object):
|
||||
|
||||
actual = self.storage.get_record(self.admin_context, expected['id'])
|
||||
|
||||
self.assertEqual(actual['data'], expected['data'])
|
||||
self.assertEqual(expected['data'], actual['data'])
|
||||
self.assertIn('status', actual)
|
||||
|
||||
def test_get_record_missing(self):
|
||||
@ -1392,7 +1392,7 @@ class StorageTestCase(object):
|
||||
|
||||
actual = self.storage.find_record(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(actual['data'], expected['data'])
|
||||
self.assertEqual(expected['data'], actual['data'])
|
||||
self.assertIn('status', actual)
|
||||
|
||||
def test_find_record_criterion_missing(self):
|
||||
@ -1468,7 +1468,7 @@ class StorageTestCase(object):
|
||||
def test_count_records(self):
|
||||
# in the beginning, there should be nothing
|
||||
records = self.storage.count_records(self.admin_context)
|
||||
self.assertEqual(records, 0)
|
||||
self.assertEqual(0, records)
|
||||
|
||||
# Create a single domain & record
|
||||
domain = self.create_domain()
|
||||
@ -1477,12 +1477,12 @@ class StorageTestCase(object):
|
||||
|
||||
# we should have 3 records now, including NS and SOA
|
||||
records = self.storage.count_records(self.admin_context)
|
||||
self.assertEqual(records, 3)
|
||||
self.assertEqual(3, records)
|
||||
|
||||
# Delete the domain, we should be back to 0 records
|
||||
self.storage.delete_domain(self.admin_context, domain.id)
|
||||
records = self.storage.count_records(self.admin_context)
|
||||
self.assertEqual(records, 0)
|
||||
self.assertEqual(0, records)
|
||||
|
||||
def test_count_records_none_result(self):
|
||||
rp = mock.Mock()
|
||||
@ -1490,7 +1490,7 @@ class StorageTestCase(object):
|
||||
with mock.patch.object(self.storage.session, 'execute',
|
||||
return_value=rp):
|
||||
records = self.storage.count_records(self.admin_context)
|
||||
self.assertEqual(records, 0)
|
||||
self.assertEqual(0, records)
|
||||
|
||||
def test_ping(self):
|
||||
pong = self.storage.ping(self.admin_context)
|
||||
@ -1519,8 +1519,8 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['created_at'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
self.assertIsNotNone(result['version'])
|
||||
self.assertEqual(result['name'], values['name'])
|
||||
self.assertEqual(result['description'], values['description'])
|
||||
self.assertEqual(values['name'], result['name'])
|
||||
self.assertEqual(values['description'], result['description'])
|
||||
|
||||
def test_create_tld_with_duplicate(self):
|
||||
# Create the First Tld
|
||||
@ -1560,9 +1560,9 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_tlds(self.admin_context,
|
||||
criterion_one)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['name'], tld_one['name'])
|
||||
self.assertEqual(tld_one['name'], results[0]['name'])
|
||||
|
||||
criterion_two = dict(name=tld_two['name'])
|
||||
|
||||
@ -1570,14 +1570,14 @@ class StorageTestCase(object):
|
||||
criterion_two)
|
||||
self.assertEqual(len(results), 1)
|
||||
|
||||
self.assertEqual(results[0]['name'], tld_two['name'])
|
||||
self.assertEqual(tld_two['name'], results[0]['name'])
|
||||
|
||||
def test_get_tld(self):
|
||||
# Create a tld
|
||||
expected = self.create_tld()
|
||||
actual = self.storage.get_tld(self.admin_context, expected['id'])
|
||||
|
||||
self.assertEqual(actual['name'], expected['name'])
|
||||
self.assertEqual(expected['name'], actual['name'])
|
||||
|
||||
def test_get_tld_missing(self):
|
||||
with testtools.ExpectedException(exceptions.TldNotFound):
|
||||
@ -1595,14 +1595,14 @@ class StorageTestCase(object):
|
||||
result = self.storage.find_tld(self.admin_context, criterion)
|
||||
|
||||
# Assert names match
|
||||
self.assertEqual(result['name'], tld_one['name'])
|
||||
self.assertEqual(tld_one['name'], result['name'])
|
||||
|
||||
# Repeat with tld_two
|
||||
criterion = dict(name=tld_two['name'])
|
||||
|
||||
result = self.storage.find_tld(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['name'], tld_two['name'])
|
||||
self.assertEqual(tld_two['name'], result['name'])
|
||||
|
||||
def test_find_tld_criterion_missing(self):
|
||||
expected = self.create_tld()
|
||||
@ -1675,8 +1675,8 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['version'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['pattern'], values['pattern'])
|
||||
self.assertEqual(result['description'], values['description'])
|
||||
self.assertEqual(values['pattern'], result['pattern'])
|
||||
self.assertEqual(values['description'], result['description'])
|
||||
|
||||
def test_create_blacklist_duplicate(self):
|
||||
# Create the initial Blacklist
|
||||
@ -1716,22 +1716,22 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_blacklists(self.admin_context,
|
||||
criterion)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(results[0]['pattern'], blacklist_one['pattern'])
|
||||
self.assertEqual(1, len(results))
|
||||
self.assertEqual(blacklist_one['pattern'], results[0]['pattern'])
|
||||
|
||||
# Verify blacklist_two
|
||||
criterion = dict(pattern=blacklist_two['pattern'])
|
||||
|
||||
results = self.storage.find_blacklists(self.admin_context,
|
||||
criterion)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(results[0]['pattern'], blacklist_two['pattern'])
|
||||
self.assertEqual(1, len(results))
|
||||
self.assertEqual(blacklist_two['pattern'], results[0]['pattern'])
|
||||
|
||||
def test_get_blacklist(self):
|
||||
expected = self.create_blacklist(fixture=0)
|
||||
actual = self.storage.get_blacklist(self.admin_context, expected['id'])
|
||||
|
||||
self.assertEqual(actual['pattern'], expected['pattern'])
|
||||
self.assertEqual(expected['pattern'], actual['pattern'])
|
||||
|
||||
def test_get_blacklist_missing(self):
|
||||
with testtools.ExpectedException(exceptions.BlacklistNotFound):
|
||||
@ -1746,13 +1746,13 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.find_blacklist(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['pattern'], blacklist_one['pattern'])
|
||||
self.assertEqual(blacklist_one['pattern'], result['pattern'])
|
||||
|
||||
criterion = dict(pattern=blacklist_two['pattern'])
|
||||
|
||||
result = self.storage.find_blacklist(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['pattern'], blacklist_two['pattern'])
|
||||
self.assertEqual(blacklist_two['pattern'], result['pattern'])
|
||||
|
||||
def test_find_blacklist_criterion_missing(self):
|
||||
expected = self.create_blacklist(fixture=0)
|
||||
@ -1823,9 +1823,9 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['created_at'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['name'], values['name'])
|
||||
self.assertEqual(result['tenant_id'], values['tenant_id'])
|
||||
self.assertEqual(result['provisioner'], values['provisioner'])
|
||||
self.assertEqual(values['name'], result['name'])
|
||||
self.assertEqual(values['tenant_id'], result['tenant_id'])
|
||||
self.assertEqual(values['provisioner'], result['provisioner'])
|
||||
|
||||
def test_create_pool_duplicate(self):
|
||||
# Create the first pool
|
||||
@ -1875,27 +1875,27 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_pools(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['name'], pool_one['name'])
|
||||
self.assertEqual(results[0]['provisioner'], pool_one['provisioner'])
|
||||
self.assertEqual(pool_one['name'], results[0]['name'])
|
||||
self.assertEqual(pool_one['provisioner'], results[0]['provisioner'])
|
||||
|
||||
criterion = dict(name=pool_two['name'])
|
||||
|
||||
results = self.storage.find_pools(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['name'], pool_two['name'])
|
||||
self.assertEqual(results[0]['provisioner'], pool_two['provisioner'])
|
||||
self.assertEqual(pool_two['name'], results[0]['name'])
|
||||
self.assertEqual(pool_two['provisioner'], results[0]['provisioner'])
|
||||
|
||||
def test_get_pool(self):
|
||||
# Create a pool
|
||||
expected = self.create_pool()
|
||||
actual = self.storage.get_pool(self.admin_context, expected['id'])
|
||||
|
||||
self.assertEqual(actual['name'], expected['name'])
|
||||
self.assertEqual(actual['provisioner'], expected['provisioner'])
|
||||
self.assertEqual(expected['name'], actual['name'])
|
||||
self.assertEqual(expected['provisioner'], actual['provisioner'])
|
||||
|
||||
def test_get_pool_missing(self):
|
||||
with testtools.ExpectedException(exceptions.PoolNotFound):
|
||||
@ -1910,15 +1910,15 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.find_pool(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['name'], pool_one['name'])
|
||||
self.assertEqual(result['provisioner'], pool_one['provisioner'])
|
||||
self.assertEqual(pool_one['name'], result['name'])
|
||||
self.assertEqual(pool_one['provisioner'], result['provisioner'])
|
||||
|
||||
criterion = dict(name=pool_two['name'])
|
||||
|
||||
result = self.storage.find_pool(self.admin_context, criterion)
|
||||
|
||||
self.assertEqual(result['name'], pool_two['name'])
|
||||
self.assertEqual(result['provisioner'], pool_two['provisioner'])
|
||||
self.assertEqual(pool_two['name'], result['name'])
|
||||
self.assertEqual(pool_two['provisioner'], result['provisioner'])
|
||||
|
||||
def test_find_pool_criterion_missing(self):
|
||||
expected = self.create_pool()
|
||||
@ -1983,7 +1983,7 @@ class StorageTestCase(object):
|
||||
result = self.storage.create_zone_transfer_request(
|
||||
self.admin_context, objects.ZoneTransferRequest.from_dict(values))
|
||||
|
||||
self.assertEqual(result['tenant_id'], self.admin_context.tenant)
|
||||
self.assertEqual(self.admin_context.tenant, result['tenant_id'])
|
||||
self.assertIn('status', result)
|
||||
|
||||
def test_create_zone_transfer_request_scoped(self):
|
||||
@ -2005,15 +2005,15 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['created_at'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['tenant_id'], self.admin_context.tenant)
|
||||
self.assertEqual(result['target_tenant_id'], tenant_2_context.tenant)
|
||||
self.assertEqual(self.admin_context.tenant, result['tenant_id'])
|
||||
self.assertEqual(tenant_2_context.tenant, result['target_tenant_id'])
|
||||
self.assertIn('status', result)
|
||||
|
||||
stored_ztr = self.storage.get_zone_transfer_request(
|
||||
tenant_2_context, result.id)
|
||||
|
||||
self.assertEqual(stored_ztr['tenant_id'], self.admin_context.tenant)
|
||||
self.assertEqual(result['id'], stored_ztr['id'])
|
||||
self.assertEqual(self.admin_context.tenant, stored_ztr['tenant_id'])
|
||||
self.assertEqual(stored_ztr['id'], result['id'])
|
||||
|
||||
with testtools.ExpectedException(
|
||||
exceptions.ZoneTransferRequestNotFound):
|
||||
@ -2034,7 +2034,7 @@ class StorageTestCase(object):
|
||||
|
||||
requests = self.storage.find_zone_transfer_requests(
|
||||
self.admin_context, {"tenant_id": self.admin_context.tenant})
|
||||
self.assertEqual(len(requests), 1)
|
||||
self.assertEqual(1, len(requests))
|
||||
|
||||
def test_delete_zone_transfer_request(self):
|
||||
domain = self.create_domain()
|
||||
@ -2055,7 +2055,7 @@ class StorageTestCase(object):
|
||||
zt_request.description = 'New description'
|
||||
result = self.storage.update_zone_transfer_request(
|
||||
self.admin_context, zt_request)
|
||||
self.assertEqual(result.description, 'New description')
|
||||
self.assertEqual('New description', result.description)
|
||||
|
||||
def test_get_zone_transfer_request(self):
|
||||
domain = self.create_domain()
|
||||
@ -2063,8 +2063,8 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.get_zone_transfer_request(
|
||||
self.admin_context, zt_request.id)
|
||||
self.assertEqual(result.id, zt_request.id)
|
||||
self.assertEqual(result.domain_id, zt_request.domain_id)
|
||||
self.assertEqual(zt_request.id, result.id)
|
||||
self.assertEqual(zt_request.domain_id, result.domain_id)
|
||||
|
||||
def test_create_zone_transfer_accept(self):
|
||||
domain = self.create_domain()
|
||||
@ -2083,7 +2083,7 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['created_at'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['tenant_id'], self.admin_context.tenant)
|
||||
self.assertEqual(self.admin_context.tenant, result['tenant_id'])
|
||||
self.assertIn('status', result)
|
||||
|
||||
def test_find_zone_transfer_accepts(self):
|
||||
@ -2101,7 +2101,7 @@ class StorageTestCase(object):
|
||||
|
||||
accepts = self.storage.find_zone_transfer_accepts(
|
||||
self.admin_context, {"tenant_id": self.admin_context.tenant})
|
||||
self.assertEqual(len(accepts), 1)
|
||||
self.assertEqual(1, len(accepts))
|
||||
|
||||
def test_find_zone_transfer_accept(self):
|
||||
domain = self.create_domain()
|
||||
@ -2118,7 +2118,7 @@ class StorageTestCase(object):
|
||||
|
||||
accept = self.storage.find_zone_transfer_accept(
|
||||
self.admin_context, {"id": result.id})
|
||||
self.assertEqual(accept.id, result.id)
|
||||
self.assertEqual(result.id, accept.id)
|
||||
|
||||
def test_transfer_zone_ownership(self):
|
||||
tenant_1_context = self.get_context(tenant='1')
|
||||
@ -2145,9 +2145,9 @@ class StorageTestCase(object):
|
||||
saved_record = self.storage.get_record(
|
||||
admin_context, record.id)
|
||||
|
||||
self.assertEqual(saved_domain.tenant_id, tenant_2_context.tenant)
|
||||
self.assertEqual(saved_recordset.tenant_id, tenant_2_context.tenant)
|
||||
self.assertEqual(saved_record.tenant_id, tenant_2_context.tenant)
|
||||
self.assertEqual(tenant_2_context.tenant, saved_domain.tenant_id)
|
||||
self.assertEqual(tenant_2_context.tenant, saved_recordset.tenant_id)
|
||||
self.assertEqual(tenant_2_context.tenant, saved_record.tenant_id)
|
||||
|
||||
def test_delete_zone_transfer_accept(self):
|
||||
domain = self.create_domain()
|
||||
@ -2170,7 +2170,7 @@ class StorageTestCase(object):
|
||||
zt_accept.status = 'COMPLETE'
|
||||
result = self.storage.update_zone_transfer_accept(
|
||||
self.admin_context, zt_accept)
|
||||
self.assertEqual(result.status, 'COMPLETE')
|
||||
self.assertEqual('COMPLETE', result.status)
|
||||
|
||||
def test_get_zone_transfer_accept(self):
|
||||
domain = self.create_domain()
|
||||
@ -2179,8 +2179,8 @@ class StorageTestCase(object):
|
||||
|
||||
result = self.storage.get_zone_transfer_accept(
|
||||
self.admin_context, zt_accept.id)
|
||||
self.assertEqual(result.id, zt_accept.id)
|
||||
self.assertEqual(result.domain_id, zt_accept.domain_id)
|
||||
self.assertEqual(zt_accept.id, result.id)
|
||||
self.assertEqual(zt_accept.domain_id, result.domain_id)
|
||||
|
||||
# PoolAttribute tests
|
||||
def test_create_pool_attribute(self):
|
||||
@ -2199,9 +2199,9 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['version'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
|
||||
self.assertEqual(result['pool_id'], values['pool_id'])
|
||||
self.assertEqual(result['key'], values['key'])
|
||||
self.assertEqual(result['value'], values['value'])
|
||||
self.assertEqual(values['pool_id'], result['pool_id'])
|
||||
self.assertEqual(values['key'], result['key'])
|
||||
self.assertEqual(values['value'], result['value'])
|
||||
|
||||
def test_find_pool_attribute(self):
|
||||
# Verify that there are no Pool Attributes created
|
||||
@ -2236,10 +2236,10 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_pool_attributes(self.admin_context,
|
||||
criterion)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(results[0]['pool_id'], pool_attribute_one['pool_id'])
|
||||
self.assertEqual(results[0]['key'], pool_attribute_one['key'])
|
||||
self.assertEqual(results[0]['value'], pool_attribute_one['value'])
|
||||
self.assertEqual(1, len(results))
|
||||
self.assertEqual(pool_attribute_one['pool_id'], results[0]['pool_id'])
|
||||
self.assertEqual(pool_attribute_one['key'], results[0]['key'])
|
||||
self.assertEqual(pool_attribute_one['value'], results[0]['value'])
|
||||
|
||||
# Verify pool_attribute_two
|
||||
criterion = dict(key=pool_attribute_two['key'])
|
||||
@ -2247,19 +2247,19 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_pool_attributes(self.admin_context,
|
||||
criterion)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(results[0]['pool_id'], pool_attribute_two['pool_id'])
|
||||
self.assertEqual(results[0]['key'], pool_attribute_two['key'])
|
||||
self.assertEqual(results[0]['value'], pool_attribute_two['value'])
|
||||
self.assertEqual(1, len(results))
|
||||
self.assertEqual(pool_attribute_two['pool_id'], results[0]['pool_id'])
|
||||
self.assertEqual(pool_attribute_two['key'], results[0]['key'])
|
||||
self.assertEqual(pool_attribute_two['value'], results[0]['value'])
|
||||
|
||||
def test_get_pool_attribute(self):
|
||||
expected = self.create_pool_attribute(fixture=0)
|
||||
actual = self.storage.get_pool_attribute(self.admin_context,
|
||||
expected['id'])
|
||||
|
||||
self.assertEqual(actual['pool_id'], expected['pool_id'])
|
||||
self.assertEqual(actual['key'], expected['key'])
|
||||
self.assertEqual(actual['value'], expected['value'])
|
||||
self.assertEqual(expected['pool_id'], actual['pool_id'])
|
||||
self.assertEqual(expected['key'], actual['key'])
|
||||
self.assertEqual(expected['value'], actual['value'])
|
||||
|
||||
def test_get_pool_attribute_missing(self):
|
||||
with testtools.ExpectedException(exceptions.PoolAttributeNotFound):
|
||||
@ -2275,18 +2275,18 @@ class StorageTestCase(object):
|
||||
result = self.storage.find_pool_attribute(self.admin_context,
|
||||
criterion)
|
||||
|
||||
self.assertEqual(result['pool_id'], pool_attribute_one['pool_id'])
|
||||
self.assertEqual(result['key'], pool_attribute_one['key'])
|
||||
self.assertEqual(result['value'], pool_attribute_one['value'])
|
||||
self.assertEqual(pool_attribute_one['pool_id'], result['pool_id'])
|
||||
self.assertEqual(pool_attribute_one['key'], result['key'])
|
||||
self.assertEqual(pool_attribute_one['value'], result['value'])
|
||||
|
||||
criterion = dict(key=pool_attribute_two['key'])
|
||||
|
||||
result = self.storage.find_pool_attribute(self.admin_context,
|
||||
criterion)
|
||||
|
||||
self.assertEqual(result['pool_id'], pool_attribute_two['pool_id'])
|
||||
self.assertEqual(result['key'], pool_attribute_two['key'])
|
||||
self.assertEqual(result['value'], pool_attribute_two['value'])
|
||||
self.assertEqual(pool_attribute_two['pool_id'], result['pool_id'])
|
||||
self.assertEqual(pool_attribute_two['key'], result['key'])
|
||||
self.assertEqual(pool_attribute_two['value'], result['value'])
|
||||
|
||||
def test_find_pool_attribute_criterion_missing(self):
|
||||
expected = self.create_pool_attribute(fixture=0)
|
||||
@ -2368,7 +2368,7 @@ class StorageTestCase(object):
|
||||
self.assertIsNotNone(result['created_at'])
|
||||
self.assertIsNone(result['updated_at'])
|
||||
self.assertIsNotNone(result['version'])
|
||||
self.assertEqual(result['status'], values['status'])
|
||||
self.assertEqual(values['status'], result['status'])
|
||||
self.assertIsNone(result['domain_id'])
|
||||
self.assertIsNone(result['message'])
|
||||
|
||||
@ -2402,17 +2402,17 @@ class StorageTestCase(object):
|
||||
|
||||
results = self.storage.find_zone_imports(self.admin_context,
|
||||
criterion_one)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['status'], zone_import_one['status'])
|
||||
self.assertEqual(zone_import_one['status'], results[0]['status'])
|
||||
|
||||
criterion_two = dict(status=zone_import_two['status'])
|
||||
|
||||
results = self.storage.find_zone_imports(self.admin_context,
|
||||
criterion_two)
|
||||
self.assertEqual(len(results), 1)
|
||||
self.assertEqual(1, len(results))
|
||||
|
||||
self.assertEqual(results[0]['status'], zone_import_two['status'])
|
||||
self.assertEqual(zone_import_two['status'], results[0]['status'])
|
||||
|
||||
def test_get_zone_import(self):
|
||||
# Create a zone_import
|
||||
@ -2420,7 +2420,7 @@ class StorageTestCase(object):
|
||||
actual = self.storage.get_zone_import(self.admin_context,
|
||||
expected['id'])
|
||||
|
||||
self.assertEqual(actual['status'], expected['status'])
|
||||
self.assertEqual(expected['status'], actual['status'])
|
||||
|
||||
def test_get_zone_import_missing(self):
|
||||
with testtools.ExpectedException(exceptions.ZoneImportNotFound):
|
||||
|
@ -77,7 +77,7 @@ class DeletedDomainPurgeTest(TaskTest):
|
||||
)
|
||||
|
||||
pxy = self.central_service.storage.session.execute(query)
|
||||
self.assertEqual(pxy.rowcount, 1)
|
||||
self.assertEqual(1, pxy.rowcount)
|
||||
return domain
|
||||
|
||||
def _create_deleted_zones(self):
|
||||
@ -104,4 +104,4 @@ class DeletedDomainPurgeTest(TaskTest):
|
||||
|
||||
zones = self._fetch_all_domains()
|
||||
LOG.info("Number of zones: %d", len(zones))
|
||||
self.assertEqual(len(zones), 7)
|
||||
self.assertEqual(7, len(zones))
|
||||
|
@ -158,21 +158,21 @@ class SSLMiddlewareTest(oslotest.base.BaseTestCase):
|
||||
self.request.environ['HTTP_X_FORWARDED_PROTO'] = 'poo'
|
||||
self.app(self.request)
|
||||
|
||||
self.assertEqual(self.request.environ['wsgi.url_scheme'], 'poo')
|
||||
self.assertEqual('poo', self.request.environ['wsgi.url_scheme'])
|
||||
|
||||
def test_http_header(self):
|
||||
self.request.environ['wsgi.url_scheme'] = ''
|
||||
self.request.environ['HTTP_X_FORWARDED_PROTO'] = 'http'
|
||||
self.app(self.request)
|
||||
|
||||
self.assertEqual(self.request.environ['wsgi.url_scheme'], 'http')
|
||||
self.assertEqual('http', self.request.environ['wsgi.url_scheme'])
|
||||
|
||||
def test_https_header(self):
|
||||
self.request.environ['wsgi.url_scheme'] = 'http'
|
||||
self.request.environ['HTTP_X_FORWARDED_PROTO'] = 'https'
|
||||
self.app(self.request)
|
||||
|
||||
self.assertEqual(self.request.environ['wsgi.url_scheme'], 'https')
|
||||
self.assertEqual('https', self.request.environ['wsgi.url_scheme'])
|
||||
|
||||
def test_override_proto(self):
|
||||
self.request.environ['wsgi.url_scheme'] = 'http'
|
||||
@ -181,4 +181,4 @@ class SSLMiddlewareTest(oslotest.base.BaseTestCase):
|
||||
|
||||
self.app(self.request)
|
||||
|
||||
self.assertEqual(self.request.environ['wsgi.url_scheme'], 'poo')
|
||||
self.assertEqual('poo', self.request.environ['wsgi.url_scheme'])
|
||||
|
@ -102,8 +102,8 @@ class RoObject(RwObject):
|
||||
class MockObjectTest(base.BaseTestCase):
|
||||
def test_ro(self):
|
||||
o = RoObject(a=1)
|
||||
self.assertEqual(o['a'], 1)
|
||||
self.assertEqual(o.a, 1)
|
||||
self.assertEqual(1, o['a'])
|
||||
self.assertEqual(1, o.a)
|
||||
with testtools.ExpectedException(NotImplementedError):
|
||||
o.a = 2
|
||||
with testtools.ExpectedException(NotImplementedError):
|
||||
@ -115,14 +115,14 @@ class MockObjectTest(base.BaseTestCase):
|
||||
|
||||
def test_rw(self):
|
||||
o = RwObject(a=1)
|
||||
self.assertEqual(o['a'], 1)
|
||||
self.assertEqual(o.a, 1)
|
||||
self.assertEqual(1, o['a'])
|
||||
self.assertEqual(1, o.a)
|
||||
o.a = 2
|
||||
self.assertEqual(o.a, 2)
|
||||
self.assertEqual(o['a'], 2)
|
||||
self.assertEqual(2, o.a)
|
||||
self.assertEqual(2, o['a'])
|
||||
o['a'] = 3
|
||||
self.assertEqual(o.a, 3)
|
||||
self.assertEqual(o['a'], 3)
|
||||
self.assertEqual(3, o.a)
|
||||
self.assertEqual(3, o['a'])
|
||||
with testtools.ExpectedException(NotImplementedError):
|
||||
o.new = 1
|
||||
with testtools.ExpectedException(NotImplementedError):
|
||||
@ -396,10 +396,10 @@ class CentralServiceTestCase(CentralBasic):
|
||||
|
||||
ctx, md, rset = self.service._create_recordset_in_storage.call_args[0]
|
||||
|
||||
self.assertEqual(rset.name, 'example.org.')
|
||||
self.assertEqual(rset.type, 'SOA')
|
||||
self.assertEqual(rset.type, 'SOA')
|
||||
self.assertEqual(len(rset.records.objects), 1)
|
||||
self.assertEqual('example.org.', rset.name)
|
||||
self.assertEqual('SOA', rset.type)
|
||||
self.assertEqual('SOA', rset.type)
|
||||
self.assertEqual(1, len(rset.records.objects))
|
||||
self.assertTrue(rset.records.objects[0].managed)
|
||||
|
||||
def test__create_domain_in_storage(self):
|
||||
@ -417,10 +417,10 @@ class CentralServiceTestCase(CentralBasic):
|
||||
domain = self.service._create_domain_in_storage(
|
||||
self.context, MockDomain()
|
||||
)
|
||||
self.assertEqual(domain.status, 'PENDING')
|
||||
self.assertEqual(domain.action, 'CREATE')
|
||||
self.assertEqual('PENDING', domain.status)
|
||||
self.assertEqual('CREATE', domain.action)
|
||||
ctx, domain, hostnames = self.service._create_ns.call_args[0]
|
||||
self.assertEqual(hostnames, ['host_foo'])
|
||||
self.assertEqual(['host_foo'], hostnames)
|
||||
|
||||
@unittest.expectedFailure # FIXME
|
||||
def test_create_domain_forbidden(self):
|
||||
@ -438,7 +438,7 @@ class CentralServiceTestCase(CentralBasic):
|
||||
parent_domain = self.service._is_subdomain(
|
||||
self.context, 'bogusname', 1234)
|
||||
|
||||
# self.assertEqual(parent_domain, '')
|
||||
# self.assertEqual('', parent_domain)
|
||||
self.service.check_for_tlds = False
|
||||
with testtools.ExpectedException(exceptions.Forbidden):
|
||||
self.service.create_domain(self.context, MockDomain())
|
||||
@ -541,8 +541,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
'CNAME',
|
||||
)
|
||||
self.assertEqual(
|
||||
e.message,
|
||||
'CNAME recordsets may not be created at the zone apex'
|
||||
'CNAME recordsets may not be created at the zone apex',
|
||||
e.message
|
||||
)
|
||||
|
||||
def test_is_valid_recordset_placement_failing(self):
|
||||
@ -559,8 +559,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
'A',
|
||||
)
|
||||
self.assertEqual(
|
||||
e.message,
|
||||
'CNAME recordsets may not share a name with any other records'
|
||||
'CNAME recordsets may not share a name with any other records',
|
||||
e.message
|
||||
)
|
||||
|
||||
def test_is_valid_recordset_placement_failing_2(self):
|
||||
@ -578,8 +578,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
'A',
|
||||
)
|
||||
self.assertEqual(
|
||||
e.message,
|
||||
'CNAME recordsets may not share a name with any other records'
|
||||
'CNAME recordsets may not share a name with any other records',
|
||||
e.message
|
||||
)
|
||||
|
||||
def test_is_valid_recordset_placement(self):
|
||||
@ -643,7 +643,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.service.storage.find_domains = Mock()
|
||||
self.service._is_superdomain(self.context, 'example.org.', '1')
|
||||
_class_self_, crit = self.service.storage.find_domains.call_args[0]
|
||||
self.assertEqual(crit, {'name': '%.example.org.', 'pool_id': '1'})
|
||||
self.assertEqual({'name': '%.example.org.', 'pool_id': '1'}, crit)
|
||||
|
||||
@patch('designate.central.service.utils.increment_serial')
|
||||
def FIXME_test__increment_domain_serial(self, utils_inc_ser):
|
||||
@ -661,9 +661,9 @@ class CentralDomainTestCase(CentralBasic):
|
||||
ctx, zone, rset = \
|
||||
self.service._create_recordset_in_storage.call_args[0]
|
||||
|
||||
self.assertEqual(rset.name, 'example.org.')
|
||||
self.assertEqual(rset.type, 'NS')
|
||||
self.assertEqual(len(rset.records), 3)
|
||||
self.assertEqual('example.org.', rset.name)
|
||||
self.assertEqual('NS', rset.type)
|
||||
self.assertEqual(3, len(rset.records))
|
||||
self.assertTrue(rset.records[0].managed)
|
||||
|
||||
def test__create_ns_skip(self):
|
||||
@ -709,7 +709,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.service._update_recordset_in_storage.call_args[0]
|
||||
self.assertEqual(len(rset.records), 1)
|
||||
self.assertTrue(rset.records[0].managed)
|
||||
self.assertEqual(rset.records[0].data.name, 'bar')
|
||||
self.assertEqual('bar', rset.records[0].data.name)
|
||||
|
||||
def test__add_ns_with_other_ns_rs(self):
|
||||
self.service._update_recordset_in_storage = Mock()
|
||||
@ -730,9 +730,9 @@ class CentralDomainTestCase(CentralBasic):
|
||||
)
|
||||
ctx, zone, rset = \
|
||||
self.service._update_recordset_in_storage.call_args[0]
|
||||
self.assertEqual(len(rset.records), 1)
|
||||
self.assertEqual(1, len(rset.records))
|
||||
self.assertTrue(rset.records[0].managed)
|
||||
self.assertEqual(rset.records[0].data.name, 'bar')
|
||||
self.assertEqual('bar', rset.records[0].data.name)
|
||||
|
||||
def test_create_domain_no_servers(self):
|
||||
self.service._enforce_domain_quota = Mock()
|
||||
@ -786,7 +786,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
type='PRIMARY'
|
||||
)
|
||||
)
|
||||
self.assertEqual(out.name, 'example.com.')
|
||||
self.assertEqual('example.com.', out.name)
|
||||
|
||||
def test_get_domain(self):
|
||||
self.service.storage.get_domain.return_value = RoObject(
|
||||
@ -795,9 +795,9 @@ class CentralDomainTestCase(CentralBasic):
|
||||
)
|
||||
self.service.get_domain(self.context, '1')
|
||||
n, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(target['domain_id'], '1')
|
||||
self.assertEqual(target['domain_name'], 'foo')
|
||||
self.assertEqual(target['tenant_id'], '2')
|
||||
self.assertEqual('1', target['domain_id'])
|
||||
self.assertEqual('foo', target['domain_name'])
|
||||
self.assertEqual('2', target['tenant_id'])
|
||||
|
||||
def test_get_domain_servers(self):
|
||||
self.service.storage.get_domain.return_value = RoObject(
|
||||
@ -812,7 +812,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
)
|
||||
|
||||
ctx, pool_id = self.service.storage.get_pool.call_args[0]
|
||||
self.assertEqual(pool_id, '3')
|
||||
self.assertEqual('3', pool_id)
|
||||
|
||||
def test_find_domains(self):
|
||||
self.context = RoObject(tenant='t')
|
||||
@ -821,7 +821,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
assert self.service.storage.find_domains.called
|
||||
pcheck, ctx, target = \
|
||||
designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(pcheck, 'find_domains')
|
||||
self.assertEqual('find_domains', pcheck)
|
||||
|
||||
def test_find_domain(self):
|
||||
self.context = RoObject(tenant='t')
|
||||
@ -830,7 +830,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
assert self.service.storage.find_domain.called
|
||||
pcheck, ctx, target = \
|
||||
designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(pcheck, 'find_domain')
|
||||
self.assertEqual('find_domain', pcheck)
|
||||
|
||||
def test_delete_domain_has_subdomain(self):
|
||||
self.context.abandon = False
|
||||
@ -844,7 +844,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
|
||||
pcheck, ctx, target = \
|
||||
designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(pcheck, 'delete_domain')
|
||||
self.assertEqual('delete_domain', pcheck)
|
||||
|
||||
def test_delete_domain_abandon(self):
|
||||
self.service.storage.get_domain.return_value = RoObject(
|
||||
@ -865,7 +865,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
assert not self.service.pool_manager_api.delete_domain.called
|
||||
pcheck, ctx, target = \
|
||||
designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(pcheck, 'abandon_domain')
|
||||
self.assertEqual('abandon_domain', pcheck)
|
||||
|
||||
def test_delete_domain(self):
|
||||
self.context.abandon = False
|
||||
@ -885,11 +885,11 @@ class CentralDomainTestCase(CentralBasic):
|
||||
assert designate.central.service.policy.check.called
|
||||
ctx, deleted_dom = \
|
||||
self.service.pool_manager_api.delete_domain.call_args[0]
|
||||
self.assertEqual(deleted_dom.name, 'foo')
|
||||
self.assertEqual(out.name, 'foo')
|
||||
self.assertEqual('foo', deleted_dom.name)
|
||||
self.assertEqual('foo', out.name)
|
||||
pcheck, ctx, target = \
|
||||
designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(pcheck, 'delete_domain')
|
||||
self.assertEqual('delete_domain', pcheck)
|
||||
|
||||
def test__delete_domain_in_storage(self):
|
||||
self.service._delete_domain_in_storage(
|
||||
@ -897,8 +897,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
RwObject(action='', status=''),
|
||||
)
|
||||
d = self.service.storage.update_domain.call_args[0][1]
|
||||
self.assertEqual(d.action, 'DELETE')
|
||||
self.assertEqual(d.status, 'PENDING')
|
||||
self.assertEqual('DELETE', d.action)
|
||||
self.assertEqual('PENDING', d.status)
|
||||
|
||||
def test__xfr_domain_secondary(self):
|
||||
self.service.storage.get_domain.return_value = RoObject(
|
||||
@ -916,8 +916,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
|
||||
assert designate.central.service.policy.check.called
|
||||
self.assertEqual(
|
||||
designate.central.service.policy.check.call_args[0][0],
|
||||
'xfr_domain'
|
||||
'xfr_domain',
|
||||
designate.central.service.policy.check.call_args[0][0]
|
||||
)
|
||||
|
||||
def test__xfr_domain_not_secondary(self):
|
||||
@ -937,7 +937,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.context,
|
||||
criterion=None
|
||||
)
|
||||
self.assertEqual(reports, [{'zones': 1, 'records': 2, 'tenants': 3}])
|
||||
self.assertEqual([{'zones': 1, 'records': 2, 'tenants': 3}], reports)
|
||||
|
||||
def test_count_report_zones(self):
|
||||
self.service.count_domains = Mock(return_value=1)
|
||||
@ -947,7 +947,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.context,
|
||||
criterion='zones'
|
||||
)
|
||||
self.assertEqual(reports, [{'zones': 1}])
|
||||
self.assertEqual([{'zones': 1}], reports)
|
||||
|
||||
def test_count_report_records(self):
|
||||
self.service.count_domains = Mock(return_value=1)
|
||||
@ -957,7 +957,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.context,
|
||||
criterion='records'
|
||||
)
|
||||
self.assertEqual(reports, [{'records': 2}])
|
||||
self.assertEqual([{'records': 2}], reports)
|
||||
|
||||
def test_count_report_tenants(self):
|
||||
self.service.count_domains = Mock(return_value=1)
|
||||
@ -967,7 +967,7 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.context,
|
||||
criterion='tenants'
|
||||
)
|
||||
self.assertEqual(reports, [{'tenants': 3}])
|
||||
self.assertEqual([{'tenants': 3}], reports)
|
||||
|
||||
def test_count_report_not_found(self):
|
||||
self.service.count_domains = Mock(return_value=1)
|
||||
@ -990,8 +990,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
|
||||
assert designate.central.service.policy.check.called
|
||||
self.assertEqual(
|
||||
designate.central.service.policy.check.call_args[0][0],
|
||||
'touch_domain'
|
||||
'touch_domain',
|
||||
designate.central.service.policy.check.call_args[0][0]
|
||||
)
|
||||
|
||||
def test_get_recordset_not_found(self):
|
||||
@ -1024,17 +1024,16 @@ class CentralDomainTestCase(CentralBasic):
|
||||
'2',
|
||||
)
|
||||
self.assertEqual(
|
||||
designate.central.service.policy.check.call_args[0][0],
|
||||
'get_recordset'
|
||||
'get_recordset',
|
||||
designate.central.service.policy.check.call_args[0][0]
|
||||
)
|
||||
t, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(t, 'get_recordset')
|
||||
self.assertEqual(target, {
|
||||
self.assertEqual('get_recordset', t)
|
||||
self.assertEqual({
|
||||
'domain_id': '1',
|
||||
'domain_name': 'example.org.',
|
||||
'recordset_id': '3',
|
||||
'tenant_id': '2'
|
||||
})
|
||||
'tenant_id': '2'}, target)
|
||||
|
||||
def test_find_recordsets(self):
|
||||
self.context = Mock()
|
||||
@ -1042,8 +1041,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.service.find_recordsets(self.context)
|
||||
assert self.service.storage.find_recordsets.called
|
||||
n, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(n, 'find_recordsets')
|
||||
self.assertEqual(target, {'tenant_id': 't'})
|
||||
self.assertEqual('find_recordsets', n)
|
||||
self.assertEqual({'tenant_id': 't'}, target)
|
||||
|
||||
def test_find_recordset(self):
|
||||
self.context = Mock()
|
||||
@ -1051,8 +1050,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.service.find_recordset(self.context)
|
||||
assert self.service.storage.find_recordset.called
|
||||
n, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(n, 'find_recordset')
|
||||
self.assertEqual(target, {'tenant_id': 't'})
|
||||
self.assertEqual('find_recordset', n)
|
||||
self.assertEqual({'tenant_id': 't'}, target)
|
||||
|
||||
def test_update_recordset_fail_on_changes(self):
|
||||
self.service.storage.get_domain.return_value = RoObject()
|
||||
@ -1115,14 +1114,13 @@ class CentralDomainTestCase(CentralBasic):
|
||||
assert self.service._update_recordset_in_storage.called
|
||||
|
||||
n, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(n, 'update_recordset')
|
||||
self.assertEqual(target, {
|
||||
self.assertEqual('update_recordset', n)
|
||||
self.assertEqual({
|
||||
'domain_id': '1',
|
||||
'domain_name': 'example.org.',
|
||||
'domain_type': 'foo',
|
||||
'recordset_id': '1',
|
||||
'tenant_id': '2',
|
||||
})
|
||||
'tenant_id': '2'}, target)
|
||||
|
||||
def test__update_recordset_in_storage(self):
|
||||
recordset = Mock()
|
||||
@ -1144,21 +1142,21 @@ class CentralDomainTestCase(CentralBasic):
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
self.service._is_valid_recordset_name.call_args[0][2],
|
||||
'n'
|
||||
'n',
|
||||
self.service._is_valid_recordset_name.call_args[0][2]
|
||||
)
|
||||
self.assertEqual(
|
||||
self.service._is_valid_recordset_placement.call_args[0][2:],
|
||||
('n', 't', 'i')
|
||||
('n', 't', 'i'),
|
||||
self.service._is_valid_recordset_placement.call_args[0][2:]
|
||||
)
|
||||
self.assertEqual(
|
||||
'n',
|
||||
self.service._is_valid_recordset_placement_subdomain.
|
||||
call_args[0][2],
|
||||
'n'
|
||||
call_args[0][2]
|
||||
)
|
||||
self.assertEqual(
|
||||
self.service._is_valid_ttl.call_args[0][1],
|
||||
90
|
||||
90,
|
||||
self.service._is_valid_ttl.call_args[0][1]
|
||||
)
|
||||
assert self.service.storage.update_recordset.called
|
||||
assert self.service._update_domain_in_storage.called
|
||||
@ -1188,17 +1186,17 @@ class CentralDomainTestCase(CentralBasic):
|
||||
)
|
||||
|
||||
self.assertEqual(
|
||||
self.service._is_valid_recordset_name.call_args[0][2],
|
||||
'n'
|
||||
'n',
|
||||
self.service._is_valid_recordset_name.call_args[0][2]
|
||||
)
|
||||
self.assertEqual(
|
||||
self.service._is_valid_recordset_placement.call_args[0][2:],
|
||||
('n', 't', 'i')
|
||||
('n', 't', 'i'),
|
||||
self.service._is_valid_recordset_placement.call_args[0][2:]
|
||||
)
|
||||
self.assertEqual(
|
||||
'n',
|
||||
self.service._is_valid_recordset_placement_subdomain.
|
||||
call_args[0][2],
|
||||
'n'
|
||||
call_args[0][2]
|
||||
)
|
||||
assert not self.service._is_valid_ttl.called
|
||||
assert not self.service._update_domain_in_storage.called
|
||||
@ -1300,10 +1298,10 @@ class CentralDomainTestCase(CentralBasic):
|
||||
assert self.service.storage.update_recordset.called
|
||||
assert self.service.storage.delete_recordset.called
|
||||
rs = self.service.storage.update_recordset.call_args[0][1]
|
||||
self.assertEqual(len(rs.records), 1)
|
||||
self.assertEqual(rs.records[0].action, 'DELETE')
|
||||
self.assertEqual(rs.records[0].status, 'PENDING')
|
||||
self.assertEqual(rs.records[0].serial, 1)
|
||||
self.assertEqual(1, len(rs.records))
|
||||
self.assertEqual('DELETE', rs.records[0].action)
|
||||
self.assertEqual('PENDING', rs.records[0].status)
|
||||
self.assertEqual(1, rs.records[0].serial)
|
||||
|
||||
def test__delete_recordset_in_storage_no_increment_serial(self):
|
||||
self.service._update_domain_in_storage = Mock()
|
||||
@ -1326,11 +1324,11 @@ class CentralDomainTestCase(CentralBasic):
|
||||
def test_count_recordset(self):
|
||||
self.service.count_recordsets(self.context)
|
||||
n, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(n, 'count_recordsets')
|
||||
self.assertEqual(target, {'tenant_id': None})
|
||||
self.assertEqual('count_recordsets', n)
|
||||
self.assertEqual({'tenant_id': None}, target)
|
||||
self.assertEqual(
|
||||
self.service.storage.count_recordsets.call_args[0][1],
|
||||
{}
|
||||
{},
|
||||
self.service.storage.count_recordsets.call_args[0][1]
|
||||
)
|
||||
|
||||
def test_create_record_fail_on_delete(self):
|
||||
@ -1373,15 +1371,14 @@ class CentralDomainTestCase(CentralBasic):
|
||||
assert self.service.pool_manager_api.update_domain.called
|
||||
|
||||
n, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(n, 'create_record')
|
||||
self.assertEqual(target, {
|
||||
self.assertEqual('create_record', n)
|
||||
self.assertEqual({
|
||||
'domain_id': 1,
|
||||
'domain_name': 'example.org.',
|
||||
'domain_type': 'foo',
|
||||
'recordset_id': 2,
|
||||
'recordset_name': 'rs',
|
||||
'tenant_id': '2'
|
||||
})
|
||||
'tenant_id': '2'}, target)
|
||||
|
||||
def test__create_record_in_storage(self):
|
||||
self.service._enforce_record_quota = Mock()
|
||||
@ -1397,11 +1394,11 @@ class CentralDomainTestCase(CentralBasic):
|
||||
increment_serial=False
|
||||
)
|
||||
ctx, did, rid, record = self.service.storage.create_record.call_args[0]
|
||||
self.assertEqual(did, 1)
|
||||
self.assertEqual(rid, 2)
|
||||
self.assertEqual(record.action, 'CREATE')
|
||||
self.assertEqual(record.status, 'PENDING')
|
||||
self.assertEqual(record.serial, 4)
|
||||
self.assertEqual(1, did)
|
||||
self.assertEqual(2, rid)
|
||||
self.assertEqual('CREATE', record.action)
|
||||
self.assertEqual('PENDING', record.status)
|
||||
self.assertEqual(4, record.serial)
|
||||
|
||||
def test_get_record_not_found(self):
|
||||
self.service.storage.get_domain.return_value = RoObject(
|
||||
@ -1450,19 +1447,18 @@ class CentralDomainTestCase(CentralBasic):
|
||||
)
|
||||
self.service.get_record(self.context, 1, 2, 3)
|
||||
self.assertEqual(
|
||||
designate.central.service.policy.check.call_args[0][0],
|
||||
'get_record'
|
||||
'get_record',
|
||||
designate.central.service.policy.check.call_args[0][0]
|
||||
)
|
||||
t, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(t, 'get_record')
|
||||
self.assertEqual(target, {
|
||||
self.assertEqual('get_record', t)
|
||||
self.assertEqual({
|
||||
'domain_id': 1,
|
||||
'domain_name': 'example.org.',
|
||||
'record_id': 5,
|
||||
'recordset_id': 2,
|
||||
'recordset_name': 'foo',
|
||||
'tenant_id': 2
|
||||
})
|
||||
'tenant_id': 2}, target)
|
||||
|
||||
def test_update_record_fail_on_changes(self):
|
||||
self.service.storage.get_domain.return_value = RoObject(
|
||||
@ -1536,16 +1532,15 @@ class CentralDomainTestCase(CentralBasic):
|
||||
assert self.service._update_record_in_storage.called
|
||||
|
||||
n, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(n, 'update_record')
|
||||
self.assertEqual(target, {
|
||||
self.assertEqual('update_record', n)
|
||||
self.assertEqual({
|
||||
'domain_id': '1',
|
||||
'domain_name': 'n',
|
||||
'domain_type': 't',
|
||||
'record_id': '1',
|
||||
'recordset_id': '1',
|
||||
'recordset_name': 'rsn',
|
||||
'tenant_id': 'tid'
|
||||
})
|
||||
'tenant_id': 'tid'}, target)
|
||||
|
||||
def test__update_record_in_storage(self):
|
||||
self.service._update_domain_in_storage = Mock()
|
||||
@ -1560,9 +1555,9 @@ class CentralDomainTestCase(CentralBasic):
|
||||
increment_serial=False
|
||||
)
|
||||
ctx, record = self.service.storage.update_record.call_args[0]
|
||||
self.assertEqual(record.action, 'UPDATE')
|
||||
self.assertEqual(record.status, 'PENDING')
|
||||
self.assertEqual(record.serial, 1)
|
||||
self.assertEqual('UPDATE', record.action)
|
||||
self.assertEqual('PENDING', record.status)
|
||||
self.assertEqual(1, record.serial)
|
||||
|
||||
def test_delete_record_action_delete(self):
|
||||
self.service.storage.get_domain.return_value = RoObject(
|
||||
@ -1621,16 +1616,15 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.service.delete_record(self.context, 1, 2, 3)
|
||||
|
||||
t, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(t, 'delete_record')
|
||||
self.assertEqual(target, {
|
||||
self.assertEqual('delete_record', t)
|
||||
self.assertEqual({
|
||||
'domain_id': 1,
|
||||
'domain_name': 'dn',
|
||||
'domain_type': 't',
|
||||
'record_id': 4,
|
||||
'recordset_id': 2,
|
||||
'recordset_name': 'rsn',
|
||||
'tenant_id': 'tid'
|
||||
})
|
||||
'tenant_id': 'tid'}, target)
|
||||
|
||||
def test_delete_record_fail_on_managed(self):
|
||||
self.service._delete_record_in_storage = Mock(
|
||||
@ -1668,15 +1662,15 @@ class CentralDomainTestCase(CentralBasic):
|
||||
increment_serial=False
|
||||
)
|
||||
r = self.service.storage.update_record.call_args[0][1]
|
||||
self.assertEqual(r.action, 'DELETE')
|
||||
self.assertEqual(r.status, 'PENDING')
|
||||
self.assertEqual(r.serial, 2)
|
||||
self.assertEqual('DELETE', r.action)
|
||||
self.assertEqual('PENDING', r.status)
|
||||
self.assertEqual(2, r.serial)
|
||||
|
||||
def test_count_records(self):
|
||||
self.service.count_records(self.context)
|
||||
t, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(t, 'count_records')
|
||||
self.assertEqual(target, {'tenant_id': None})
|
||||
self.assertEqual('count_records', t)
|
||||
self.assertEqual({'tenant_id': None}, target)
|
||||
|
||||
def test_sync_domains(self):
|
||||
self.service._sync_domain = Mock()
|
||||
@ -1687,8 +1681,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
|
||||
res = self.service.sync_domains(self.context)
|
||||
t, ctx = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(t, 'diagnostics_sync_domains')
|
||||
self.assertEqual(len(res), 2)
|
||||
self.assertEqual('diagnostics_sync_domains', t)
|
||||
self.assertEqual(2, len(res))
|
||||
|
||||
def test_sync_domain(self):
|
||||
self.service._sync_domain = Mock()
|
||||
@ -1701,9 +1695,9 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.service.sync_domain(self.context, 1)
|
||||
|
||||
t, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(t, 'diagnostics_sync_domain')
|
||||
self.assertEqual(target, {'tenant_id': 'tid', 'domain_id': 1,
|
||||
'domain_name': 'n'})
|
||||
self.assertEqual('diagnostics_sync_domain', t)
|
||||
self.assertEqual({'tenant_id': 'tid', 'domain_id': 1,
|
||||
'domain_name': 'n'}, target)
|
||||
|
||||
def test_sync_record(self):
|
||||
self.service.storage.get_domain.return_value = RoObject(
|
||||
@ -1718,27 +1712,26 @@ class CentralDomainTestCase(CentralBasic):
|
||||
self.service.sync_record(self.context, 1, 2, 3)
|
||||
|
||||
t, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(t, 'diagnostics_sync_record')
|
||||
self.assertEqual(target, {
|
||||
self.assertEqual('diagnostics_sync_record', t)
|
||||
self.assertEqual({
|
||||
'domain_id': 1,
|
||||
'domain_name': 'n',
|
||||
'record_id': 3,
|
||||
'recordset_id': 2,
|
||||
'recordset_name': 'n',
|
||||
'tenant_id': 'tid'
|
||||
})
|
||||
'tenant_id': 'tid'}, target)
|
||||
|
||||
def test_ping(self):
|
||||
self.service.storage.ping.return_value = True
|
||||
r = self.service.ping(self.context)
|
||||
self.assertEqual(r['backend'], {'status': None})
|
||||
self.assertEqual({'status': None}, r['backend'])
|
||||
self.assertTrue(r['status'])
|
||||
self.assertTrue(r['storage'])
|
||||
|
||||
def test_ping2(self):
|
||||
self.service.storage.ping.return_value = False
|
||||
r = self.service.ping(self.context)
|
||||
self.assertEqual(r['backend'], {'status': None})
|
||||
self.assertEqual({'status': None}, r['backend'])
|
||||
self.assertFalse(r['status'])
|
||||
self.assertFalse(r['storage'])
|
||||
|
||||
@ -1751,8 +1744,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
|
||||
fips = {}
|
||||
data, invalid = self.service._determine_floatingips(self.context, fips)
|
||||
self.assertEqual(data, {})
|
||||
self.assertEqual(invalid, [])
|
||||
self.assertEqual({}, data)
|
||||
self.assertEqual([], invalid)
|
||||
|
||||
def test__determine_floatingips_with_data(self):
|
||||
self.context = Mock()
|
||||
@ -1767,8 +1760,8 @@ class CentralDomainTestCase(CentralBasic):
|
||||
'k2': {'address': 2},
|
||||
}
|
||||
data, invalid = self.service._determine_floatingips(self.context, fips)
|
||||
self.assertEqual(len(invalid), 1)
|
||||
self.assertEqual(invalid[0].managed_tenant_id, 1)
|
||||
self.assertEqual(1, len(invalid))
|
||||
self.assertEqual(1, invalid[0].managed_tenant_id)
|
||||
self.assertEqual(data['k'], ({'address': 1}, None))
|
||||
|
||||
|
||||
@ -1793,16 +1786,16 @@ class IsSubdomainTestCase(CentralBasic):
|
||||
|
||||
def FIXME_test__is_subdomain_false2(self):
|
||||
r = self.service._is_subdomain(self.context, 'com.', '1')
|
||||
self.assertEqual(r, 'com.')
|
||||
self.assertEqual('com.', r)
|
||||
|
||||
def FIXME_test__is_subdomain_false3(self):
|
||||
r = self.service._is_subdomain(self.context, 'example.com.', '1')
|
||||
self.assertEqual(r, 'example.com.')
|
||||
self.assertEqual('example.com.', r)
|
||||
|
||||
def test__is_subdomain_false4(self):
|
||||
r = self.service._is_subdomain(self.context, 'foo.a.b.example.com.',
|
||||
'1')
|
||||
self.assertEqual(r, 'example.com.')
|
||||
self.assertEqual('example.com.', r)
|
||||
|
||||
|
||||
class CentralZoneExportTests(CentralBasic):
|
||||
@ -1840,11 +1833,11 @@ class CentralZoneExportTests(CentralBasic):
|
||||
self.context,
|
||||
'123'
|
||||
)
|
||||
self.assertEqual(out.domain_id, '123')
|
||||
self.assertEqual(out.status, 'PENDING')
|
||||
self.assertEqual(out.task_type, 'EXPORT')
|
||||
self.assertEqual('123', out.domain_id)
|
||||
self.assertEqual('PENDING', out.status)
|
||||
self.assertEqual('EXPORT', out.task_type)
|
||||
self.assertIsNone(out.message)
|
||||
self.assertEqual(out.tenant_id, 't')
|
||||
self.assertEqual('t', out.tenant_id)
|
||||
|
||||
def test_get_zone_export(self):
|
||||
self.context = Mock()
|
||||
@ -1863,14 +1856,14 @@ class CentralZoneExportTests(CentralBasic):
|
||||
n, ctx, target = designate.central.service.policy.check.call_args[0]
|
||||
|
||||
# Check arguments to policy
|
||||
self.assertEqual(target['tenant_id'], 't')
|
||||
self.assertEqual('t', target['tenant_id'])
|
||||
|
||||
# Check output
|
||||
self.assertEqual(out.domain_id, '123')
|
||||
self.assertEqual(out.status, 'PENDING')
|
||||
self.assertEqual(out.task_type, 'EXPORT')
|
||||
self.assertEqual('123', out.domain_id)
|
||||
self.assertEqual('PENDING', out.status)
|
||||
self.assertEqual('EXPORT', out.task_type)
|
||||
self.assertIsNone(out.message)
|
||||
self.assertEqual(out.tenant_id, 't')
|
||||
self.assertEqual('t', out.tenant_id)
|
||||
|
||||
def test_find_zone_exports(self):
|
||||
self.context = Mock()
|
||||
@ -1882,7 +1875,7 @@ class CentralZoneExportTests(CentralBasic):
|
||||
assert self.service.storage.find_zone_exports.called
|
||||
pcheck, ctx, target = \
|
||||
designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(pcheck, 'find_zone_exports')
|
||||
self.assertEqual('find_zone_exports', pcheck)
|
||||
|
||||
def test_delete_zone_export(self):
|
||||
self.context = Mock()
|
||||
@ -1902,13 +1895,13 @@ class CentralZoneExportTests(CentralBasic):
|
||||
|
||||
assert self.service.storage.delete_zone_export.called
|
||||
|
||||
self.assertEqual(out.domain_id, '123')
|
||||
self.assertEqual(out.status, 'PENDING')
|
||||
self.assertEqual(out.task_type, 'EXPORT')
|
||||
self.assertEqual('123', out.domain_id)
|
||||
self.assertEqual('PENDING', out.status)
|
||||
self.assertEqual('EXPORT', out.task_type)
|
||||
self.assertIsNone(out.message)
|
||||
self.assertEqual(out.tenant_id, 't')
|
||||
self.assertEqual('t', out.tenant_id)
|
||||
|
||||
assert designate.central.service.policy.check.called
|
||||
pcheck, ctx, target = \
|
||||
designate.central.service.policy.check.call_args[0]
|
||||
self.assertEqual(pcheck, 'delete_zone_export')
|
||||
self.assertEqual('delete_zone_export', pcheck)
|
||||
|
@ -29,7 +29,7 @@ class BlacklistTest(DesignateV2Test):
|
||||
def test_list_blacklists(self):
|
||||
self.useFixture(BlacklistFixture())
|
||||
resp, model = BlacklistClient.as_user('admin').list_blacklists()
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertGreater(len(model.blacklists), 0)
|
||||
|
||||
def test_create_blacklist(self):
|
||||
@ -43,19 +43,19 @@ class BlacklistTest(DesignateV2Test):
|
||||
patch_model = datagen.random_blacklist_data()
|
||||
resp, new_model = BlacklistClient.as_user('admin').patch_blacklist(
|
||||
old_model.id, patch_model)
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
resp, model = BlacklistClient.as_user('admin').get_blacklist(
|
||||
new_model.id)
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(new_model.id, old_model.id)
|
||||
self.assertEqual(new_model.pattern, model.pattern)
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertEqual(old_model.id, new_model.id)
|
||||
self.assertEqual(model.pattern, new_model.pattern)
|
||||
|
||||
def test_delete_blacklist(self):
|
||||
fixture = self.useFixture(BlacklistFixture())
|
||||
resp, model = BlacklistClient.as_user('admin').delete_blacklist(
|
||||
fixture.created_blacklist.id)
|
||||
self.assertEqual(resp.status, 204)
|
||||
self.assertEqual(204, resp.status)
|
||||
|
||||
def test_get_blacklist_404(self):
|
||||
client = BlacklistClient.as_user('admin')
|
||||
|
@ -29,7 +29,7 @@ class PoolTest(DesignateV2Test):
|
||||
def test_list_pools(self):
|
||||
self.useFixture(PoolFixture())
|
||||
resp, model = PoolClient.as_user('admin').list_pools()
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertGreater(len(model.pools), 0)
|
||||
|
||||
def test_create_pool(self):
|
||||
@ -46,17 +46,17 @@ class PoolTest(DesignateV2Test):
|
||||
patch_model = datagen.random_pool_data()
|
||||
resp, new_model = PoolClient.as_user('admin').patch_pool(
|
||||
old_model.id, patch_model)
|
||||
self.assertEqual(resp.status, 202)
|
||||
self.assertEqual(202, resp.status)
|
||||
|
||||
resp, model = PoolClient.as_user('admin').get_pool(new_model.id)
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(new_model.id, old_model.id)
|
||||
self.assertEqual(new_model.name, patch_model.name)
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertEqual(old_model.id, new_model.id)
|
||||
self.assertEqual(patch_model.name, new_model.name)
|
||||
|
||||
def test_delete_pool(self):
|
||||
pool = self.useFixture(PoolFixture()).created_pool
|
||||
resp, model = PoolClient.as_user('admin').delete_pool(pool.id)
|
||||
self.assertEqual(resp.status, 204)
|
||||
self.assertEqual(204, resp.status)
|
||||
|
||||
def test_get_pool_404(self):
|
||||
client = PoolClient.as_user('admin')
|
||||
|
@ -82,7 +82,7 @@ class RecordsetTest(DesignateV2Test):
|
||||
self.useFixture(RecordsetFixture(self.zone.id, post_model))
|
||||
resp, model = RecordsetClient.as_user('default') \
|
||||
.list_recordsets(self.zone.id)
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertGreater(len(model.recordsets), 0)
|
||||
|
||||
def assert_dns(self, model):
|
||||
@ -120,11 +120,11 @@ class RecordsetTest(DesignateV2Test):
|
||||
del put_model.name # don't try to update the name
|
||||
resp, put_resp_model = RecordsetClient.as_user('default') \
|
||||
.put_recordset(self.zone.id, recordset_id, put_model)
|
||||
self.assertEqual(resp.status, 202, "on put response")
|
||||
self.assertEqual(put_resp_model.status, "PENDING")
|
||||
self.assertEqual(put_resp_model.name, post_model.name)
|
||||
self.assertEqual(put_resp_model.records, put_model.records)
|
||||
self.assertEqual(put_resp_model.ttl, put_model.ttl)
|
||||
self.assertEqual(202, resp.status, "on put response")
|
||||
self.assertEqual("PENDING", put_resp_model.status)
|
||||
self.assertEqual(post_model.name, put_resp_model.name)
|
||||
self.assertEqual(put_model.records, put_resp_model.records)
|
||||
self.assertEqual(put_model.ttl, put_resp_model.ttl)
|
||||
|
||||
RecordsetClient.as_user('default').wait_for_recordset(
|
||||
self.zone.id, recordset_id)
|
||||
@ -134,7 +134,7 @@ class RecordsetTest(DesignateV2Test):
|
||||
|
||||
resp, delete_resp_model = RecordsetClient.as_user('default') \
|
||||
.delete_recordset(self.zone.id, recordset_id)
|
||||
self.assertEqual(resp.status, 202, "on delete response")
|
||||
self.assertEqual(202, resp.status, "on delete response")
|
||||
RecordsetClient.as_user('default').wait_for_404(
|
||||
self.zone.id, recordset_id)
|
||||
|
||||
|
@ -43,7 +43,7 @@ class TransferZoneOwnerShipTest(DesignateV2Test):
|
||||
))
|
||||
resp, model = TransferRequestClient.as_user('default') \
|
||||
.list_transfer_requests()
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(200, resp.status)
|
||||
self.assertGreater(len(model.transfer_requests), 0)
|
||||
|
||||
def test_create_zone_transfer_request(self):
|
||||
@ -51,19 +51,19 @@ class TransferZoneOwnerShipTest(DesignateV2Test):
|
||||
zone=self.zone,
|
||||
post_model=datagen.random_transfer_request_data(),
|
||||
))
|
||||
self.assertEqual(fixture.post_resp.status, 201)
|
||||
self.assertEqual(fixture.transfer_request.zone_id, self.zone.id)
|
||||
self.assertEqual(201, fixture.post_resp.status)
|
||||
self.assertEqual(self.zone.id, fixture.transfer_request.zone_id)
|
||||
# todo: this fails. the zone_name is null in the POST's response, but
|
||||
# it's filled in on a subsequent get
|
||||
# self.assertEqual(fixture.transfer_request.zone_name, self.zone.name)
|
||||
self.assertEqual(fixture.transfer_request.project_id,
|
||||
TransferRequestClient.as_user(fixture.user).tenant_id)
|
||||
# self.assertEqual(self.zone.name, fixture.transfer_request.zone_name)
|
||||
self.assertEqual(TransferRequestClient.as_user(fixture.user).tenant_id,
|
||||
fixture.transfer_request.project_id)
|
||||
self.assertEqual(fixture.transfer_request.target_project_id, None)
|
||||
|
||||
# check that the zone_name is filled in
|
||||
resp, transfer_request = TransferRequestClient.as_user(fixture.user) \
|
||||
.get_transfer_request(fixture.transfer_request.id)
|
||||
self.assertEqual(transfer_request.zone_name, self.zone.name)
|
||||
self.assertEqual(self.zone.name, transfer_request.zone_name)
|
||||
|
||||
def test_view_zone_transfer_request(self):
|
||||
fixture = self.useFixture(TransferRequestFixture(
|
||||
@ -88,19 +88,19 @@ class TransferZoneOwnerShipTest(DesignateV2Test):
|
||||
target_user='alt',
|
||||
))
|
||||
|
||||
self.assertEqual(fixture.post_resp.status, 201)
|
||||
self.assertEqual(fixture.transfer_request.zone_id, self.zone.id)
|
||||
self.assertEqual(201, fixture.post_resp.status)
|
||||
self.assertEqual(self.zone.id, fixture.transfer_request.zone_id)
|
||||
# todo: the zone_name is null initially, but shows up on later gets
|
||||
# self.assertEqual(fixture.transfer_request.zone_name, self.zone.name)
|
||||
self.assertEqual(fixture.transfer_request.project_id,
|
||||
TransferRequestClient.as_user(fixture.user).tenant_id)
|
||||
self.assertEqual(fixture.transfer_request.target_project_id,
|
||||
target_project_id)
|
||||
self.assertEqual(TransferRequestClient.as_user(fixture.user).tenant_id,
|
||||
fixture.transfer_request.project_id)
|
||||
self.assertEqual(target_project_id,
|
||||
fixture.transfer_request.target_project_id)
|
||||
|
||||
resp, transfer_request = TransferRequestClient.as_user('alt')\
|
||||
.get_transfer_request(fixture.transfer_request.id)
|
||||
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
def test_view_zone_transfer_request_scoped(self):
|
||||
target_project_id = TransferRequestClient.as_user('admin').tenant_id
|
||||
@ -126,13 +126,13 @@ class TransferZoneOwnerShipTest(DesignateV2Test):
|
||||
resp, transfer_request = TransferRequestClient.as_user('admin')\
|
||||
.get_transfer_request(transfer_request.id)
|
||||
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
def test_create_zone_transfer_request_no_body(self):
|
||||
client = TransferRequestClient.as_user('default')
|
||||
resp, transfer_request = client \
|
||||
.post_transfer_request_empty_body(self.zone.id)
|
||||
self.assertEqual(resp.status, 201)
|
||||
self.assertEqual(201, resp.status)
|
||||
self.addCleanup(TransferRequestFixture.cleanup_transfer_request,
|
||||
client, transfer_request.id)
|
||||
|
||||
@ -151,7 +151,7 @@ class TransferZoneOwnerShipTest(DesignateV2Test):
|
||||
key=transfer_request.key,
|
||||
zone_transfer_request_id=transfer_request.id
|
||||
))
|
||||
self.assertEqual(resp.status, 201)
|
||||
self.assertEqual(201, resp.status)
|
||||
|
||||
def test_do_zone_transfer_scoped(self):
|
||||
target_project_id = TransferRequestClient.as_user('alt').tenant_id
|
||||
@ -168,7 +168,7 @@ class TransferZoneOwnerShipTest(DesignateV2Test):
|
||||
resp, retrived_transfer_request = TransferRequestClient.\
|
||||
as_user('alt').get_transfer_request(transfer_request.id)
|
||||
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(200, resp.status)
|
||||
|
||||
resp, transfer_accept = TransferAcceptClient.as_user('alt')\
|
||||
.post_transfer_accept(
|
||||
@ -176,7 +176,7 @@ class TransferZoneOwnerShipTest(DesignateV2Test):
|
||||
key=transfer_request.key,
|
||||
zone_transfer_request_id=transfer_request.id
|
||||
))
|
||||
self.assertEqual(resp.status, 201)
|
||||
self.assertEqual(201, resp.status)
|
||||
|
||||
client = ZoneClient.as_user('default')
|
||||
|
||||
@ -186,4 +186,4 @@ class TransferZoneOwnerShipTest(DesignateV2Test):
|
||||
|
||||
resp, zone = ZoneClient.as_user('alt').get_zone(self.zone.id)
|
||||
|
||||
self.assertEqual(resp.status, 200)
|
||||
self.assertEqual(200, resp.status)
|
||||
|
@ -32,8 +32,8 @@ class MetaTest(tempest_lib.base.BaseTestCase):
|
||||
mydomain.com. IN NS ns1.example.com.
|
||||
mydomain.com. IN SOA ns1.example.com. mail.mydomain.com. 1 2 3 4 5
|
||||
""")
|
||||
self.assertEqual(zone_file.origin, 'mydomain.com.')
|
||||
self.assertEqual(zone_file.ttl, 1234)
|
||||
self.assertEqual('mydomain.com.', zone_file.origin)
|
||||
self.assertEqual(1234, zone_file.ttl)
|
||||
|
||||
ns_record = ZoneFileRecord(
|
||||
name='mydomain.com.', type='NS', data='ns1.example.com.')
|
||||
@ -69,4 +69,4 @@ class MetaTest(tempest_lib.base.BaseTestCase):
|
||||
ZoneFileRecord(name="mydomain.com.", type="NS", data="ns2.a.com."),
|
||||
ZoneFileRecord(name="mydomain.com.", type="NS", data="ns3.a.com."),
|
||||
]
|
||||
self.assertEqual(records, expected)
|
||||
self.assertEqual(expected, records)
|
||||
|
Loading…
Reference in New Issue
Block a user