Correct a test and order of parameters to assertEqual

In debugging a failing test (test_update_heartbeat) I observed that
the test was asserting incorrect values and also that the order of
parameters to assertEqual were inverted. It appears that the order of
parameters to assertEqual is a more widespread problem.

The correct order makes the assertion message more useful.

Change-Id: Ide9f60f8338398ad8e61ad81f4a30058e415f8e9
Closes-Bug: #1424741
Partial-Bug: #1277104
This commit is contained in:
Amrith Kumar 2015-02-23 12:45:31 -05:00
parent c8a9d467fd
commit b9820ab41a
13 changed files with 130 additions and 129 deletions

View File

@ -228,11 +228,11 @@ class LimitMiddlewareTest(BaseLimitTestSuite):
body = jsonutils.loads(response.body)
expected = "Only 1 GET request(s) can be made to * every minute."
value = body["overLimit"]["details"].strip()
self.assertEqual(value, expected)
self.assertEqual(expected, value)
self.assertTrue("retryAfter" in body["overLimit"])
retryAfter = body["overLimit"]["retryAfter"]
self.assertEqual(retryAfter, "60")
self.assertEqual("60", retryAfter)
class LimitTest(BaseLimitTestSuite):
@ -313,28 +313,28 @@ class ParseLimitsTest(BaseLimitTestSuite):
assert False, str(e)
# Make sure the number of returned limits are correct
self.assertEqual(len(l), 4)
self.assertEqual(4, len(l))
# Check all the verbs...
expected = ['GET', 'PUT', 'POST', 'SAY']
self.assertEqual([t.verb for t in l], expected)
self.assertEqual(expected, [t.verb for t in l])
# ...the URIs...
expected = ['*', '/foo*', '/bar*', '/derp*']
self.assertEqual([t.uri for t in l], expected)
self.assertEqual(expected, [t.uri for t in l])
# ...the regexes...
expected = ['.*', '/foo.*', '/bar.*', '/derp.*']
self.assertEqual([t.regex for t in l], expected)
self.assertEqual(expected, [t.regex for t in l])
# ...the values...
expected = [20, 10, 5, 1]
self.assertEqual([t.value for t in l], expected)
self.assertEqual(expected, [t.value for t in l])
# ...and the units...
expected = [limits.PER_MINUTE, limits.PER_HOUR,
limits.PER_SECOND, limits.PER_DAY]
self.assertEqual([t.unit for t in l], expected)
self.assertEqual(expected, [t.unit for t in l])
class LimiterTest(BaseLimitTestSuite):
@ -370,12 +370,12 @@ class LimiterTest(BaseLimitTestSuite):
didn"t set.
"""
delay = self.limiter.check_for_delay("GET", "/anything")
self.assertEqual(delay, (None, None))
self.assertEqual((None, None), delay)
def test_no_delay_PUT(self):
# Simple test to ensure no delay on a single call for a known limit.
delay = self.limiter.check_for_delay("PUT", "/anything")
self.assertEqual(delay, (None, None))
self.assertEqual((None, None), delay)
def test_delay_PUT(self):
"""
@ -450,7 +450,7 @@ class LimiterTest(BaseLimitTestSuite):
def test_user_limit(self):
# Test user-specific limits.
self.assertEqual(self.limiter.levels['user3'], [])
self.assertEqual([], self.limiter.levels['user3'])
def test_multiple_users(self):
# Tests involving multiple users.
@ -512,39 +512,39 @@ class WsgiLimiterTest(BaseLimitTestSuite):
response = request.get_response(self.app)
if "X-Wait-Seconds" in response.headers:
self.assertEqual(response.status_int, 403)
self.assertEqual(403, response.status_int)
return response.headers["X-Wait-Seconds"]
self.assertEqual(response.status_int, 204)
self.assertEqual(204, response.status_int)
def test_invalid_methods(self):
# Only POSTs should work.
for method in ["GET", "PUT", "DELETE", "HEAD", "OPTIONS"]:
request = webob.Request.blank("/", method=method)
response = request.get_response(self.app)
self.assertEqual(response.status_int, 405)
self.assertEqual(405, response.status_int)
def test_good_url(self):
delay = self._request("GET", "/something")
self.assertEqual(delay, None)
self.assertEqual(None, delay)
def test_escaping(self):
delay = self._request("GET", "/something/jump%20up")
self.assertEqual(delay, None)
self.assertEqual(None, delay)
def test_response_to_delays(self):
delay = self._request("GET", "/delayed")
self.assertEqual(delay, None)
self.assertEqual(None, delay)
delay = self._request("GET", "/delayed")
self.assertAlmostEqual(float(delay), 60, 1)
def test_response_to_delays_usernames(self):
delay = self._request("GET", "/delayed", "user1")
self.assertEqual(delay, None)
self.assertEqual(None, delay)
delay = self._request("GET", "/delayed", "user2")
self.assertEqual(delay, None)
self.assertEqual(None, delay)
delay = self._request("GET", "/delayed", "user1")
self.assertAlmostEqual(float(delay), 60, 1)
@ -663,19 +663,19 @@ class WsgiLimiterProxyTest(BaseLimitTestSuite):
def test_200(self):
# Successful request test.
delay = self.proxy.check_for_delay("GET", "/anything")
self.assertEqual(delay, (None, None))
self.assertEqual((None, None), delay)
def test_403(self):
# Forbidden request test.
delay = self.proxy.check_for_delay("GET", "/delayed")
self.assertEqual(delay, (None, None))
self.assertEqual((None, None), delay)
delay, error = self.proxy.check_for_delay("GET", "/delayed")
error = error.strip()
self.assertAlmostEqual(float(delay), 60, 1)
self.assertEqual(error, "403 Forbidden\n\nOnly 1 GET request(s) can be"
" made to /delayed every minute.")
self.assertEqual("403 Forbidden\n\nOnly 1 GET request(s) can be"
" made to /delayed every minute.", error)
def tearDown(self):
# restore original HTTPConnection object

