Fix the wrong order of parameters when using assertEqual

The first parameter is the expected value while the second
parameter is the actual value.

Change-Id: Id19a86def58b984e3a1ae3b8b6b43900a2b50c92
This commit is contained in:
Dave Chen 2015-03-06 23:11:05 +08:00
parent e2e5b93a2f
commit f1d69e66de
12 changed files with 159 additions and 160 deletions

View File

@ -330,7 +330,7 @@ class MongoCache(tests.BaseTestCase):
random_key = uuid.uuid4().hex
region.set(random_key, "dummyValue10")
# There is no proxy so can access MongoCacheBackend directly
self.assertEqual(region.backend.api.w, 1)
self.assertEqual(1, region.backend.api.w)
def test_incorrect_read_preference(self):
self.arguments['read_preference'] = 'inValidValue'
@ -338,8 +338,7 @@ class MongoCache(tests.BaseTestCase):
arguments=self.arguments)
# As per delayed loading of pymongo, read_preference value should
# still be string and NOT enum
self.assertEqual(region.backend.api.read_preference,
'inValidValue')
self.assertEqual('inValidValue', region.backend.api.read_preference)
random_key = uuid.uuid4().hex
self.assertRaises(ValueError, region.set,
@ -351,15 +350,15 @@ class MongoCache(tests.BaseTestCase):
arguments=self.arguments)
# As per delayed loading of pymongo, read_preference value should
# still be string and NOT enum
self.assertEqual(region.backend.api.read_preference,
'secondaryPreferred')
self.assertEqual('secondaryPreferred',
region.backend.api.read_preference)
random_key = uuid.uuid4().hex
region.set(random_key, "dummyValue10")
# Now as pymongo is loaded so expected read_preference value is enum.
# There is no proxy so can access MongoCacheBackend directly
self.assertEqual(region.backend.api.read_preference, 3)
self.assertEqual(3, region.backend.api.read_preference)
def test_missing_replica_set_name(self):
self.arguments['use_replica'] = True
@ -390,15 +389,15 @@ class MongoCache(tests.BaseTestCase):
region = dp_region.make_region().configure('keystone.cache.mongo',
arguments=self.arguments)
# There is no proxy so can access MongoCacheBackend directly
self.assertEqual(region.backend.api.hosts, 'localhost:27017')
self.assertEqual(region.backend.api.db_name, 'ks_cache')
self.assertEqual(region.backend.api.cache_collection, 'cache')
self.assertEqual(region.backend.api.username, 'test_user')
self.assertEqual(region.backend.api.password, 'test_password')
self.assertEqual(region.backend.api.use_replica, True)
self.assertEqual(region.backend.api.replicaset_name, 'my_replica')
self.assertEqual(region.backend.api.conn_kwargs['ssl'], False)
self.assertEqual(region.backend.api.ttl_seconds, 60)
self.assertEqual('localhost:27017', region.backend.api.hosts)
self.assertEqual('ks_cache', region.backend.api.db_name)
self.assertEqual('cache', region.backend.api.cache_collection)
self.assertEqual('test_user', region.backend.api.username)
self.assertEqual('test_password', region.backend.api.password)
self.assertEqual(True, region.backend.api.use_replica)
self.assertEqual('my_replica', region.backend.api.replicaset_name)
self.assertEqual(False, region.backend.api.conn_kwargs['ssl'])
self.assertEqual(60, region.backend.api.ttl_seconds)
def test_multiple_region_cache_configuration(self):
arguments1 = copy.copy(self.arguments)
@ -407,11 +406,11 @@ class MongoCache(tests.BaseTestCase):
region1 = dp_region.make_region().configure('keystone.cache.mongo',
arguments=arguments1)
# There is no proxy so can access MongoCacheBackend directly
self.assertEqual(region1.backend.api.hosts, 'localhost:27017')
self.assertEqual(region1.backend.api.db_name, 'ks_cache')
self.assertEqual(region1.backend.api.cache_collection, 'cache_region1')
self.assertEqual(region1.backend.api.username, 'test_user')
self.assertEqual(region1.backend.api.password, 'test_password')
self.assertEqual('localhost:27017', region1.backend.api.hosts)
self.assertEqual('ks_cache', region1.backend.api.db_name)
self.assertEqual('cache_region1', region1.backend.api.cache_collection)
self.assertEqual('test_user', region1.backend.api.username)
self.assertEqual('test_password', region1.backend.api.password)
# Should be None because of delayed initialization
self.assertIsNone(region1.backend.api._data_manipulator)
@ -431,9 +430,9 @@ class MongoCache(tests.BaseTestCase):
region2 = dp_region.make_region().configure('keystone.cache.mongo',
arguments=arguments2)
# There is no proxy so can access MongoCacheBackend directly
self.assertEqual(region2.backend.api.hosts, 'localhost:27017')
self.assertEqual(region2.backend.api.db_name, 'ks_cache')
self.assertEqual(region2.backend.api.cache_collection, 'cache_region2')
self.assertEqual('localhost:27017', region2.backend.api.hosts)
self.assertEqual('ks_cache', region2.backend.api.db_name)
self.assertEqual('cache_region2', region2.backend.api.cache_collection)
# Should be None because of delayed initialization
self.assertIsNone(region2.backend.api._data_manipulator)
@ -483,7 +482,7 @@ class MongoCache(tests.BaseTestCase):
self.arguments['mongo_ttl_seconds'] = '3600'
region = dp_region.make_region().configure('keystone.cache.mongo',
arguments=self.arguments)
self.assertEqual(region.backend.api.ttl_seconds, 3600)
self.assertEqual(3600, region.backend.api.ttl_seconds)
random_key = uuid.uuid4().hex
region.set(random_key, "dummyValue")
self.assertEqual("dummyValue", region.get(random_key))
@ -493,7 +492,7 @@ class MongoCache(tests.BaseTestCase):
self.arguments['mongo_ttl_seconds'] = 1800
region = dp_region.make_region().configure('keystone.cache.mongo',
arguments=self.arguments)
self.assertEqual(region.backend.api.ttl_seconds, 1800)
self.assertEqual(1800, region.backend.api.ttl_seconds)
random_key = uuid.uuid4().hex
region.set(random_key, "dummyValue")
self.assertEqual("dummyValue", region.get(random_key))
@ -711,10 +710,10 @@ class MongoCache(tests.BaseTestCase):
# There is no proxy so can access MongoCacheBackend directly
api_methargs = region.backend.api.meth_kwargs
self.assertEqual(api_methargs['wtimeout'], 30000)
self.assertEqual(api_methargs['j'], True)
self.assertEqual(api_methargs['continue_on_error'], True)
self.assertEqual(api_methargs['secondary_acceptable_latency_ms'], 60)
self.assertEqual(30000, api_methargs['wtimeout'])
self.assertEqual(True, api_methargs['j'])
self.assertEqual(True, api_methargs['continue_on_error'])
self.assertEqual(60, api_methargs['secondary_acceptable_latency_ms'])
random_key = uuid.uuid4().hex
region.set(random_key, "dummyValue1")

