Fix parameters of assertEqual are misplaced

Many assertEqual sentences don't follow assertEqual(expected, actual),
These misplaces have 2 impacts:
1, giving confusing messages when some tests failed.
2, mislead other developers, new test modules may follow these wrong pattern.

This patch fix all of them.

Change-Id: Ia5155114c60f5e27cb00b8aba6de5cf32e046dd7
Closes-Bug: #1604213
This commit is contained in:
yan.haifeng 2016-07-20 18:05:24 +08:00
parent a794790880
commit c1735c503f
12 changed files with 122 additions and 123 deletions

View File

@ -1673,9 +1673,9 @@ class LiveFacadeTest(test_base.DbTestCase):
session = self.sessionmaker(autocommit=True)
self.assertEqual(
[("u1_commit",)],
session.query(
self.User.name).order_by(self.User.name).all(),
[("u1_commit",)]
self.User.name).order_by(self.User.name).all()
)
def test_replace_scope(self):

View File

@ -802,7 +802,7 @@ class TestDeadlock(TestsExceptionFilter):
if isinstance(matched, exception.DBError):
matched = matched.inner_exception
self.assertEqual(matched.orig.__class__.__name__, expected_dbapi_cls)
self.assertEqual(expected_dbapi_cls, matched.orig.__class__.__name__)
def test_mysql_pymysql_deadlock(self):
self._run_deadlock_detect_test(
@ -1031,7 +1031,7 @@ class TestDBDisconnected(TestsExceptionFilter):
with self._fixture(dialect_name, exc_obj, 1, is_disconnect):
conn = self.engine.connect()
with conn.begin():
self.assertEqual(conn.scalar(sqla.select([1])), 1)
self.assertEqual(1, conn.scalar(sqla.select([1])))
self.assertFalse(conn.closed)
self.assertFalse(conn.invalidated)
self.assertTrue(conn.in_transaction())
@ -1044,7 +1044,7 @@ class TestDBDisconnected(TestsExceptionFilter):
# test implicit execution
with self._fixture(dialect_name, exc_obj, 1):
self.assertEqual(self.engine.scalar(sqla.select([1])), 1)
self.assertEqual(1, self.engine.scalar(sqla.select([1])))
def test_mysql_ping_listener_disconnected(self):
for code in [2006, 2013, 2014, 2045, 2055]:
@ -1151,7 +1151,7 @@ class TestDBConnectRetry(TestsExceptionFilter):
2, -1
)
# conn is good
self.assertEqual(conn.scalar(sqla.select([1])), 1)
self.assertEqual(1, conn.scalar(sqla.select([1])))
def test_connect_retry_past_failure(self):
conn = self._run_test(
@ -1160,7 +1160,7 @@ class TestDBConnectRetry(TestsExceptionFilter):
2, 3
)
# conn is good
self.assertEqual(conn.scalar(sqla.select([1])), 1)
self.assertEqual(1, conn.scalar(sqla.select([1])))
def test_connect_retry_not_candidate_exception(self):
self.assertRaises(
@ -1188,7 +1188,7 @@ class TestDBConnectRetry(TestsExceptionFilter):
2, -1
)
# conn is good
self.assertEqual(conn.scalar(sqla.select([1])), 1)
self.assertEqual(1, conn.scalar(sqla.select([1])))
def test_db2_error_negative(self):
self.assertRaises(
@ -1245,7 +1245,7 @@ class TestDBConnectPingWrapping(TestsExceptionFilter):
self, dialect_name, exc_obj, is_disconnect=True):
with self._fixture(dialect_name, exc_obj, 3, is_disconnect):
conn = self.engine.connect()
self.assertEqual(conn.scalar(sqla.select([1])), 1)
self.assertEqual(1, conn.scalar(sqla.select([1])))
conn.close()
with self._fixture(dialect_name, exc_obj, 1, is_disconnect):

View File