View File

@ -42,7 +42,7 @@ class TestBackupController(TestCase):
validator = jsonschema.Draft4Validator(schema)
self.assertFalse(validator.is_valid(body))
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
self.assertEqual(len(errors), 1)
self.assertEqual(1, len(errors))
self.assertIn("' ' does not match '^.*[0-9a-zA-Z]+.*$'",
errors[0].message)
@ -53,7 +53,7 @@ class TestBackupController(TestCase):
validator = jsonschema.Draft4Validator(schema)
self.assertFalse(validator.is_valid(body))
errors = sorted(validator.iter_errors(body), key=lambda e: e.path)
self.assertEqual(len(errors), 1)
self.assertEqual(1, len(errors))
self.assertIn("'$#@&?' does not match '^.*[0-9a-zA-Z]+.*$'",
errors[0].message)

View File

@ -191,9 +191,9 @@ class BackupAgentTest(testtools.TestCase):
' | gzip |'
' openssl enc -aes-256-cbc -salt '
'-pass pass:default_aes_cbc_key')
self.assertEqual(mysql_dump.cmd, str_mysql_dump_cmd)
self.assertEqual(str_mysql_dump_cmd, mysql_dump.cmd)
self.assertIsNotNone(mysql_dump.manifest)
self.assertEqual(mysql_dump.manifest, 'abc.gz.enc')
self.assertEqual('abc.gz.enc', mysql_dump.manifest)
def test_backup_impl_InnoBackupEx(self):
"""This test is for
@ -208,10 +208,10 @@ class BackupAgentTest(testtools.TestCase):
' | gzip |'
' openssl enc -aes-256-cbc -salt '
'-pass pass:default_aes_cbc_key')
self.assertEqual(inno_backup_ex.cmd, str_innobackup_cmd)
self.assertEqual(str_innobackup_cmd, inno_backup_ex.cmd)
self.assertIsNotNone(inno_backup_ex.manifest)
str_innobackup_manifest = 'innobackupex.xbstream.gz.enc'
self.assertEqual(inno_backup_ex.manifest, str_innobackup_manifest)
self.assertEqual(str_innobackup_manifest, inno_backup_ex.manifest)
def test_backup_impl_CbBackup(self):
operating_system.get_ip_address = Mock(return_value="1.1.1.1")
@ -232,14 +232,14 @@ class BackupAgentTest(testtools.TestCase):
BackupRunner.cmd = "%s"
backup_runner = BackupRunner('sample', cmd='echo command')
if backup_runner.is_zipped:
self.assertEqual(backup_runner.zip_manifest, '.gz')
self.assertEqual('.gz', backup_runner.zip_manifest)
self.assertIsNotNone(backup_runner.zip_manifest)
self.assertIsNotNone(backup_runner.zip_cmd)
self.assertEqual(backup_runner.zip_cmd, ' | gzip')
self.assertEqual(' | gzip', backup_runner.zip_cmd)
else:
self.assertIsNone(backup_runner.zip_manifest)
self.assertIsNone(backup_runner.zip_cmd)
self.assertEqual(backup_runner.backup_type, 'BackupRunner')
self.assertEqual('BackupRunner', backup_runner.backup_type)
def test_execute_backup(self):
"""This test should ensure backup agent