View File

@ -35,15 +35,15 @@ class ConfigTestCase(tests.TestCase):
return config_files
def test_paste_config(self):
self.assertEqual(config.find_paste_config(),
tests.dirs.etc('keystone-paste.ini'))
self.assertEqual(tests.dirs.etc('keystone-paste.ini'),
config.find_paste_config())
self.config_fixture.config(group='paste_deploy',
config_file=uuid.uuid4().hex)
self.assertRaises(exception.ConfigFileNotFound,
config.find_paste_config)
self.config_fixture.config(group='paste_deploy', config_file='')
self.assertEqual(config.find_paste_config(),
tests.dirs.etc('keystone.conf.sample'))
self.assertEqual(tests.dirs.etc('keystone.conf.sample'),
config.find_paste_config())
def test_config_default(self):
self.assertEqual('keystone.auth.plugins.password.Password',

View File

@ -40,7 +40,7 @@ class IPv6TestCase(tests.TestCase):
'::1', CONF.eventlet_server.admin_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
self.assertEqual(300, resp.status)
# Verify Public
with appserver.AppServer(paste_conf, appserver.MAIN, host="::1"):
@ -48,4 +48,4 @@ class IPv6TestCase(tests.TestCase):
'::1', CONF.eventlet_server.public_port)
conn.request('GET', '/')
resp = conn.getresponse()
self.assertEqual(resp.status, 300)
self.assertEqual(300, resp.status)

View File