@ -285,11 +285,11 @@ class TestMigrationMultipleExtensions(test_base.BaseTestCase):
def test_upgrade_right_order(self):
results = self.migration_manager.upgrade(None)
self.assertEqual(results, [100, 200])
self.assertEqual([100, 200], results)
def test_downgrade_right_order(self):
results = self.migration_manager.downgrade(None)
self.assertEqual(results, [100, 0])
self.assertEqual([100, 0], results)
def test_upgrade_does_not_go_too_far(self):
self.first_ext.obj.has_revision.return_value = True

View File

@ -65,7 +65,7 @@ class TestMigrationCommon(test_base.DbTestCase):
def test_find_migrate_repo_called_once(self):
my_repository = migration._find_migrate_repo(self.path)
self.repository.assert_called_once_with(self.path)
self.assertEqual(my_repository, self.return_value)
self.assertEqual(self.return_value, my_repository)
def test_find_migrate_repo_called_few_times(self):
repo1 = migration._find_migrate_repo(self.path)
@ -82,7 +82,7 @@ class TestMigrationCommon(test_base.DbTestCase):
version = migration.db_version_control(
self.engine, self.path, self.test_version)
self.assertEqual(version, self.test_version)
self.assertEqual(self.test_version, version)
mock_version_control.assert_called_once_with(
self.engine, self.return_value, self.test_version)
@ -111,7 +111,7 @@ class TestMigrationCommon(test_base.DbTestCase):
def test_db_version_return(self):
ret_val = migration.db_version(self.engine, self.path,
self.init_version)
self.assertEqual(ret_val, self.test_version)
self.assertEqual(self.test_version, ret_val)
def test_db_version_raise_not_controlled_error_first(self):
with mock.patch.object(migration, 'db_version_control') as mock_ver:
@ -122,7 +122,7 @@ class TestMigrationCommon(test_base.DbTestCase):
ret_val = migration.db_version(self.engine, self.path,
self.init_version)
self.assertEqual(ret_val, self.test_version)
self.assertEqual(self.test_version, ret_val)
mock_ver.assert_called_once_with(self.engine, self.path,
version=self.init_version)

View File

@ -115,10 +115,10 @@ class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
int(self.REPOSITORY.latest) + 1)
upgraded = [mock.call(v, with_data=True)
for v in versions]
self.assertEqual(self.migrate_up.call_args_list, upgraded)
self.assertEqual(upgraded, self.migrate_up.call_args_list)
downgraded = [mock.call(v - 1) for v in reversed(versions)]
self.assertEqual(self.migrate_down.call_args_list, downgraded)
self.assertEqual(downgraded, self.migrate_down.call_args_list)
@mock.patch.object(migrate.WalkVersionsMixin, 'migrate_up')
@mock.patch.object(migrate.WalkVersionsMixin, 'migrate_down')
@ -143,7 +143,7 @@ class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
downgraded_2.append(mock.call(v - 1))
downgraded_2.append(mock.call(v - 1))
downgraded = downgraded_1 + downgraded_2
self.assertEqual(self.migrate_down.call_args_list, downgraded)
self.assertEqual(downgraded, self.migrate_down.call_args_list)
@mock.patch.object(migrate.WalkVersionsMixin, 'migrate_up')
@mock.patch.object(migrate.WalkVersionsMixin, 'migrate_down')
@ -163,7 +163,7 @@ class TestWalkVersions(test.BaseTestCase, migrate.WalkVersionsMixin):
self.assertEqual(upgraded, self.migrate_up.call_args_list)
downgraded = [mock.call(v - 1, with_data=True) for v in versions]
self.assertEqual(self.migrate_down.call_args_list, downgraded)
self.assertEqual(downgraded, self.migrate_down.call_args_list)
@mock.patch.object(migrate.WalkVersionsMixin, 'migrate_up')
@mock.patch.object(migrate.WalkVersionsMixin, 'migrate_down')
@ -511,7 +511,7 @@ class TestOldCheckForeignKeys(test_base.DbTestCase):
)
for cmd, fk, tname, fk_info in diffs
]
self.assertEqual(diffs, compare_to)
self.assertEqual(compare_to, diffs)
def test_fk_added(self):
self._fk_added_fixture()