View File

@ -50,7 +50,7 @@ class TemplateTest(testtools.TestCase):
raise "Could not find text in template"
# Check that the last group has been rendered
memsize = found_group.split(" ")[2]
self.assertEqual(memsize, "%sM" % (8 * flavor_multiplier))
self.assertEqual("%sM" % (8 * flavor_multiplier), memsize)
self.assertIsNotNone(server_id)
self.assertTrue(server_id > 1)
@ -149,12 +149,12 @@ class HeatTemplateLoadTest(testtools.TestCase):
self.assertIsNotNone(mongo_tmpl)
self.assertIsNotNone(percona_tmpl)
self.assertIsNotNone(couchbase_tmpl)
self.assertEqual(mysql_tmpl.name, self.default)
self.assertEqual(redis_tmpl.name, self.default)
self.assertEqual(cassandra_tmpl.name, self.default)
self.assertEqual(mongo_tmpl.name, self.default)
self.assertEqual(percona_tmpl.name, self.default)
self.assertEqual(couchbase_tmpl.name, self.default)
self.assertEqual(self.default, mysql_tmpl.name)
self.assertEqual(self.default, redis_tmpl.name)
self.assertEqual(self.default, cassandra_tmpl.name)
self.assertEqual(self.default, mongo_tmpl.name)
self.assertEqual(self.default, percona_tmpl.name)
self.assertEqual(self.default, couchbase_tmpl.name)
def test_render_templates_with_ports_from_config(self):
mysql_tmpl = template.load_heat_template('mysql')

View File

@ -41,9 +41,9 @@ class TestConfigurationParser(TestCase):
parsed = cfg_parser.parse()
d_parsed = dict(parsed)
self.assertIsNotNone(d_parsed)
self.assertEqual(d_parsed["pid-file"], "/var/run/mysqld/mysqld.pid")
self.assertEqual(d_parsed["connect_timeout"], '15')
self.assertEqual(d_parsed["skip-external-locking"], '1')
self.assertEqual("/var/run/mysqld/mysqld.pid", d_parsed["pid-file"])
self.assertEqual('15', d_parsed["connect_timeout"])
self.assertEqual('1', d_parsed["skip-external-locking"])
class TestConfigurationController(TestCase):

View File

@ -27,9 +27,9 @@ class TestCapabilities(TestDatastoreBase):
def test_capability(self):
cap = Capability.load(self.capability_name)
self.assertEqual(cap.name, self.capability_name)
self.assertEqual(cap.description, self.capability_desc)
self.assertEqual(cap.enabled, self.capability_enabled)
self.assertEqual(self.capability_name, cap.name)
self.assertEqual(self.capability_desc, cap.description)
self.assertEqual(self.capability_enabled, cap.enabled)
def test_ds_capability_create_disabled(self):
self.ds_cap = CapabilityOverride.create(

View File

@ -28,4 +28,4 @@ class TestDatastore(TestDatastoreBase):
def test_load_datastore(self):
datastore = Datastore.load(self.ds_name)
self.assertEqual(datastore.name, self.ds_name)
self.assertEqual(self.ds_name, datastore.name)

View File

@ -107,4 +107,4 @@ class TestDbMigrationUtils(testtools.TestCase):
call(columns=test_columns,
refcolumns=test_refcolumns,
name='constraint2')]
self.assertEqual(mock_constraint.call_args_list, expected)
self.assertEqual(expected, mock_constraint.call_args_list)