@ -126,12 +126,12 @@ class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity):
alias_dereferencing='searching')
self.identity_api = identity_ldap.Identity()
user_ref = self.identity_api.get_user('alt_fake1')
self.assertEqual(user_ref['id'], 'alt_fake1')
self.assertEqual('alt_fake1', user_ref['id'])
self.config_fixture.config(group='ldap', alias_dereferencing='always')
self.identity_api = identity_ldap.Identity()
user_ref = self.identity_api.get_user('alt_fake1')
self.assertEqual(user_ref['id'], 'alt_fake1')
self.assertEqual('alt_fake1', user_ref['id'])
# FakeLDAP does not correctly process filters, so this test can only be
# run against a live LDAP server
@ -153,7 +153,7 @@ class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity):
for x in range(0, USER_COUNT):
group_refs = self.identity_api.list_groups_for_user(
test_users[x]['id'])
self.assertEqual(len(group_refs), 0)
self.assertEqual(0, len(group_refs))
for x in range(0, GROUP_COUNT):
new_group = {'domain_id': domain['id'],
@ -163,37 +163,37 @@ class LiveLDAPIdentity(test_backend_ldap.LDAPIdentity):
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(len(group_refs), x)
self.assertEqual(x, len(group_refs))
self.identity_api.add_user_to_group(
positive_user['id'],
new_group['id'])
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(len(group_refs), x + 1)
self.assertEqual(x + 1, len(group_refs))
group_refs = self.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(len(group_refs), 0)
self.assertEqual(0, len(group_refs))
self.config_fixture.config(group='ldap', group_filter='(dn=xx)')
self.reload_backends(CONF.identity.default_domain_id)
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(len(group_refs), 0)
self.assertEqual(0, len(group_refs))
group_refs = self.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(len(group_refs), 0)
self.assertEqual(0, len(group_refs))
self.config_fixture.config(group='ldap',
group_filter='(objectclass=*)')
self.reload_backends(CONF.identity.default_domain_id)
group_refs = self.identity_api.list_groups_for_user(
positive_user['id'])
self.assertEqual(len(group_refs), GROUP_COUNT)
self.assertEqual(GROUP_COUNT, len(group_refs))
group_refs = self.identity_api.list_groups_for_user(
negative_user['id'])
self.assertEqual(len(group_refs), 0)
self.assertEqual(0, len(group_refs))
def test_user_enable_attribute_mask(self):
self.config_fixture.config(

View File

@ -58,8 +58,8 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
handler = ldap_core._get_connection(CONF.ldap.url, use_pool=True)
self.assertNotEqual(type(handler.Connector),
type(fakeldap.FakeLdapPool))
self.assertEqual(type(handler.Connector),
type(ldappool.StateConnector))
self.assertEqual(type(ldappool.StateConnector),
type(handler.Connector))
def test_async_search_and_result3(self):
self.config_fixture.config(group='ldap', page_size=1)
@ -76,26 +76,26 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
return ldappool_cm.connection(who, cred)
with _get_conn() as c1: # 1
self.assertEqual(len(ldappool_cm), 1)
self.assertEqual(1, len(ldappool_cm))
self.assertTrue(c1.connected, True)
self.assertTrue(c1.active, True)
with _get_conn() as c2: # conn2
self.assertEqual(len(ldappool_cm), 2)
self.assertEqual(2, len(ldappool_cm))
self.assertTrue(c2.connected)
self.assertTrue(c2.active)
self.assertEqual(len(ldappool_cm), 2)
self.assertEqual(2, len(ldappool_cm))
# c2 went out of context, its connected but not active
self.assertTrue(c2.connected)
self.assertFalse(c2.active)
with _get_conn() as c3: # conn3
self.assertEqual(len(ldappool_cm), 2)
self.assertEqual(2, len(ldappool_cm))
self.assertTrue(c3.connected)
self.assertTrue(c3.active)
self.assertTrue(c3 is c2) # same connection is reused
self.assertTrue(c2.active)
with _get_conn() as c4: # conn4
self.assertEqual(len(ldappool_cm), 3)
self.assertEqual(3, len(ldappool_cm))
self.assertTrue(c4.connected)
self.assertTrue(c4.active)
@ -136,15 +136,15 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
user1 = self._create_user_and_authenticate(password)
auth_cm = self._get_auth_conn_pool_cm()
self.assertEqual(len(auth_cm), 1)
self.assertEqual(1, len(auth_cm))
user2 = self._create_user_and_authenticate(password)
self.assertEqual(len(auth_cm), 1)
self.assertEqual(1, len(auth_cm))
user3 = self._create_user_and_authenticate(password)
self.assertEqual(len(auth_cm), 1)
self.assertEqual(1, len(auth_cm))
user4 = self._create_user_and_authenticate(password)
self.assertEqual(len(auth_cm), 1)
self.assertEqual(1, len(auth_cm))
user5 = self._create_user_and_authenticate(password)
self.assertEqual(len(auth_cm), 1)
self.assertEqual(1, len(auth_cm))
# connection pool size remains 1 even for different user ldap bind
# as there is only one active connection at a time
@ -166,7 +166,7 @@ class LiveLDAPPoolIdentity(test_backend_ldap_pool.LdapPoolCommonTestMixin,
with auth_cm.connection(u3_dn, password) as _:
with auth_cm.connection(u4_dn, password) as _:
with auth_cm.connection(u5_dn, password) as _:
self.assertEqual(len(auth_cm), 5)
self.assertEqual(5, len(auth_cm))
_.unbind_s()
user3['password'] = new_password

View File

@ -281,8 +281,8 @@ class SqlUpgradeTests(SqlMigrateBase):
version = migration.db_version(sql.get_engine(), self.repo_path,
migrate_repo.DB_INIT_VERSION)
self.assertEqual(
version,
migrate_repo.DB_INIT_VERSION,
version,
'DB is not at version %s' % migrate_repo.DB_INIT_VERSION)
def test_two_steps_forward_one_step_back(self):
@ -1323,12 +1323,12 @@ class SqlUpgradeTests(SqlMigrateBase):
'service_id', 'url', 'extra', 'enabled',
'region_id'])
region_table = sqlalchemy.Table('region', self.metadata, autoload=True)
self.assertEqual(region_table.c.id.type.length, 255)
self.assertEqual(region_table.c.parent_region_id.type.length, 255)
self.assertEqual(255, region_table.c.id.type.length)
self.assertEqual(255, region_table.c.parent_region_id.type.length)
endpoint_table = sqlalchemy.Table('endpoint',
self.metadata,
autoload=True)
self.assertEqual(endpoint_table.c.region_id.type.length, 255)
self.assertEqual(255, endpoint_table.c.region_id.type.length)
def test_endpoint_region_downgrade_columns(self):
self.upgrade(53)
@ -1338,12 +1338,12 @@ class SqlUpgradeTests(SqlMigrateBase):
'service_id', 'url', 'extra', 'enabled',
'region'])
region_table = sqlalchemy.Table('region', self.metadata, autoload=True)
self.assertEqual(region_table.c.id.type.length, 64)
self.assertEqual(region_table.c.parent_region_id.type.length, 64)
self.assertEqual(64, region_table.c.id.type.length)
self.assertEqual(64, region_table.c.parent_region_id.type.length)
endpoint_table = sqlalchemy.Table('endpoint',
self.metadata,
autoload=True)
self.assertEqual(endpoint_table.c.region.type.length, 255)
self.assertEqual(255, endpoint_table.c.region.type.length)
def test_endpoint_region_migration(self):
self.upgrade(52)

