Merge "Clean up comparison assertions"
This commit is contained in:
commit
1a8800c188
|
@ -35,6 +35,7 @@ import mox
|
|||
from oslo.config import cfg
|
||||
import stubout
|
||||
import testtools
|
||||
from testtools import matchers
|
||||
|
||||
from cinder.common import config # Need to register global_opts
|
||||
from cinder.db import migration
|
||||
|
@ -267,3 +268,25 @@ class TestCase(testtools.TestCase):
|
|||
'd1value': d1value,
|
||||
'd2value': d2value,
|
||||
})
|
||||
|
||||
def assertGreater(self, first, second, msg=None):
|
||||
"""Python < v2.7 compatibility. Assert 'first' > 'second'"""
|
||||
try:
|
||||
f = super(TestCase, self).assertGreater
|
||||
except AttributeError:
|
||||
self.assertThat(first,
|
||||
matchers.GreaterThan(second),
|
||||
message=msg or '')
|
||||
else:
|
||||
f(first, second, msg=msg)
|
||||
|
||||
def assertGreaterEqual(self, first, second, msg=None):
|
||||
"""Python < v2.7 compatibility. Assert 'first' >= 'second'"""
|
||||
try:
|
||||
f = super(TestCase, self).assertGreaterEqual
|
||||
except AttributeError:
|
||||
self.assertThat(first,
|
||||
matchers.Not(matchers.LessThan(second)),
|
||||
message=msg or '')
|
||||
else:
|
||||
f(first, second, msg=msg)
|
||||
|
|
|
@ -119,7 +119,7 @@ class ExtensionControllerTest(ExtensionTestCase):
|
|||
|
||||
# Make sure we have all the extensions, extras extensions being OK.
|
||||
exts = root.findall('{0}extension'.format(NS))
|
||||
self.assertTrue(len(exts) >= len(self.ext_list))
|
||||
self.assertGreaterEqual(len(exts), len(self.ext_list))
|
||||
|
||||
# Make sure that at least Fox in Sox is correct.
|
||||
(fox_ext, ) = [x for x in exts if x.get('alias') == 'FOXNSOX']
|
||||
|
|
|
@ -605,8 +605,8 @@ class LocalConnectorTestCase(test.TestCase):
|
|||
self.connector = connector.LocalConnector(None)
|
||||
cprops = self.connection_properties
|
||||
dev_info = self.connector.connect_volume(cprops)
|
||||
self.assertTrue(dev_info['type'] == 'local')
|
||||
self.assertTrue(dev_info['path'] == cprops['device_path'])
|
||||
self.assertEqual(dev_info['type'], 'local')
|
||||
self.assertEqual(dev_info['path'], cprops['device_path'])
|
||||
|
||||
def test_connect_volume_with_invalid_connection_data(self):
|
||||
self.connector = connector.LocalConnector(None)
|
||||
|
|
|
@ -358,7 +358,7 @@ class BackupTestCase(test.TestCase):
|
|||
ctxt_read_deleted = context.get_admin_context('yes')
|
||||
backup = db.backup_get(ctxt_read_deleted, backup_id)
|
||||
self.assertEqual(backup.deleted, True)
|
||||
self.assertTrue(timeutils.utcnow() > backup.deleted_at)
|
||||
self.assertGreater(timeutils.utcnow(), backup.deleted_at)
|
||||
self.assertEqual(backup.status, 'deleted')
|
||||
|
||||
def test_list_backup(self):
|
||||
|
|
|
@ -186,7 +186,7 @@ class BackupCephTestCase(test.TestCase):
|
|||
|
||||
self.stubs.Set(self.service.rbd.Image, 'list_snaps', list_snaps)
|
||||
snaps = self.service.get_backup_snaps(self.service.rbd.Image())
|
||||
self.assertTrue(len(snaps) == 3)
|
||||
self.assertEqual(len(snaps), 3)
|
||||
|
||||
def test_transfer_data_from_rbd_to_file(self):
|
||||
self._set_common_backup_stubs(self.service)
|
||||
|
@ -409,12 +409,12 @@ class BackupCephTestCase(test.TestCase):
|
|||
self.stubs.Set(self.service.rbd.Image, 'discard', _setter)
|
||||
|
||||
self.service._discard_bytes(mock_rbd(), 123456, 0)
|
||||
self.assertTrue(len(calls) == 0)
|
||||
self.assertEqual(len(calls), 0)
|
||||
|
||||
image = mock_rbd().Image()
|
||||
wrapped_rbd = self._get_wrapped_rbd_io(image)
|
||||
self.service._discard_bytes(wrapped_rbd, 123456, 1234)
|
||||
self.assertTrue(len(calls) == 1)
|
||||
self.assertEqual(len(calls), 1)
|
||||
|
||||
self.stubs.Set(image, 'write', _setter)
|
||||
wrapped_rbd = self._get_wrapped_rbd_io(image)
|
||||
|
@ -422,7 +422,7 @@ class BackupCephTestCase(test.TestCase):
|
|||
lambda *args: False)
|
||||
self.service._discard_bytes(wrapped_rbd, 0,
|
||||
self.service.chunk_size * 2)
|
||||
self.assertTrue(len(calls) == 3)
|
||||
self.assertEqual(len(calls), 3)
|
||||
|
||||
def test_delete_backup_snapshot(self):
|
||||
snap_name = 'backup.%s.snap.3824923.1412' % (uuid.uuid4())
|
||||
|
|
|
@ -193,7 +193,7 @@ class GPFSDriverTestCase(test.TestCase):
|
|||
self.volume.create_volume(self.context, volume_src['id'])
|
||||
snapCount = len(db.snapshot_get_all_for_volume(self.context,
|
||||
volume_src['id']))
|
||||
self.assertTrue(snapCount == 0)
|
||||
self.assertEqual(snapCount, 0)
|
||||
snapshot = self._create_snapshot(volume_src['id'])
|
||||
snapshot_id = snapshot['id']
|
||||
self.volume.create_snapshot(self.context, volume_src['id'],
|
||||
|
@ -202,14 +202,14 @@ class GPFSDriverTestCase(test.TestCase):
|
|||
snapshot['name'])))
|
||||
snapCount = len(db.snapshot_get_all_for_volume(self.context,
|
||||
volume_src['id']))
|
||||
self.assertTrue(snapCount == 1)
|
||||
self.assertEqual(snapCount, 1)
|
||||
self.volume.delete_snapshot(self.context, snapshot_id)
|
||||
self.volume.delete_volume(self.context, volume_src['id'])
|
||||
self.assertFalse(os.path.exists(os.path.join(self.volumes_path,
|
||||
snapshot['name'])))
|
||||
snapCount = len(db.snapshot_get_all_for_volume(self.context,
|
||||
volume_src['id']))
|
||||
self.assertTrue(snapCount == 0)
|
||||
self.assertEqual(snapCount, 0)
|
||||
|
||||
def test_create_volume_from_snapshot(self):
|
||||
volume_src = test_utils.create_volume(self.context, host=CONF.host)
|
||||
|
|
|
@ -189,7 +189,7 @@ class HUSiSCSIDriverTest(test.TestCase):
|
|||
stats = self.driver.get_volume_stats(True)
|
||||
self.assertEqual(stats["vendor_name"], "HDS")
|
||||
self.assertEqual(stats["storage_protocol"], "iSCSI")
|
||||
self.assertTrue(stats["total_capacity_gb"] > 0)
|
||||
self.assertGreater(stats["total_capacity_gb"], 0)
|
||||
|
||||
def test_create_volume(self):
|
||||
loc = self.driver.create_volume(_VOLUME)
|
||||
|
@ -214,7 +214,7 @@ class HUSiSCSIDriverTest(test.TestCase):
|
|||
num_luns_before = len(SimulatedHusBackend.alloc_lun)
|
||||
self.driver.delete_volume(vol)
|
||||
num_luns_after = len(SimulatedHusBackend.alloc_lun)
|
||||
self.assertTrue(num_luns_before > num_luns_after)
|
||||
self.assertGreater(num_luns_before, num_luns_after)
|
||||
|
||||
def test_extend_volume(self):
|
||||
vol = self.test_create_volume()
|
||||
|
@ -261,7 +261,7 @@ class HUSiSCSIDriverTest(test.TestCase):
|
|||
num_luns_before = len(SimulatedHusBackend.alloc_lun)
|
||||
self.driver.delete_snapshot(svol)
|
||||
num_luns_after = len(SimulatedHusBackend.alloc_lun)
|
||||
self.assertTrue(num_luns_before > num_luns_after)
|
||||
self.assertGreater(num_luns_before, num_luns_after)
|
||||
|
||||
def test_create_volume_from_snapshot(self):
|
||||
svol = self.test_create_snapshot()
|
||||
|
@ -293,4 +293,4 @@ class HUSiSCSIDriverTest(test.TestCase):
|
|||
num_conn_before = len(SimulatedHusBackend.connections)
|
||||
self.driver.terminate_connection(vol, conn)
|
||||
num_conn_after = len(SimulatedHusBackend.connections)
|
||||
self.assertTrue(num_conn_before > num_conn_after)
|
||||
self.assertGreater(num_conn_before, num_conn_after)
|
||||
|
|
|
@ -1079,7 +1079,7 @@ class TestHP3PARISCSIDriver(HP3PARBaseDriver, test.TestCase):
|
|||
self.mox.ReplayAll()
|
||||
|
||||
ports = self.driver.common.get_ports()['members']
|
||||
self.assertTrue(len(ports) == 3)
|
||||
self.assertEqual(len(ports), 3)
|
||||
|
||||
def test_get_iscsi_ip_active(self):
|
||||
self.flags(lock_path=self.tempdir)
|
||||
|
|
|
@ -265,7 +265,8 @@ class TestMigrations(test.TestCase):
|
|||
total = connection.execute("SELECT count(*) "
|
||||
"from information_schema.TABLES "
|
||||
"where TABLE_SCHEMA='openstack_citest'")
|
||||
self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
|
||||
self.assertGreater(total.scalar(), 0,
|
||||
msg="No tables found. Wrong schema?")
|
||||
|
||||
noninnodb = connection.execute("SELECT count(*) "
|
||||
"from information_schema.TABLES "
|
||||
|
|
Loading…
Reference in New Issue