View File

@ -80,11 +80,11 @@ class AgentHeartBeatTest(testtools.TestCase):
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
self.assertIsNotNone(heartbeat_found)
self.assertEqual(heartbeat_found.id, heartbeat.id)
self.assertEqual(heartbeat_found.instance_id, heartbeat.instance_id)
self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)
self.assertEqual(
heartbeat_found.guest_agent_version, heartbeat.guest_agent_version)
self.assertEqual(heartbeat.id, heartbeat_found.id)
self.assertEqual(heartbeat.instance_id, heartbeat_found.instance_id)
self.assertEqual(heartbeat.updated_at, heartbeat_found.updated_at)
self.assertEqual(heartbeat.guest_agent_version,
heartbeat_found.guest_agent_version)
def test_find_by_instance_id_none(self):
"""
@ -186,11 +186,11 @@ class AgentHeartBeatTest(testtools.TestCase):
heartbeat_found = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
self.assertIsNotNone(heartbeat_found)
self.assertEqual(heartbeat_found.id, heartbeat.id)
self.assertEqual(heartbeat_found.instance_id, heartbeat.instance_id)
self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)
self.assertEqual(
heartbeat_found.guest_agent_version, heartbeat.guest_agent_version)
self.assertEqual(heartbeat.id, heartbeat_found.id)
self.assertEqual(heartbeat.instance_id, heartbeat_found.instance_id)
self.assertEqual(heartbeat.updated_at, heartbeat_found.updated_at)
self.assertEqual(heartbeat.guest_agent_version,
heartbeat_found.guest_agent_version)
# update
AgentHeartBeat().update(id=heartbeat_found.id,
@ -201,9 +201,9 @@ class AgentHeartBeatTest(testtools.TestCase):
updated_heartbeat = AgentHeartBeat.find_by_instance_id(
instance_id=instance_id)
self.assertIsNotNone(updated_heartbeat)
self.assertEqual(updated_heartbeat.id, heartbeat.id)
self.assertEqual(updated_heartbeat.instance_id, heartbeat.instance_id)
self.assertEqual(
heartbeat_found.guest_agent_version, heartbeat.guest_agent_version)
self.assertEqual(heartbeat.id, updated_heartbeat.id)
self.assertEqual(heartbeat.instance_id, updated_heartbeat.instance_id)
self.assertEqual(heartbeat.guest_agent_version,
updated_heartbeat.guest_agent_version)
self.assertEqual(heartbeat_found.updated_at, heartbeat.updated_at)
self.assertEqual(heartbeat.updated_at, updated_heartbeat.updated_at)

View File

@ -93,16 +93,16 @@ class GuestAgentBackupTest(testtools.TestCase):
backupBase.BackupRunner.is_encrypted = False
RunnerClass = utils.import_class(BACKUP_XTRA_CLS)
bkup = RunnerClass(12345, extra_opts="")
self.assertEqual(bkup.command, XTRA_BACKUP + PIPE + ZIP)
self.assertEqual(bkup.manifest, "12345.xbstream.gz")
self.assertEqual(XTRA_BACKUP + PIPE + ZIP, bkup.command)
self.assertEqual("12345.xbstream.gz", bkup.manifest)
def test_backup_decrypted_xtrabackup_with_extra_opts_command(self):
backupBase.BackupRunner.is_zipped = True
backupBase.BackupRunner.is_encrypted = False
RunnerClass = utils.import_class(BACKUP_XTRA_CLS)
bkup = RunnerClass(12345, extra_opts="--no-lock")
self.assertEqual(bkup.command, XTRA_BACKUP_EXTRA_OPTS + PIPE + ZIP)
self.assertEqual(bkup.manifest, "12345.xbstream.gz")
self.assertEqual(XTRA_BACKUP_EXTRA_OPTS + PIPE + ZIP, bkup.command)
self.assertEqual("12345.xbstream.gz", bkup.manifest)
def test_backup_encrypted_xtrabackup_command(self):
backupBase.BackupRunner.is_zipped = True
@ -110,9 +110,9 @@ class GuestAgentBackupTest(testtools.TestCase):
backupBase.BackupRunner.encrypt_key = CRYPTO_KEY
RunnerClass = utils.import_class(BACKUP_XTRA_CLS)
bkup = RunnerClass(12345, extra_opts="")
self.assertEqual(bkup.command,
XTRA_BACKUP + PIPE + ZIP + PIPE + ENCRYPT)
self.assertEqual(bkup.manifest, "12345.xbstream.gz.enc")
self.assertEqual(XTRA_BACKUP + PIPE + ZIP + PIPE + ENCRYPT,
bkup.command)
self.assertEqual("12345.xbstream.gz.enc", bkup.manifest)
def test_backup_xtrabackup_incremental(self):
backupBase.BackupRunner.is_zipped = True
@ -150,16 +150,16 @@ class GuestAgentBackupTest(testtools.TestCase):
backupBase.BackupRunner.is_encrypted = False
RunnerClass = utils.import_class(BACKUP_SQLDUMP_CLS)
bkup = RunnerClass(12345, extra_opts="")
self.assertEqual(bkup.command, SQLDUMP_BACKUP + PIPE + ZIP)
self.assertEqual(bkup.manifest, "12345.gz")
self.assertEqual(SQLDUMP_BACKUP + PIPE + ZIP, bkup.command)
self.assertEqual("12345.gz", bkup.manifest)
def test_backup_decrypted_mysqldump_with_extra_opts_command(self):
backupBase.BackupRunner.is_zipped = True
backupBase.BackupRunner.is_encrypted = False
RunnerClass = utils.import_class(BACKUP_SQLDUMP_CLS)
bkup = RunnerClass(12345, extra_opts="--events --routines --triggers")
self.assertEqual(bkup.command, SQLDUMP_BACKUP_EXTRA_OPTS + PIPE + ZIP)
self.assertEqual(bkup.manifest, "12345.gz")
self.assertEqual(SQLDUMP_BACKUP_EXTRA_OPTS + PIPE + ZIP, bkup.command)
self.assertEqual("12345.gz", bkup.manifest)
def test_backup_encrypted_mysqldump_command(self):
backupBase.BackupRunner.is_zipped = True
@ -168,9 +168,9 @@ class GuestAgentBackupTest(testtools.TestCase):
RunnerClass = utils.import_class(BACKUP_SQLDUMP_CLS)
bkup = RunnerClass(12345, user="user",
password="password", extra_opts="")
self.assertEqual(bkup.command,
SQLDUMP_BACKUP + PIPE + ZIP + PIPE + ENCRYPT)
self.assertEqual(bkup.manifest, "12345.gz.enc")
self.assertEqual(SQLDUMP_BACKUP + PIPE + ZIP + PIPE + ENCRYPT,
bkup.command)
self.assertEqual("12345.gz.enc", bkup.manifest)
def test_restore_decrypted_xtrabackup_command(self):
restoreBase.RestoreRunner.is_zipped = True
@ -178,8 +178,8 @@ class GuestAgentBackupTest(testtools.TestCase):
RunnerClass = utils.import_class(RESTORE_XTRA_CLS)
restr = RunnerClass(None, restore_location="/var/lib/mysql",
location="filename", checksum="md5")
self.assertEqual(restr.restore_cmd, UNZIP + PIPE + XTRA_RESTORE)
self.assertEqual(restr.prepare_cmd, PREPARE)
self.assertEqual(UNZIP + PIPE + XTRA_RESTORE, restr.restore_cmd)
self.assertEqual(PREPARE, restr.prepare_cmd)
def test_restore_encrypted_xtrabackup_command(self):
restoreBase.RestoreRunner.is_zipped = True
@ -188,9 +188,9 @@ class GuestAgentBackupTest(testtools.TestCase):
RunnerClass = utils.import_class(RESTORE_XTRA_CLS)
restr = RunnerClass(None, restore_location="/var/lib/mysql",
location="filename", checksum="md5")
self.assertEqual(restr.restore_cmd,
DECRYPT + PIPE + UNZIP + PIPE + XTRA_RESTORE)
self.assertEqual(restr.prepare_cmd, PREPARE)
self.assertEqual(DECRYPT + PIPE + UNZIP + PIPE + XTRA_RESTORE,
restr.restore_cmd)
self.assertEqual(PREPARE, restr.prepare_cmd)
def test_restore_xtrabackup_incremental_prepare_command(self):
RunnerClass = utils.import_class(RESTORE_XTRA_INCR_CLS)
@ -244,7 +244,7 @@ class GuestAgentBackupTest(testtools.TestCase):
RunnerClass = utils.import_class(RESTORE_SQLDUMP_CLS)
restr = RunnerClass(None, restore_location="/var/lib/mysql",
location="filename", checksum="md5")
self.assertEqual(restr.restore_cmd, UNZIP + PIPE + SQLDUMP_RESTORE)
self.assertEqual(UNZIP + PIPE + SQLDUMP_RESTORE, restr.restore_cmd)
def test_restore_encrypted_mysqldump_command(self):
restoreBase.RestoreRunner.is_zipped = True
@ -253,8 +253,8 @@ class GuestAgentBackupTest(testtools.TestCase):
RunnerClass = utils.import_class(RESTORE_SQLDUMP_CLS)
restr = RunnerClass(None, restore_location="/var/lib/mysql",
location="filename", checksum="md5")
self.assertEqual(restr.restore_cmd,
DECRYPT + PIPE + UNZIP + PIPE + SQLDUMP_RESTORE)
self.assertEqual(DECRYPT + PIPE + UNZIP + PIPE + SQLDUMP_RESTORE,
restr.restore_cmd)
def test_backup_encrypted_cbbackup_command(self):
backupBase.BackupRunner.is_encrypted = True
@ -283,7 +283,7 @@ class GuestAgentBackupTest(testtools.TestCase):
RunnerClass = utils.import_class(RESTORE_CBBACKUP_CLS)
restr = RunnerClass(None, restore_location="/tmp",
location="filename", checksum="md5")
self.assertEqual(restr.restore_cmd, UNZIP + PIPE + CBBACKUP_RESTORE)
self.assertEqual(UNZIP + PIPE + CBBACKUP_RESTORE, restr.restore_cmd)
def test_restore_encrypted_cbbackup_command(self):
restoreBase.RestoreRunner.is_zipped = True
@ -292,8 +292,8 @@ class GuestAgentBackupTest(testtools.TestCase):
RunnerClass = utils.import_class(RESTORE_CBBACKUP_CLS)
restr = RunnerClass(None, restore_location="/tmp",
location="filename", checksum="md5")
self.assertEqual(restr.restore_cmd,
DECRYPT + PIPE + UNZIP + PIPE + CBBACKUP_RESTORE)
self.assertEqual(DECRYPT + PIPE + UNZIP + PIPE + CBBACKUP_RESTORE,
restr.restore_cmd)
class CouchbaseBackupTests(testtools.TestCase):

View File

@ -265,38 +265,38 @@ class RevokeTest(QueryTestBase):
def test_defaults(self):
r = sql_query.Revoke()
# Technically, this isn't valid for MySQL.
self.assertEqual(str(r), "REVOKE ALL ON *.* FROM ``@`%`;")
self.assertEqual("REVOKE ALL ON *.* FROM ``@`%`;", str(r))
def test_permissions(self):
r = sql_query.Revoke()
r.user = 'x'
r.permissions = ['CREATE', 'DELETE', 'DROP']
self.assertEqual(str(r),
"REVOKE CREATE, DELETE, DROP ON *.* FROM `x`@`%`;")
self.assertEqual("REVOKE CREATE, DELETE, DROP ON *.* FROM `x`@`%`;",
str(r))
def test_database(self):
r = sql_query.Revoke()
r.user = 'x'
r.database = 'foo'
self.assertEqual(str(r), "REVOKE ALL ON `foo`.* FROM `x`@`%`;")
self.assertEqual("REVOKE ALL ON `foo`.* FROM `x`@`%`;", str(r))
def test_table(self):
r = sql_query.Revoke()
r.user = 'x'
r.database = 'foo'
r.table = 'bar'
self.assertEqual(str(r), "REVOKE ALL ON `foo`.'bar' FROM `x`@`%`;")
self.assertEqual("REVOKE ALL ON `foo`.'bar' FROM `x`@`%`;", str(r))
def test_user(self):
r = sql_query.Revoke()
r.user = 'x'
self.assertEqual(str(r), "REVOKE ALL ON *.* FROM `x`@`%`;")
self.assertEqual("REVOKE ALL ON *.* FROM `x`@`%`;", str(r))
def test_user_host(self):
r = sql_query.Revoke()
r.user = 'x'
r.host = 'y'
self.assertEqual(str(r), "REVOKE ALL ON *.* FROM `x`@`y`;")
self.assertEqual("REVOKE ALL ON *.* FROM `x`@`y`;", str(r))
class CreateDatabaseTest(QueryTestBase):
@ -308,19 +308,19 @@ class CreateDatabaseTest(QueryTestBase):
def test_defaults(self):
cd = sql_query.CreateDatabase('foo')
self.assertEqual(str(cd), "CREATE DATABASE IF NOT EXISTS `foo`;")
self.assertEqual("CREATE DATABASE IF NOT EXISTS `foo`;", str(cd))
def test_charset(self):
cd = sql_query.CreateDatabase('foo')
cd.charset = "foo"
self.assertEqual(str(cd), ("CREATE DATABASE IF NOT EXISTS `foo` "
"CHARACTER SET = 'foo';"))
self.assertEqual(("CREATE DATABASE IF NOT EXISTS `foo` "
"CHARACTER SET = 'foo';"), str(cd))
def test_collate(self):
cd = sql_query.CreateDatabase('foo')
cd.collate = "bar"
self.assertEqual(str(cd), ("CREATE DATABASE IF NOT EXISTS `foo` "
"COLLATE = 'bar';"))
self.assertEqual(("CREATE DATABASE IF NOT EXISTS `foo` "
"COLLATE = 'bar';"), str(cd))
class DropDatabaseTest(QueryTestBase):
@ -332,7 +332,7 @@ class DropDatabaseTest(QueryTestBase):
def test_defaults(self):
dd = sql_query.DropDatabase('foo')
self.assertEqual(str(dd), "DROP DATABASE `foo`;")
self.assertEqual("DROP DATABASE `foo`;", str(dd))
class CreateUserTest(QueryTestBase):
@ -347,8 +347,8 @@ class CreateUserTest(QueryTestBase):
hostname = 'localhost'
password = 'password123'
cu = sql_query.CreateUser(user=username, host=hostname, clear=password)
self.assertEqual(str(cu), "CREATE USER :user@:host "
"IDENTIFIED BY 'password123';")
self.assertEqual("CREATE USER :user@:host "
"IDENTIFIED BY 'password123';", str(cu))
class UpdateUserTest(QueryTestBase):
@ -364,9 +364,9 @@ class UpdateUserTest(QueryTestBase):
new_user = 'root123'
uu = sql_query.UpdateUser(user=username, host=hostname,
new_user=new_user)
self.assertEqual(str(uu), "UPDATE mysql.user SET User='root123' "
"WHERE User = 'root' "
"AND Host = 'localhost';")
self.assertEqual("UPDATE mysql.user SET User='root123' "
"WHERE User = 'root' "
"AND Host = 'localhost';", str(uu))
def test_change_password(self):
username = 'root'
@ -374,10 +374,10 @@ class UpdateUserTest(QueryTestBase):
new_password = 'password123'
uu = sql_query.UpdateUser(user=username, host=hostname,
clear=new_password)
self.assertEqual(str(uu), "UPDATE mysql.user SET "
"Password=PASSWORD('password123') "
"WHERE User = 'root' "
"AND Host = 'localhost';")
self.assertEqual("UPDATE mysql.user SET "
"Password=PASSWORD('password123') "
"WHERE User = 'root' "
"AND Host = 'localhost';", str(uu))
def test_change_host(self):
username = 'root'
@ -385,9 +385,9 @@ class UpdateUserTest(QueryTestBase):
new_host = '%'
uu = sql_query.UpdateUser(user=username, host=hostname,
new_host=new_host)
self.assertEqual(str(uu), "UPDATE mysql.user SET Host='%' "
"WHERE User = 'root' "
"AND Host = 'localhost';")
self.assertEqual("UPDATE mysql.user SET Host='%' "
"WHERE User = 'root' "
"AND Host = 'localhost';", str(uu))
def test_change_password_and_username(self):
username = 'root'
@ -396,10 +396,10 @@ class UpdateUserTest(QueryTestBase):
new_password = 'password123'
uu = sql_query.UpdateUser(user=username, host=hostname,
clear=new_password, new_user=new_user)
self.assertEqual(str(uu), "UPDATE mysql.user SET User='root123', "
"Password=PASSWORD('password123') "
"WHERE User = 'root' "
"AND Host = 'localhost';")
self.assertEqual("UPDATE mysql.user SET User='root123', "
"Password=PASSWORD('password123') "
"WHERE User = 'root' "
"AND Host = 'localhost';", str(uu))
def test_change_username_password_hostname(self):
username = 'root'
@ -410,11 +410,11 @@ class UpdateUserTest(QueryTestBase):
uu = sql_query.UpdateUser(user=username, host=hostname,
clear=new_password, new_user=new_user,
new_host=new_host)
self.assertEqual(str(uu), "UPDATE mysql.user SET User='root123', "
"Host='%', "
"Password=PASSWORD('password123') "
"WHERE User = 'root' "
"AND Host = 'localhost';")
self.assertEqual("UPDATE mysql.user SET User='root123', "
"Host='%', "
"Password=PASSWORD('password123') "
"WHERE User = 'root' "
"AND Host = 'localhost';", str(uu))
def test_change_username_and_hostname(self):
username = 'root'
@ -423,10 +423,10 @@ class UpdateUserTest(QueryTestBase):
new_host = '%'
uu = sql_query.UpdateUser(user=username, host=hostname,
new_host=new_host, new_user=new_user)
self.assertEqual(str(uu), "UPDATE mysql.user SET User='root123', "
"Host='%' "
"WHERE User = 'root' "
"AND Host = 'localhost';")
self.assertEqual("UPDATE mysql.user SET User='root123', "
"Host='%' "
"WHERE User = 'root' "
"AND Host = 'localhost';", str(uu))
class DropUserTest(QueryTestBase):
@ -440,4 +440,4 @@ class DropUserTest(QueryTestBase):
username = 'root'
hostname = 'localhost'
du = sql_query.DropUser(user=username, host=hostname)
self.assertEqual(str(du), "DROP USER `root`@`localhost`;")
self.assertEqual("DROP USER `root`@`localhost`;", str(du))

View File

@ -96,7 +96,7 @@ class RedisGuestAgentManagerTest(testtools.TestCase):
overrides=None,
cluster_config=None)
self.assertEqual(redis_service.RedisAppStatus.get.call_count, 2)
self.assertEqual(2, redis_service.RedisAppStatus.get.call_count)
mock_status.begin_install.assert_any_call()
VolumeDevice.format.assert_any_call()
redis_service.RedisApp.install_if_needed.assert_any_call(self.packages)

View File

@ -147,8 +147,9 @@ class SecurityGroupDeleteTest(testtools.TestCase):
sec_mod.SecurityGroupInstanceAssociation.find_by = Mock(
return_value=new_fake_RemoteSecGrAssoc())
self.assertEqual(sec_mod.SecurityGroup.delete_for_instance(
i_id, self.context), None)
self.assertEqual(None,
sec_mod.SecurityGroup.delete_for_instance(
i_id, self.context))
def test_delete_secgr_assoc_with_db_exception(self):