View File

@ -498,7 +498,7 @@ class MappingCRUDTests(FederationTests):
self.assertIsNotNone(entities)
self.assertResponseStatus(resp, 200)
self.assertValidListLinks(resp.result.get('links'))
self.assertEqual(len(entities), 1)
self.assertEqual(1, len(entities))
def test_mapping_delete(self):
url = self.MAPPING_URL + '%(mapping_id)s'
@ -727,8 +727,8 @@ class MappingRuleEngineTests(FederationTests):
group_ids = values.get('group_ids')
name = values.get('user', {}).get('name')
self.assertEqual(name, user_name)
self.assertEqual(group_ids, [])
self.assertEqual(user_name, name)
self.assertEqual([], group_ids,)
def test_rule_engine_not_any_of_many_rules(self):
"""Should return group EMPLOYEE_GROUP_ID.
@ -749,7 +749,7 @@ class MappingRuleEngineTests(FederationTests):
group_ids = values.get('group_ids')
name = values.get('user', {}).get('name')
self.assertEqual(name, user_name)
self.assertEqual(user_name, name)
self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids)
def test_rule_engine_not_any_of_regex_verify_pass(self):
@ -1410,7 +1410,7 @@ class FederatedTokenTests(FederationTests):
r = self.v3_authenticate_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER)
token_resp = r.result['token']
domain_id = token_resp['domain']['id']
self.assertEqual(domain_id, self.domainA['id'])
self.assertEqual(self.domainA['id'], domain_id)
self._check_scoped_token_attributes(token_resp)
def test_scope_to_domain_multiple_tokens(self):
@ -1434,7 +1434,7 @@ class FederatedTokenTests(FederationTests):
r = self.v3_authenticate_token(body)
token_resp = r.result['token']
domain_id = token_resp['domain']['id']
self.assertEqual(domain_id, domain_id_ref)
self.assertEqual(domain_id_ref, domain_id)
self._check_scoped_token_attributes(token_resp)
def test_scope_to_domain_with_only_inherited_roles_fails(self):
@ -1465,7 +1465,7 @@ class FederatedTokenTests(FederationTests):
r = self.get(url, token=token)
projects_resp = r.result['projects']
projects = set(p['id'] for p in projects_resp)
self.assertEqual(projects, projects_ref,
self.assertEqual(projects_ref, projects,
'match failed for url %s' % url)
def test_list_domains(self):
@ -1491,7 +1491,7 @@ class FederatedTokenTests(FederationTests):
r = self.get(url, token=token)
domains_resp = r.result['domains']
domains = set(p['id'] for p in domains_resp)
self.assertEqual(domains, domains_ref,
self.assertEqual(domains_ref, domains,
'match failed for url %s' % url)
def test_full_workflow(self):
@ -1517,7 +1517,7 @@ class FederatedTokenTests(FederationTests):
r = self.v3_authenticate_token(v3_scope_request)
token_resp = r.result['token']
project_id = token_resp['project']['id']
self.assertEqual(project_id, project['id'])
self.assertEqual(project['id'], project_id)
self._check_scoped_token_attributes(token_resp)
def test_workflow_with_groups_deletion(self):

View File

@ -145,7 +145,7 @@ class IdentityTestFilteredCase(filtering.FilterTests,
self._set_policy(new_policy)
r = self.get('/domains?enabled=0', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertEqual(1, len(id_list))
self.assertIn(self.domainC['id'], id_list)
# Try a few ways of specifying 'false'
@ -159,14 +159,14 @@ class IdentityTestFilteredCase(filtering.FilterTests,
for val in ('1', 'true', 'True', 'TRUE', 'y', 'yes', 'on'):
r = self.get('/domains?enabled=%s' % val, auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertEqual(3, len(id_list))
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
self.assertIn(CONF.identity.default_domain_id, id_list)
r = self.get('/domains?enabled', auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 3)
self.assertEqual(3, len(id_list))
self.assertIn(self.domainA['id'], id_list)
self.assertIn(self.domainB['id'], id_list)
self.assertIn(CONF.identity.default_domain_id, id_list)
@ -187,7 +187,7 @@ class IdentityTestFilteredCase(filtering.FilterTests,
my_url = '/domains?enabled&name=%s' % self.domainA['name']
r = self.get(my_url, auth=self.auth)
id_list = self._get_id_list_from_ref_list(r.result.get('domains'))
self.assertEqual(len(id_list), 1)
self.assertEqual(1, len(id_list))
self.assertIn(self.domainA['id'], id_list)
self.assertIs(True, r.result.get('domains')[0]['enabled'])
@ -210,7 +210,7 @@ class IdentityTestFilteredCase(filtering.FilterTests,
# domainA is returned and it is enabled, since enableds=0 is not the
# same as enabled=0
self.assertEqual(len(id_list), 1)
self.assertEqual(1, len(id_list))
self.assertIn(self.domainA['id'], id_list)
self.assertIs(True, r.result.get('domains')[0]['enabled'])
@ -232,8 +232,8 @@ class IdentityTestFilteredCase(filtering.FilterTests,
url_by_name = '/users?name=%my%name%'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 1)
self.assertEqual(r.result.get('users')[0]['id'], user['id'])
self.assertEqual(1, len(r.result.get('users')))
self.assertEqual(user['id'], r.result.get('users')[0]['id'])
def test_inexact_filters(self):
# Create 20 users
@ -263,38 +263,38 @@ class IdentityTestFilteredCase(filtering.FilterTests,
url_by_name = '/users?name__contains=Ministry'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 4)
self.assertEqual(4, len(r.result.get('users')))
self._match_with_list(r.result.get('users'), user_list,
list_start=6, list_end=10)
url_by_name = '/users?name__icontains=miNIstry'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 5)
self.assertEqual(5, len(r.result.get('users')))
self._match_with_list(r.result.get('users'), user_list,
list_start=6, list_end=11)
url_by_name = '/users?name__startswith=The'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 5)
self.assertEqual(5, len(r.result.get('users')))
self._match_with_list(r.result.get('users'), user_list,
list_start=5, list_end=10)
url_by_name = '/users?name__istartswith=the'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 6)
self.assertEqual(6, len(r.result.get('users')))
self._match_with_list(r.result.get('users'), user_list,
list_start=5, list_end=11)
url_by_name = '/users?name__endswith=of'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 1)
self.assertEqual(1, len(r.result.get('users')))
self.assertEqual(r.result.get('users')[0]['id'], user_list[7]['id'])
url_by_name = '/users?name__iendswith=OF'
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 2)
self.assertEqual(r.result.get('users')[0]['id'], user_list[7]['id'])
self.assertEqual(r.result.get('users')[1]['id'], user_list[10]['id'])
self.assertEqual(2, len(r.result.get('users')))
self.assertEqual(user_list[7]['id'], r.result.get('users')[0]['id'])
self.assertEqual(user_list[10]['id'], r.result.get('users')[1]['id'])
self._delete_test_data('user', user_list)
@ -314,7 +314,7 @@ class IdentityTestFilteredCase(filtering.FilterTests,
url_by_name = "/users?name=anything' or 'x'='x"
r = self.get(url_by_name, auth=self.auth)
self.assertEqual(len(r.result.get('users')), 0)
self.assertEqual(0, len(r.result.get('users')))
# See if we can add a SQL command...use the group table instead of the
# user table since 'user' is reserved word for SQLAlchemy.
@ -409,12 +409,12 @@ class IdentityTestListLimitCase(IdentityTestFilteredCase):
self.config_fixture.config(list_limit=5)
self.config_fixture.config(group=driver, list_limit=None)
r = self.get('/%s' % plural, auth=self.auth)
self.assertEqual(len(r.result.get(plural)), 5)
self.assertEqual(5, len(r.result.get(plural)))
self.assertIs(r.result.get('truncated'), True)
self.config_fixture.config(group=driver, list_limit=4)
r = self.get('/%s' % plural, auth=self.auth)
self.assertEqual(len(r.result.get(plural)), 4)
self.assertEqual(4, len(r.result.get(plural)))
self.assertIs(r.result.get('truncated'), True)
def test_users_list_limit(self):
@ -444,7 +444,7 @@ class IdentityTestListLimitCase(IdentityTestFilteredCase):
"""Check truncated attribute not set when list not limited."""
r = self.get('/services', auth=self.auth)
self.assertEqual(len(r.result.get('services')), 10)
self.assertEqual(10, len(r.result.get('services')))
self.assertIsNone(r.result.get('truncated'))
def test_at_limit(self):
@ -456,5 +456,5 @@ class IdentityTestListLimitCase(IdentityTestFilteredCase):
self.config_fixture.config(list_limit=5)
self.config_fixture.config(group='catalog', list_limit=10)
r = self.get('/services', auth=self.auth)
self.assertEqual(len(r.result.get('services')), 10)
self.assertEqual(10, len(r.result.get('services')))
self.assertIsNone(r.result.get('truncated'))

View File

@ -108,7 +108,7 @@ class ConsumerCRUDTests(OAuth1Tests):
body={'consumer': ref})
consumer = resp.result['consumer']
consumer_id = consumer['id']
self.assertEqual(consumer['description'], description)
self.assertEqual(description, consumer['description'])
self.assertIsNotNone(consumer_id)
self.assertIsNotNone(consumer['secret'])
return consumer
@ -146,8 +146,8 @@ class ConsumerCRUDTests(OAuth1Tests):
self_url = ['http://localhost/v3', self.CONSUMER_URL,
'/', consumer_id]
self_url = ''.join(self_url)
self.assertEqual(resp.result['consumer']['links']['self'], self_url)
self.assertEqual(resp.result['consumer']['id'], consumer_id)
self.assertEqual(self_url, resp.result['consumer']['links']['self'])
self.assertEqual(consumer_id, resp.result['consumer']['id'])
def test_consumer_list(self):
self._consumer_create()
@ -156,7 +156,7 @@ class ConsumerCRUDTests(OAuth1Tests):
self.assertIsNotNone(entities)
self_url = ['http://localhost/v3', self.CONSUMER_URL]
self_url = ''.join(self_url)
self.assertEqual(resp.result['links']['self'], self_url)
self.assertEqual(self_url, resp.result['links']['self'])
self.assertValidListLinks(resp.result['links'])
def test_consumer_update(self):
@ -169,8 +169,8 @@ class ConsumerCRUDTests(OAuth1Tests):
update_resp = self.patch(self.CONSUMER_URL + '/%s' % original_id,
body={'consumer': update_ref})
consumer = update_resp.result['consumer']
self.assertEqual(consumer['description'], update_description)
self.assertEqual(consumer['id'], original_id)
self.assertEqual(update_description, consumer['description'])
self.assertEqual(original_id, consumer['id'])
def test_consumer_update_bad_secret(self):
consumer = self._create_single_consumer()
@ -316,8 +316,8 @@ class AccessTokenCRUDTests(OAuthFlowTests):
}
resp = self.get(url)
entity = resp.result['access_token']
self.assertEqual(entity['id'], self.access_token.key)
self.assertEqual(entity['consumer_id'], self.consumer['key'])
self.assertEqual(self.access_token.key, entity['id'])
self.assertEqual(self.consumer['key'], entity['consumer_id'])
self.assertEqual('http://localhost/v3' + url, entity['links']['self'])
def test_get_access_token_dne(self):
@ -342,7 +342,7 @@ class AccessTokenCRUDTests(OAuthFlowTests):
'role': self.role_id})
resp = self.get(url)
entity = resp.result['role']
self.assertEqual(entity['id'], self.role_id)
self.assertEqual(self.role_id, entity['id'])
def test_get_role_in_access_token_dne(self):
self.test_oauth_flow()
@ -385,14 +385,14 @@ class AuthTokenTests(OAuthFlowTests):
# now verify the oauth section
oauth_section = r.result['token']['OS-OAUTH1']
self.assertEqual(oauth_section['access_token_id'],
self.access_token.key)
self.assertEqual(oauth_section['consumer_id'], self.consumer['key'])
self.assertEqual(self.access_token.key,
oauth_section['access_token_id'])
self.assertEqual(self.consumer['key'], oauth_section['consumer_id'])
# verify the roles section
roles_list = r.result['token']['roles']
# we can just verify the 0th role since we are only assigning one role
self.assertEqual(roles_list[0]['id'], self.role_id)
self.assertEqual(self.role_id, roles_list[0]['id'])
# verify that the token can perform delegated tasks
ref = self.new_user_ref(domain_id=self.domain_id)

View File

@ -81,7 +81,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
self.revoke_api.revoke_by_expiration(user_id, expires_at)
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
self.assertEqual(len(events), 1)
self.assertEqual(1, len(events))
self.assertReportedEventMatchesRecorded(events[0], sample, before_time)
def test_disabled_project_in_list(self):
@ -94,7 +94,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
self.assertEqual(len(events), 1)
self.assertEqual(1, len(events))
self.assertReportedEventMatchesRecorded(events[0], sample, before_time)
def test_disabled_domain_in_list(self):
@ -107,7 +107,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
self.assertEqual(len(events), 1)
self.assertEqual(1, len(events))
self.assertReportedEventMatchesRecorded(events[0], sample, before_time)
def test_list_since_invalid(self):
@ -116,7 +116,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
def test_list_since_valid(self):
resp = self.get('/OS-REVOKE/events?since=2013-02-27T18:30:59.999999Z')
events = resp.json_body['events']
self.assertEqual(len(events), 0)
self.assertEqual(0, len(events))
def test_since_future_time_no_events(self):
domain_id = uuid.uuid4().hex
@ -128,7 +128,7 @@ class OSRevokeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin):
resp = self.get('/OS-REVOKE/events')
events = resp.json_body['events']
self.assertEqual(len(events), 1)
self.assertEqual(1, len(events))
resp = self.get('/OS-REVOKE/events?since=%s' % _future_time_string())
events = resp.json_body['events']

View File

@ -626,7 +626,7 @@ class VersionTestCase(tests.TestCase):
def test_public_versions(self):
client = self.client(self.public_app)
resp = client.get('/')
self.assertEqual(resp.status_int, 300)
self.assertEqual(300, resp.status_int)
data = jsonutils.loads(resp.body)
expected = VERSIONS_RESPONSE
for version in expected['versions']['values']:
@ -643,7 +643,7 @@ class VersionTestCase(tests.TestCase):
def test_admin_versions(self):
client = self.client(self.admin_app)
resp = client.get('/')
self.assertEqual(resp.status_int, 300)
self.assertEqual(300, resp.status_int)
data = jsonutils.loads(resp.body)
expected = VERSIONS_RESPONSE
for version in expected['versions']['values']:
@ -663,7 +663,7 @@ class VersionTestCase(tests.TestCase):
for app in (self.public_app, self.admin_app):
client = self.client(app)
resp = client.get('/')
self.assertEqual(resp.status_int, 300)
self.assertEqual(300, resp.status_int)
data = jsonutils.loads(resp.body)
expected = VERSIONS_RESPONSE
for version in expected['versions']['values']:
@ -679,31 +679,31 @@ class VersionTestCase(tests.TestCase):
def test_public_version_v2(self):
client = self.client(self.public_app)
resp = client.get('/v2.0/')
self.assertEqual(resp.status_int, 200)
self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'],
'http://localhost:%s/v2.0/' %
CONF.eventlet_server.public_port)
self.assertEqual(data, expected)
self.assertEqual(expected, data)
def test_admin_version_v2(self):
client = self.client(self.admin_app)
resp = client.get('/v2.0/')
self.assertEqual(resp.status_int, 200)
self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'],
'http://localhost:%s/v2.0/' %
CONF.eventlet_server.admin_port)
self.assertEqual(data, expected)
self.assertEqual(expected, data)
def test_use_site_url_if_endpoint_unset_v2(self):
self.config_fixture.config(public_endpoint=None, admin_endpoint=None)
for app in (self.public_app, self.admin_app):
client = self.client(app)
resp = client.get('/v2.0/')
self.assertEqual(resp.status_int, 200)
self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'], 'http://localhost/v2.0/')
@ -712,52 +712,52 @@ class VersionTestCase(tests.TestCase):
def test_public_version_v3(self):
client = self.client(self.public_app)
resp = client.get('/v3/')
self.assertEqual(resp.status_int, 200)
self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v3_VERSION_RESPONSE
self._paste_in_port(expected['version'],
'http://localhost:%s/v3/' %
CONF.eventlet_server.public_port)
self.assertEqual(data, expected)
self.assertEqual(expected, data)
def test_admin_version_v3(self):
client = self.client(self.public_app)
resp = client.get('/v3/')
self.assertEqual(resp.status_int, 200)
self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v3_VERSION_RESPONSE
self._paste_in_port(expected['version'],
'http://localhost:%s/v3/' %
CONF.eventlet_server.admin_port)
self.assertEqual(data, expected)
self.assertEqual(expected, data)
def test_use_site_url_if_endpoint_unset_v3(self):
self.config_fixture.config(public_endpoint=None, admin_endpoint=None)
for app in (self.public_app, self.admin_app):
client = self.client(app)
resp = client.get('/v3/')
self.assertEqual(resp.status_int, 200)
self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v3_VERSION_RESPONSE
self._paste_in_port(expected['version'], 'http://localhost/v3/')
self.assertEqual(data, expected)
self.assertEqual(expected, data)
@mock.patch.object(controllers, '_VERSIONS', ['v3'])
def test_v2_disabled(self):
client = self.client(self.public_app)
# request to /v2.0 should fail
resp = client.get('/v2.0/')
self.assertEqual(resp.status_int, 404)
self.assertEqual(404, resp.status_int)
# request to /v3 should pass
resp = client.get('/v3/')
self.assertEqual(resp.status_int, 200)
self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v3_VERSION_RESPONSE
self._paste_in_port(expected['version'],
'http://localhost:%s/v3/' %
CONF.eventlet_server.public_port)
self.assertEqual(data, expected)
self.assertEqual(expected, data)
# only v3 information should be displayed by requests to /
v3_only_response = {
@ -771,26 +771,26 @@ class VersionTestCase(tests.TestCase):
'http://localhost:%s/v3/' %
CONF.eventlet_server.public_port)
resp = client.get('/')
self.assertEqual(resp.status_int, 300)
self.assertEqual(300, resp.status_int)
data = jsonutils.loads(resp.body)
self.assertEqual(data, v3_only_response)
self.assertEqual(v3_only_response, data)
@mock.patch.object(controllers, '_VERSIONS', ['v2.0'])
def test_v3_disabled(self):
client = self.client(self.public_app)
# request to /v3 should fail
resp = client.get('/v3/')
self.assertEqual(resp.status_int, 404)
self.assertEqual(404, resp.status_int)
# request to /v2.0 should pass
resp = client.get('/v2.0/')
self.assertEqual(resp.status_int, 200)
self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'],
'http://localhost:%s/v2.0/' %
CONF.eventlet_server.public_port)
self.assertEqual(data, expected)
self.assertEqual(expected, data)
# only v2 information should be displayed by requests to /
v2_only_response = {
@ -804,9 +804,9 @@ class VersionTestCase(tests.TestCase):
'http://localhost:%s/v2.0/' %
CONF.eventlet_server.public_port)
resp = client.get('/')
self.assertEqual(resp.status_int, 300)
self.assertEqual(300, resp.status_int)
data = jsonutils.loads(resp.body)
self.assertEqual(data, v2_only_response)
self.assertEqual(v2_only_response, data)
def _test_json_home(self, path, exp_json_home_data):
client = self.client(self.public_app)
@ -923,7 +923,7 @@ class VersionSingleAppTestCase(tests.TestCase):
app = self.loadapp('keystone', app_name)
client = self.client(app)
resp = client.get('/')
self.assertEqual(resp.status_int, 300)
self.assertEqual(300, resp.status_int)
data = jsonutils.loads(resp.body)
expected = VERSIONS_RESPONSE
for version in expected['versions']['values']:

View File

@ -91,16 +91,16 @@ class ApplicationTest(BaseWSGITest):
body = b'{"attribute": "value"}'
resp = wsgi.render_response(body=data)
self.assertEqual(resp.status, '200 OK')
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, body)
self.assertEqual(resp.headers.get('Vary'), 'X-Auth-Token')
self.assertEqual(resp.headers.get('Content-Length'), str(len(body)))
self.assertEqual('200 OK', resp.status)
self.assertEqual(200, resp.status_int)
self.assertEqual(body, resp.body)
self.assertEqual('X-Auth-Token', resp.headers.get('Vary'))
self.assertEqual(str(len(body)), resp.headers.get('Content-Length'))
def test_render_response_custom_status(self):
resp = wsgi.render_response(status=(501, 'Not Implemented'))
self.assertEqual(resp.status, '501 Not Implemented')
self.assertEqual(resp.status_int, 501)
self.assertEqual('501 Not Implemented', resp.status)
self.assertEqual(501, resp.status_int)
def test_successful_require_attribute(self):
app = FakeAttributeCheckerApp()
@ -146,23 +146,23 @@ class ApplicationTest(BaseWSGITest):
def test_render_response_custom_headers(self):
resp = wsgi.render_response(headers=[('Custom-Header', 'Some-Value')])
self.assertEqual(resp.headers.get('Custom-Header'), 'Some-Value')
self.assertEqual(resp.headers.get('Vary'), 'X-Auth-Token')
self.assertEqual('Some-Value', resp.headers.get('Custom-Header'))
self.assertEqual('X-Auth-Token', resp.headers.get('Vary'))
def test_render_response_no_body(self):
resp = wsgi.render_response()
self.assertEqual(resp.status, '204 No Content')
self.assertEqual(resp.status_int, 204)
self.assertEqual(resp.body, b'')
self.assertEqual(resp.headers.get('Content-Length'), '0')
self.assertEqual('204 No Content', resp.status)
self.assertEqual(204, resp.status_int)
self.assertEqual(b'', resp.body)
self.assertEqual('0', resp.headers.get('Content-Length'))
self.assertIsNone(resp.headers.get('Content-Type'))
def test_render_response_head_with_body(self):
resp = wsgi.render_response({'id': uuid.uuid4().hex}, method='HEAD')
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, b'')
self.assertEqual(200, resp.status_int)
self.assertEqual(b'', resp.body)
self.assertNotEqual(resp.headers.get('Content-Length'), '0')
self.assertEqual(resp.headers.get('Content-Type'), 'application/json')
self.assertEqual('application/json', resp.headers.get('Content-Type'))
def test_application_local_config(self):
class FakeApp(wsgi.Application):
@ -176,14 +176,14 @@ class ApplicationTest(BaseWSGITest):
def test_render_exception(self):
e = exception.Unauthorized(message=u'\u7f51\u7edc')
resp = wsgi.render_exception(e)
self.assertEqual(resp.status_int, 401)
self.assertEqual(401, resp.status_int)
def test_render_exception_host(self):
e = exception.Unauthorized(message=u'\u7f51\u7edc')
context = {'host_url': 'http://%s:5000' % uuid.uuid4().hex}
resp = wsgi.render_exception(e, context=context)
self.assertEqual(resp.status_int, 401)
self.assertEqual(401, resp.status_int)
class ExtensionRouterTest(BaseWSGITest):
@ -226,7 +226,7 @@ class MiddlewareTest(BaseWSGITest):
req = self._make_request()
req.environ['REMOTE_ADDR'] = '127.0.0.1'
resp = FakeMiddleware(self.app)(req)
self.assertEqual(resp.status_int, exception.Unauthorized.code)
self.assertEqual(exception.Unauthorized.code, resp.status_int)
def test_middleware_type_error(self):
class FakeMiddleware(wsgi.Middleware):
@ -237,7 +237,7 @@ class MiddlewareTest(BaseWSGITest):
req.environ['REMOTE_ADDR'] = '127.0.0.1'
resp = FakeMiddleware(self.app)(req)
# This is a validationerror type
self.assertEqual(resp.status_int, exception.ValidationError.code)
self.assertEqual(exception.ValidationError.code, resp.status_int)
def test_middleware_exception_error(self):
@ -250,7 +250,7 @@ class MiddlewareTest(BaseWSGITest):
def do_request():
req = self._make_request()
resp = FakeMiddleware(self.app)(req)
self.assertEqual(resp.status_int, exception.UnexpectedError.code)
self.assertEqual(exception.UnexpectedError.code, resp.status_int)
return resp
# Exception data should not be in the message when debug is False
@ -287,7 +287,7 @@ class LocalizedResponseTest(tests.TestCase):
mock_gal.return_value = [language]
req = webob.Request.blank('/', headers={'Accept-Language': language})
self.assertEqual(wsgi.best_match_language(req), language)
self.assertEqual(language, wsgi.best_match_language(req))
@mock.patch.object(oslo_i18n, 'get_available_languages')
def test_request_match_language_unexpected(self, mock_gal):
@ -417,7 +417,7 @@ class ServerTest(tests.TestCase):
server.start()
self.addCleanup(server.stop)
self.assertEqual(mock_sock_dup.setsockopt.call_count, 2)
self.assertEqual(2, mock_sock_dup.setsockopt.call_count)
# Test the last set of call args i.e. for the keepidle
mock_sock_dup.setsockopt.assert_called_with(socket.IPPROTO_TCP,