View File

@ -52,13 +52,13 @@ class ModelBaseTest(test_base.DbTestCase):
def test_modelbase_set(self):
self.mb['world'] = 'hello'
self.assertEqual(self.mb['world'], 'hello')
self.assertEqual('hello', self.mb['world'])
def test_modelbase_update(self):
h = {'a': '1', 'b': '2'}
self.mb.update(h)
for key in h.keys():
self.assertEqual(self.mb[key], h[key])
self.assertEqual(h[key], self.mb[key])
def test_modelbase_contains(self):
mb = models.ModelBase()
@ -92,8 +92,8 @@ class ModelBaseTest(test_base.DbTestCase):
'b': '2',
}
self.ekm.update(h)
self.assertEqual(dict(self.ekm.items()), expected)
self.assertEqual(dict(self.ekm.iteritems()), expected)
self.assertEqual(expected, dict(self.ekm.items()))
self.assertEqual(expected, dict(self.ekm.iteritems()))
def test_modelbase_dict(self):
h = {'a': '1', 'b': '2'}
@ -105,7 +105,7 @@ class ModelBaseTest(test_base.DbTestCase):
'b': '2',
}
self.ekm.update(h)
self.assertEqual(dict(self.ekm), expected)
self.assertEqual(expected, dict(self.ekm))
def test_modelbase_iter(self):
expected = {
@ -125,12 +125,11 @@ class ModelBaseTest(test_base.DbTestCase):
self.assertEqual(len(expected), found_items)
def test_modelbase_keys(self):
self.assertEqual(set(self.ekm.keys()),
set(('id', 'smth', 'name')))
self.assertEqual(set(('id', 'smth', 'name')), set(self.ekm.keys()))
self.ekm.update({'a': '1', 'b': '2'})
self.assertEqual(set(self.ekm.keys()),
set(('a', 'b', 'id', 'smth', 'name')))
self.assertEqual(set(('a', 'b', 'id', 'smth', 'name')),
set(self.ekm.keys()))
def test_modelbase_several_iters(self):
mb = ExtraKeysModel()
@ -138,22 +137,23 @@ class ModelBaseTest(test_base.DbTestCase):
it2 = iter(mb)
self.assertFalse(it1 is it2)
self.assertEqual(dict(it1), dict(mb))
self.assertEqual(dict(it2), dict(mb))
self.assertEqual(dict(mb), dict(it1))
self.assertEqual(dict(mb), dict(it2))
def test_extra_keys_empty(self):
"""Test verifies that by default extra_keys return empty list."""
self.assertEqual(self.mb._extra_keys, [])
self.assertEqual([], self.mb._extra_keys)
def test_extra_keys_defined(self):
"""Property _extra_keys will return list with attributes names."""
self.assertEqual(self.ekm._extra_keys, ['name'])
self.assertEqual(['name'], self.ekm._extra_keys)
def test_model_with_extra_keys(self):
data = dict(self.ekm)
self.assertEqual(data, {'smth': None,
'id': None,
'name': 'NAME'})
self.assertEqual({'smth': None,
'id': None,
'name': 'NAME'},
data)
class ExtraKeysModel(BASE, models.ModelBase):

View File

@ -39,14 +39,14 @@ sql_connection_debug=60
sql_connection_trace=True
"""]])[0]
self.conf(['--config-file', path])
self.assertEqual(self.conf.database.connection, 'x://y.z')
self.assertEqual(self.conf.database.min_pool_size, 10)
self.assertEqual(self.conf.database.max_pool_size, 20)
self.assertEqual(self.conf.database.max_retries, 30)
self.assertEqual(self.conf.database.retry_interval, 40)
self.assertEqual(self.conf.database.max_overflow, 50)
self.assertEqual(self.conf.database.connection_debug, 60)
self.assertEqual(self.conf.database.connection_trace, True)
self.assertEqual('x://y.z', self.conf.database.connection)
self.assertEqual(10, self.conf.database.min_pool_size)
self.assertEqual(20, self.conf.database.max_pool_size)
self.assertEqual(30, self.conf.database.max_retries)
self.assertEqual(40, self.conf.database.retry_interval)
self.assertEqual(50, self.conf.database.max_overflow)
self.assertEqual(60, self.conf.database.connection_debug)
self.assertEqual(True, self.conf.database.connection_trace)
def test_session_parameters(self):
path = self.create_tempfiles([["tmp", b"""[database]
@ -61,15 +61,15 @@ connection_trace=True
pool_timeout=7
"""]])[0]
self.conf(['--config-file', path])
self.assertEqual(self.conf.database.connection, 'x://y.z')
self.assertEqual(self.conf.database.min_pool_size, 10)
self.assertEqual(self.conf.database.max_pool_size, 20)
self.assertEqual(self.conf.database.max_retries, 30)
self.assertEqual(self.conf.database.retry_interval, 40)
self.assertEqual(self.conf.database.max_overflow, 50)
self.assertEqual(self.conf.database.connection_debug, 60)
self.assertEqual(self.conf.database.connection_trace, True)
self.assertEqual(self.conf.database.pool_timeout, 7)
self.assertEqual('x://y.z', self.conf.database.connection)
self.assertEqual(10, self.conf.database.min_pool_size)
self.assertEqual(20, self.conf.database.max_pool_size)
self.assertEqual(30, self.conf.database.max_retries)
self.assertEqual(40, self.conf.database.retry_interval)
self.assertEqual(50, self.conf.database.max_overflow)
self.assertEqual(60, self.conf.database.connection_debug)
self.assertEqual(True, self.conf.database.connection_trace)
self.assertEqual(7, self.conf.database.pool_timeout)
def test_dbapi_database_deprecated_parameters(self):
path = self.create_tempfiles([['tmp', b'[DATABASE]\n'
@ -83,14 +83,14 @@ pool_timeout=7
b'sqlalchemy_pool_timeout=5\n'
]])[0]
self.conf(['--config-file', path])
self.assertEqual(self.conf.database.connection, 'fake_connection')
self.assertEqual(self.conf.database.idle_timeout, 100)
self.assertEqual(self.conf.database.min_pool_size, 99)
self.assertEqual(self.conf.database.max_pool_size, 199)
self.assertEqual(self.conf.database.max_retries, 22)
self.assertEqual(self.conf.database.retry_interval, 17)
self.assertEqual(self.conf.database.max_overflow, 101)
self.assertEqual(self.conf.database.pool_timeout, 5)
self.assertEqual('fake_connection', self.conf.database.connection)
self.assertEqual(100, self.conf.database.idle_timeout)
self.assertEqual(99, self.conf.database.min_pool_size)
self.assertEqual(199, self.conf.database.max_pool_size)
self.assertEqual(22, self.conf.database.max_retries)
self.assertEqual(17, self.conf.database.retry_interval)
self.assertEqual(101, self.conf.database.max_overflow)
self.assertEqual(5, self.conf.database.pool_timeout)
def test_dbapi_database_deprecated_parameters_sql(self):
path = self.create_tempfiles([['tmp', b'[sql]\n'
@ -98,8 +98,8 @@ pool_timeout=7
b'idle_timeout=99\n'
]])[0]
self.conf(['--config-file', path])
self.assertEqual(self.conf.database.connection, 'test_sql_connection')
self.assertEqual(self.conf.database.idle_timeout, 99)
self.assertEqual('test_sql_connection', self.conf.database.connection)
self.assertEqual(99, self.conf.database.idle_timeout)
def test_deprecated_dbapi_parameters(self):
path = self.create_tempfiles([['tmp', b'[DEFAULT]\n'
@ -107,7 +107,7 @@ pool_timeout=7
]])[0]
self.conf(['--config-file', path])
self.assertEqual(self.conf.database.backend, 'test_123')
self.assertEqual('test_123', self.conf.database.backend)
def test_dbapi_parameters(self):
path = self.create_tempfiles([['tmp', b'[database]\n'
@ -115,7 +115,7 @@ pool_timeout=7
]])[0]
self.conf(['--config-file', path])
self.assertEqual(self.conf.database.backend, 'test_123')
self.assertEqual('test_123', self.conf.database.backend)
def test_set_defaults(self):
conf = cfg.ConfigOpts()

View File

@ -122,7 +122,7 @@ class RetainSchemaTest(oslo_test_base.BaseTestCase):
with engine.connect() as conn:
rows = conn.execute(self.test_table.select())
self.assertEqual(rows.fetchall(), [])
self.assertEqual([], rows.fetchall())
trans = conn.begin()
conn.execute(
@ -132,7 +132,7 @@ class RetainSchemaTest(oslo_test_base.BaseTestCase):
trans.rollback()
rows = conn.execute(self.test_table.select())
self.assertEqual(rows.fetchall(), [])
self.assertEqual([], rows.fetchall())
trans = conn.begin()
conn.execute(
@ -142,7 +142,7 @@ class RetainSchemaTest(oslo_test_base.BaseTestCase):
trans.commit()
rows = conn.execute(self.test_table.select())
self.assertEqual(rows.fetchall(), [(2, 3)])
self.assertEqual([(2, 3)], rows.fetchall())
transaction_resource.finishedWith(engine)

View File

@ -74,7 +74,7 @@ class RegexpFilterTestCase(test_base.DbTestCase):
regexp_op = RegexpTable.bar.op('REGEXP')(regexp)
result = _session.query(RegexpTable).filter(regexp_op).all()
self.assertEqual([r.bar for r in result], expected)
self.assertEqual(expected, [r.bar for r in result])
def test_regexp_filter(self):
self._test_regexp_filter('10', ['10'])
@ -402,15 +402,15 @@ class SQLiteConnectTest(oslo_test.BaseTestCase):
def test_sqlite_fk_listener(self):
engine = self._fixture(sqlite_fk=True)
self.assertEqual(
engine.scalar("pragma foreign_keys"),
1
1,
engine.scalar("pragma foreign_keys")
)
engine = self._fixture(sqlite_fk=False)
self.assertEqual(
engine.scalar("pragma foreign_keys"),
0
0,
engine.scalar("pragma foreign_keys")
)
def test_sqlite_synchronous_listener(self):
@ -419,15 +419,15 @@ class SQLiteConnectTest(oslo_test.BaseTestCase):
# "The default setting is synchronous=FULL." (e.g. 2)
# http://www.sqlite.org/pragma.html#pragma_synchronous
self.assertEqual(
engine.scalar("pragma synchronous"),
2
2,
engine.scalar("pragma synchronous")
)
engine = self._fixture(sqlite_synchronous=False)
self.assertEqual(
engine.scalar("pragma synchronous"),
0
0,
engine.scalar("pragma synchronous")
)
@ -564,8 +564,8 @@ class CreateEngineTest(oslo_test.BaseTestCase):
engines._init_connection_args(
url.make_url("mysql+pymysql://u:p@host/test"), self.args,
max_pool_size=10, max_overflow=10)
self.assertEqual(self.args['pool_size'], 10)
self.assertEqual(self.args['max_overflow'], 10)
self.assertEqual(10, self.args['pool_size'])
self.assertEqual(10, self.args['max_overflow'])
def test_sqlite_memory_pool_args(self):
for _url in ("sqlite://", "sqlite:///:memory:"):
@ -579,8 +579,8 @@ class CreateEngineTest(oslo_test.BaseTestCase):
self.assertTrue(
'max_overflow' not in self.args)
self.assertEqual(self.args['connect_args']['check_same_thread'],
False)
self.assertEqual(False,
self.args['connect_args']['check_same_thread'])
# due to memory connection
self.assertTrue('poolclass' in self.args)
@ -603,11 +603,11 @@ class CreateEngineTest(oslo_test.BaseTestCase):
def _test_mysql_connect_args_default(self, connect_args):
if six.PY3:
self.assertEqual(connect_args,
{'charset': 'utf8', 'use_unicode': 1})
self.assertEqual({'charset': 'utf8', 'use_unicode': 1},
connect_args)
else:
self.assertEqual(connect_args,
{'charset': 'utf8', 'use_unicode': 0})
self.assertEqual({'charset': 'utf8', 'use_unicode': 0},
connect_args)
def test_mysql_connect_args_default(self):
engines._init_connection_args(
@ -622,8 +622,7 @@ class CreateEngineTest(oslo_test.BaseTestCase):
def test_mysql_pymysql_connect_args_default(self):
engines._init_connection_args(
url.make_url("mysql+pymysql://u:p@host/test"), self.args)
self.assertEqual(self.args['connect_args'],
{'charset': 'utf8'})
self.assertEqual({'charset': 'utf8'}, self.args['connect_args'])
def test_mysql_mysqldb_connect_args_default(self):
engines._init_connection_args(
@ -633,14 +632,14 @@ class CreateEngineTest(oslo_test.BaseTestCase):
def test_postgresql_connect_args_default(self):
engines._init_connection_args(
url.make_url("postgresql://u:p@host/test"), self.args)
self.assertEqual(self.args['client_encoding'], 'utf8')
self.assertEqual('utf8', self.args['client_encoding'])
self.assertFalse(self.args['connect_args'])
def test_mysqlconnector_raise_on_warnings_default(self):
engines._init_connection_args(
url.make_url("mysql+mysqlconnector://u:p@host/test"),
self.args)
self.assertEqual(self.args['connect_args']['raise_on_warnings'], False)
self.assertEqual(False, self.args['connect_args']['raise_on_warnings'])
def test_mysqlconnector_raise_on_warnings_override(self):
engines._init_connection_args(

View File

@ -88,19 +88,19 @@ class JsonTypesTestCase(test_base.DbTestCase):
def test_mysql_variants(self):
self.assertEqual(
"LONGTEXT",
str(
types.JsonEncodedDict(mysql_as_long=True).compile(
dialect=mysql.dialect())
),
"LONGTEXT"
)
)
self.assertEqual(
"MEDIUMTEXT",
str(
types.JsonEncodedDict(mysql_as_medium=True).compile(
dialect=mysql.dialect())
),
"MEDIUMTEXT"
)
)
self.assertRaises(

View File

@ -293,8 +293,8 @@ class TestPaginateQueryActualSQL(test_base.BaseTestCase):
)
self.assertEqual(
str(q.statement.compile()),
str(expected_core_sql.compile())
str(expected_core_sql.compile()),
str(q.statement.compile())
)
@ -357,7 +357,7 @@ class TestMigrationUtils(db_test_base.DbTestCase):
real_ids = [row[0] for row in
self.engine.execute(select([test_table.c.id])).fetchall()]
self.assertEqual(len(real_ids), len(expected_ids))
self.assertEqual(len(expected_ids), len(real_ids))
for id_ in expected_ids:
self.assertTrue(id_ in real_ids)
@ -397,7 +397,7 @@ class TestMigrationUtils(db_test_base.DbTestCase):
rows_select = base_select.where(table.c.deleted != table.c.id)
row_ids = [row['id'] for row in
self.engine.execute(rows_select).fetchall()]
self.assertEqual(len(row_ids), len(expected_values))
self.assertEqual(len(expected_values), len(row_ids))
for value in expected_values:
self.assertTrue(value['id'] in row_ids)
@ -406,8 +406,8 @@ class TestMigrationUtils(db_test_base.DbTestCase):
deleted_rows_ids = [row['id'] for row in
self.engine.execute(
deleted_rows_select).fetchall()]
self.assertEqual(len(deleted_rows_ids),
len(values) - len(row_ids))
self.assertEqual(len(values) - len(row_ids),
len(deleted_rows_ids))
for value in soft_deleted_values:
self.assertTrue(value['id'] in deleted_rows_ids)
@ -435,12 +435,12 @@ class TestMigrationUtils(db_test_base.DbTestCase):
insp = reflection.Inspector.from_engine(self.engine)
real_indexes = insp.get_indexes(table_name)
self.assertEqual(len(real_indexes), 3)
self.assertEqual(3, len(real_indexes))
for index in real_indexes:
name = index['name']
self.assertIn(name, indexes)
self.assertEqual(set(index['column_names']),
set(indexes[name]))
self.assertEqual(set(indexes[name]),
set(index['column_names']))
def test_change_deleted_column_type_to_id_type_integer(self):
table_name = 'abc'
@ -611,13 +611,13 @@ class TestMigrationUtils(db_test_base.DbTestCase):
query_insert)
result_insert = self.conn.execute(insert_statement)
# Verify we insert 4 rows
self.assertEqual(result_insert.rowcount, 4)
self.assertEqual(4, result_insert.rowcount)
query_all = select([insert_table]).where(
insert_table.c.uuid.in_(uuidstrs))
rows = self.conn.execute(query_all).fetchall()
# Verify we really have 4 rows in insert_table
self.assertEqual(len(rows), 4)
self.assertEqual(4, len(rows))
def test_insert_from_select_with_specified_columns(self):
insert_table_name = "__test_insert_to_table__"
@ -651,13 +651,13 @@ class TestMigrationUtils(db_test_base.DbTestCase):
query_insert, ['id', 'uuid'])
result_insert = self.conn.execute(insert_statement)
# Verify we insert 4 rows
self.assertEqual(result_insert.rowcount, 4)
self.assertEqual(4, result_insert.rowcount)
query_all = select([insert_table]).where(
insert_table.c.uuid.in_(uuidstrs))
rows = self.conn.execute(query_all).fetchall()
# Verify we really have 4 rows in insert_table
self.assertEqual(len(rows), 4)
self.assertEqual(4, len(rows))
def test_insert_from_select_with_specified_columns_negative(self):
insert_table_name = "__test_insert_to_table__"
@ -737,12 +737,12 @@ class TestConnectionUtils(test_utils.BaseTestCase):
def test_connect_string(self):
connect_string = utils.get_connect_string(**self.full_credentials)
self.assertEqual(connect_string, self.connect_string)
self.assertEqual(self.connect_string, connect_string)
def test_connect_string_sqlite(self):
sqlite_credentials = {'backend': 'sqlite', 'database': 'test.db'}
connect_string = utils.get_connect_string(**sqlite_credentials)
self.assertEqual(connect_string, 'sqlite:///test.db')
self.assertEqual('sqlite:///test.db', connect_string)
def test_is_backend_avail(self):
self.mox.StubOutWithMock(sqlalchemy.engine.base.Engine, 'connect')
@ -809,13 +809,13 @@ class TestConnectionUtils(test_utils.BaseTestCase):
def test_get_db_connection_info(self):
conn_pieces = parse.urlparse(self.connect_string)
self.assertEqual(utils.get_db_connection_info(conn_pieces),
('dude', 'pass', 'test', 'localhost'))
self.assertEqual(('dude', 'pass', 'test', 'localhost'),
utils.get_db_connection_info(conn_pieces))
def test_connect_string_host(self):
self.full_credentials['host'] = 'myhost'
connect_string = utils.get_connect_string(**self.full_credentials)
self.assertEqual(connect_string, 'postgresql://dude:pass@myhost/test')
self.assertEqual('postgresql://dude:pass@myhost/test', connect_string)
class MyModelSoftDeletedProjectId(declarative_base(), models.ModelBase,
@ -858,8 +858,8 @@ class TestModelQuery(test_base.BaseTestCase):
MyModelSoftDeleted, session=self.session, deleted=False)
deleted_filter = mock_query.filter.call_args[0][0]
self.assertEqual(str(deleted_filter),
'soft_deleted_test_model.deleted = :deleted_1')
self.assertEqual('soft_deleted_test_model.deleted = :deleted_1',
str(deleted_filter))
self.assertEqual(deleted_filter.right.value,
MyModelSoftDeleted.__mapper__.c.deleted.default.arg)
@ -876,7 +876,7 @@ class TestModelQuery(test_base.BaseTestCase):
@mock.patch.object(utils, "_read_deleted_filter")
def test_no_deleted_value(self, _read_deleted_filter):
utils.model_query(MyModelSoftDeleted, session=self.session)
self.assertEqual(_read_deleted_filter.call_count, 0)
self.assertEqual(0, _read_deleted_filter.call_count)
def test_project_filter(self):
project_id = 10
@ -887,9 +887,9 @@ class TestModelQuery(test_base.BaseTestCase):
deleted_filter = mock_query.filter.call_args[0][0]
self.assertEqual(
str(deleted_filter),
'soft_deleted_project_id_test_model.project_id = :project_id_1')
self.assertEqual(deleted_filter.right.value, project_id)
'soft_deleted_project_id_test_model.project_id = :project_id_1',
str(deleted_filter))
self.assertEqual(project_id, deleted_filter.right.value)
def test_project_filter_wrong_model(self):
self.assertRaises(ValueError, utils.model_query,
@ -902,9 +902,9 @@ class TestModelQuery(test_base.BaseTestCase):
session=self.session, project_id=(10, None))
self.assertEqual(
str(mock_query.filter.call_args[0][0]),
'soft_deleted_project_id_test_model.project_id'
' IN (:project_id_1, NULL)'
' IN (:project_id_1, NULL)',
str(mock_query.filter.call_args[0][0])
)
def test_model_query_common(self):
@ -1168,7 +1168,7 @@ class TestDialectFunctionDispatcher(test_base.BaseTestCase):
callable_fn.mysql_pymysql.return_value = 5
self.assertEqual(
dispatcher("mysql+pymysql://u:p@h/t", 3), 5
5, dispatcher("mysql+pymysql://u:p@h/t", 3)
)
def test_engine(self):

View File

@ -63,13 +63,13 @@ class TpoolDbapiWrapperTestCase(test_utils.BaseTestCase):
mock_db_api.from_config.assert_called_once_with(
conf=self.conf, backend_mapping=FAKE_BACKEND_MAPPING)
self.assertEqual(self.db_api._db_api, fake_db_api)
self.assertEqual(fake_db_api, self.db_api._db_api)
self.assertFalse(self.eventlet.tpool.Proxy.called)
# get access to other db-api method to be sure that api didn't changed
self.db_api.fake_call_2
self.assertEqual(self.db_api._db_api, fake_db_api)
self.assertEqual(fake_db_api, self.db_api._db_api)
self.assertFalse(self.eventlet.tpool.Proxy.called)
self.assertEqual(1, mock_db_api.from_config.call_count)
@ -92,7 +92,7 @@ class TpoolDbapiWrapperTestCase(test_utils.BaseTestCase):
mock_db_api.from_config.assert_called_once_with(
conf=self.conf, backend_mapping=FAKE_BACKEND_MAPPING)
self.eventlet.tpool.Proxy.assert_called_once_with(fake_db_api)
self.assertEqual(self.db_api._db_api, self.proxy)
self.assertEqual(self.proxy, self.db_api._db_api)
@mock.patch('oslo_db.api.DBAPI')
def test_db_api_without_installed_eventlet(self, mock_db_api):