Remove custom test assertions.

Now that we use testtools, we can use stock testtools assert methods
instead of defining our own. As part of removing them, we add Matcher
classes for use with the testtools assertThat method. testtools matchers
can be arbitrarily combined and chained, so they give more flexibility
than plain asserts.
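
For illustration only (this is not the actual nova.tests.matchers code added
by this change), a minimal sketch of what such a Matcher class can look like
and how assertThat chains it with stock testtools combinators; the
DictMatches name and the example test are assumptions:

    import testtools
    from testtools import matchers


    class DictMatches(matchers.Matcher):
        """Illustrative matcher: match a dict exactly, with a readable error."""

        def __init__(self, expected):
            self.expected = expected

        def __str__(self):
            return 'DictMatches(%r)' % (self.expected,)

        def match(self, actual):
            # testtools protocol: return None on success, or a Mismatch
            # object describing the failure.
            if actual != self.expected:
                return matchers.Mismatch(
                    'expected %r, observed %r' % (self.expected, actual))
            return None


    class ExampleTestCase(testtools.TestCase):
        def test_dict_matches(self):
            observed = {'weight': 'someweight'}
            self.assertThat(observed, DictMatches({'weight': 'someweight'}))
            # Matchers compose: Annotate and Not wrap any other matcher.
            self.assertThat(observed,
                            matchers.Annotate('dict should not be empty',
                                              matchers.Not(DictMatches({}))))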

Related to blueprint grizzly-testtools.

Change-Id: I26d1dbac8dc3c322eb55c96c48330c0e38636107
Monty Taylor
2012-10-25 17:23:53 -07:00
parent 827998fd9a
commit bc8ae2ae70
8 changed files with 46 additions and 117 deletions

View File

@@ -141,86 +141,3 @@ class TestCase(testtools.TestCase):
        svc.start()
        self._services.append(svc)
        return svc

    # Useful assertions
    def assertDictMatch(self, d1, d2, approx_equal=False, tolerance=0.001):
        """Assert two dicts are equivalent.

        This is a 'deep' match in the sense that it handles nested
        dictionaries appropriately.

        NOTE:

            If you don't care (or don't know) a given value, you can specify
            the string DONTCARE as the value. This will cause that dict-item
            to be skipped.

        """
        def raise_assertion(msg):
            d1str = str(d1)
            d2str = str(d2)
            base_msg = ('Dictionaries do not match. %(msg)s d1: %(d1str)s '
                        'd2: %(d2str)s' % locals())
            raise AssertionError(base_msg)

        d1keys = set(d1.keys())
        d2keys = set(d2.keys())
        if d1keys != d2keys:
            d1only = d1keys - d2keys
            d2only = d2keys - d1keys
            raise_assertion('Keys in d1 and not d2: %(d1only)s. '
                            'Keys in d2 and not d1: %(d2only)s' % locals())

        for key in d1keys:
            d1value = d1[key]
            d2value = d2[key]
            try:
                error = abs(float(d1value) - float(d2value))
                within_tolerance = error <= tolerance
            except (ValueError, TypeError):
                # If both values aren't convertible to float, just ignore
                # ValueError if arg is a str, TypeError if it's something else
                # (like None)
                within_tolerance = False

            if hasattr(d1value, 'keys') and hasattr(d2value, 'keys'):
                self.assertDictMatch(d1value, d2value)
            elif 'DONTCARE' in (d1value, d2value):
                continue
            elif approx_equal and within_tolerance:
                continue
            elif d1value != d2value:
                raise_assertion("d1['%(key)s']=%(d1value)s != "
                                "d2['%(key)s']=%(d2value)s" % locals())

    def assertDictListMatch(self, L1, L2, approx_equal=False, tolerance=0.001):
        """Assert a list of dicts are equivalent."""
        def raise_assertion(msg):
            L1str = str(L1)
            L2str = str(L2)
            base_msg = ('List of dictionaries do not match: %(msg)s '
                        'L1: %(L1str)s L2: %(L2str)s' % locals())
            raise AssertionError(base_msg)

        L1count = len(L1)
        L2count = len(L2)
        if L1count != L2count:
            raise_assertion('Length mismatch: len(L1)=%(L1count)d != '
                            'len(L2)=%(L2count)d' % locals())

        for d1, d2 in zip(L1, L2):
            self.assertDictMatch(d1, d2, approx_equal=approx_equal,
                                 tolerance=tolerance)

    def assertSubDictMatch(self, sub_dict, super_dict):
        """Assert a sub_dict is subset of super_dict."""
        self.assertEqual(True,
                         set(sub_dict.keys()).issubset(set(super_dict.keys())))
        for k, sub_value in sub_dict.items():
            super_value = super_dict[k]
            if isinstance(sub_value, dict):
                self.assertSubDictMatch(sub_value, super_value)
            elif 'DONTCARE' in (sub_value, super_value):
                continue
            else:
                self.assertEqual(sub_value, super_value)
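
For context only, a hedged sketch (not part of this change and not the
nova.tests.matchers API) of how the flat sub-dict check above could be
composed from stock testtools matchers; contains_sub_dict is an
illustrative helper name:

    from testtools import matchers


    def contains_sub_dict(sub_dict):
        # Build one matcher per expected key and require all of them to
        # pass, mirroring the flat (non-recursive, no-DONTCARE) part of
        # the removed assertSubDictMatch.
        return matchers.MatchesAll(*[
            matchers.AfterPreprocessing(lambda d, k=k: d.get(k),
                                        matchers.Equals(v))
            for k, v in sub_dict.items()])

    # Inside a testtools.TestCase:
    #     self.assertThat({'a': 1, 'b': 2}, contains_sub_dict({'a': 1}))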

View File

@@ -24,6 +24,7 @@ from nova import exception
from nova.openstack.common import timeutils
from nova.scheduler import host_manager
from nova import test
from nova.tests import matchers
from nova.tests.scheduler import fakes
@@ -92,7 +93,7 @@ class HostManagerTestCase(test.TestCase):
    def test_update_service_capabilities(self):
        service_states = self.host_manager.service_states
        self.assertDictMatch(service_states, {})
        self.assertEqual(len(service_states.keys()), 0)
        self.mox.StubOutWithMock(timeutils, 'utcnow')
        timeutils.utcnow().AndReturn(31337)
        timeutils.utcnow().AndReturn(31339)
@@ -116,11 +117,11 @@ class HostManagerTestCase(test.TestCase):
        expected = {('host1', 'node1'): host1_compute_capabs,
                    ('host2', 'node2'): host2_compute_capabs}
        self.assertDictMatch(service_states, expected)
        self.assertThat(service_states, matchers.DictMatches(expected))

    def test_update_service_capabilities_node_key(self):
        service_states = self.host_manager.service_states
        self.assertDictMatch(service_states, {})
        self.assertThat(service_states, matchers.DictMatches({}))
        host1_cap = {'hypervisor_hostname': 'host1-hvhn'}
        host2_cap = {}
@@ -135,7 +136,7 @@ class HostManagerTestCase(test.TestCase):
        host2_cap['timestamp'] = 31338
        expected = {('host1', 'host1-hvhn'): host1_cap,
                    ('host2', None): host2_cap}
        self.assertDictMatch(service_states, expected)
        self.assertThat(service_states, matchers.DictMatches(expected))

    def test_get_all_host_states(self):

View File

@@ -19,6 +19,7 @@ from nova import context
from nova.scheduler import host_manager
from nova.scheduler import least_cost
from nova import test
from nova.tests import matchers
from nova.tests.scheduler import fakes
@@ -92,11 +93,11 @@ class TestWeightedHost(test.TestCase):
    def test_dict_conversion_without_host_state(self):
        host = least_cost.WeightedHost('someweight')
        expected = {'weight': 'someweight'}
        self.assertDictMatch(host.to_dict(), expected)
        self.assertThat(host.to_dict(), matchers.DictMatches(expected))

    def test_dict_conversion_with_host_state(self):
        host_state = host_manager.HostState('somehost', None)
        host = least_cost.WeightedHost('someweight', host_state)
        expected = {'weight': 'someweight',
                    'host': 'somehost'}
        self.assertDictMatch(host.to_dict(), expected)
        self.assertThat(host.to_dict(), matchers.DictMatches(expected))

View File

@@ -36,6 +36,7 @@ from nova.openstack.common import timeutils
from nova.scheduler import driver
from nova.scheduler import manager
from nova import test
from nova.tests import matchers
from nova.tests.scheduler import fakes
from nova import utils
@@ -138,7 +139,7 @@ class SchedulerManagerTestCase(test.TestCase):
                                 'local_gb_used': 512,
                                 'memory_mb': 1024,
                                 'memory_mb_used': 512}}
        self.assertDictMatch(result, expected)
        self.assertThat(result, matchers.DictMatches(expected))

    def _mox_schedule_method_helper(self, method_name):
        # Make sure the method exists that we're going to test call
@@ -721,7 +722,7 @@ class SchedulerDriverModuleTestCase(test.TestCase):
        result = driver.encode_instance(instance, True)
        expected = {'id': instance['id'], '_is_precooked': False}
        self.assertDictMatch(result, expected)
        self.assertThat(result, matchers.DictMatches(expected))
        # Orig dict not changed
        self.assertNotEqual(result, instance)
@@ -729,6 +730,6 @@ class SchedulerDriverModuleTestCase(test.TestCase):
        expected = {}
        expected.update(instance)
        expected['_is_precooked'] = True
        self.assertDictMatch(result, expected)
        self.assertThat(result, matchers.DictMatches(expected))
        # Orig dict not changed
        self.assertNotEqual(result, instance)

View File

@@ -41,6 +41,7 @@ from nova import exception
from nova import flags
from nova.openstack.common import timeutils
from nova import test
from nova.tests import matchers
FLAGS = flags.FLAGS
@@ -163,7 +164,7 @@ class Ec2utilsTestCase(test.TestCase):
            'virtual_name': 'ephemeral0'}}}
        out_dict = ec2utils.dict_from_dotted_str(in_str)
        self.assertDictMatch(out_dict, expected_dict)
        self.assertThat(out_dict, matchers.DictMatches(expected_dict))

    def test_properties_root_defice_name(self):
        mappings = [{"device": "/dev/sda1", "virtual": "root"}]
@@ -209,8 +210,8 @@ class Ec2utilsTestCase(test.TestCase):
                            'device': '/dev/sdc1'},
                           {'virtual': 'ephemeral1',
                            'device': '/dev/sdc1'}]
        self.assertDictListMatch(block_device.mappings_prepend_dev(mappings),
                                 expected_result)
        self.assertThat(block_device.mappings_prepend_dev(mappings),
                        matchers.DictListMatches(expected_result))


class ApiEc2TestCase(test.TestCase):

View File

@@ -28,6 +28,7 @@ from nova import exception
from nova import flags
from nova.openstack.common import timeutils
from nova import test
from nova.tests import matchers
from nova import utils
@@ -619,14 +620,16 @@ class AggregateDBApiTestCase(test.TestCase):
        ctxt = context.get_admin_context()
        result = _create_aggregate(context=ctxt)
        expected_metadata = db.aggregate_metadata_get(ctxt, result['id'])
        self.assertDictMatch(expected_metadata, _get_fake_aggr_metadata())
        self.assertThat(expected_metadata,
                        matchers.DictMatches(_get_fake_aggr_metadata()))

    def test_aggregate_create_delete_create_with_metadata(self):
        """Ensure aggregate metadata is deleted bug 1052479."""
        ctxt = context.get_admin_context()
        result = _create_aggregate(context=ctxt)
        expected_metadata = db.aggregate_metadata_get(ctxt, result['id'])
        self.assertDictMatch(expected_metadata, _get_fake_aggr_metadata())
        self.assertThat(expected_metadata,
                        matchers.DictMatches(_get_fake_aggr_metadata()))
        db.aggregate_delete(ctxt, result['id'])
        result = _create_aggregate(metadata=None)
        expected_metadata = db.aggregate_metadata_get(ctxt, result['id'])
@@ -748,7 +751,8 @@ class AggregateDBApiTestCase(test.TestCase):
        values['metadata'] = _get_fake_aggr_metadata()
        db.aggregate_update(ctxt, 1, values)
        expected = db.aggregate_metadata_get(ctxt, result.id)
        self.assertDictMatch(_get_fake_aggr_metadata(), expected)
        self.assertThat(_get_fake_aggr_metadata(),
                        matchers.DictMatches(expected))

    def test_aggregate_update_with_existing_metadata(self):
        """Ensure an aggregate can be updated with existing metadata."""
@@ -759,7 +763,7 @@ class AggregateDBApiTestCase(test.TestCase):
        values['metadata']['fake_key1'] = 'foo'
        db.aggregate_update(ctxt, 1, values)
        expected = db.aggregate_metadata_get(ctxt, result.id)
        self.assertDictMatch(values['metadata'], expected)
        self.assertThat(values['metadata'], matchers.DictMatches(expected))

    def test_aggregate_update_raise_not_found(self):
        """Ensure AggregateNotFound is raised when updating an aggregate."""
@@ -805,7 +809,7 @@ class AggregateDBApiTestCase(test.TestCase):
        metadata = _get_fake_aggr_metadata()
        db.aggregate_metadata_add(ctxt, result.id, metadata)
        expected = db.aggregate_metadata_get(ctxt, result.id)
        self.assertDictMatch(metadata, expected)
        self.assertThat(metadata, matchers.DictMatches(expected))

    def test_aggregate_metadata_update(self):
        """Ensure we can update metadata for the aggregate."""
@@ -818,7 +822,7 @@ class AggregateDBApiTestCase(test.TestCase):
        db.aggregate_metadata_add(ctxt, result.id, new_metadata)
        expected = db.aggregate_metadata_get(ctxt, result.id)
        metadata[key] = 'foo'
        self.assertDictMatch(metadata, expected)
        self.assertThat(metadata, matchers.DictMatches(expected))

    def test_aggregate_metadata_delete(self):
        """Ensure we can delete metadata for the aggregate."""
@@ -829,7 +833,7 @@ class AggregateDBApiTestCase(test.TestCase):
        db.aggregate_metadata_delete(ctxt, result.id, metadata.keys()[0])
        expected = db.aggregate_metadata_get(ctxt, result.id)
        del metadata[metadata.keys()[0]]
        self.assertDictMatch(metadata, expected)
        self.assertThat(metadata, matchers.DictMatches(expected))

    def test_aggregate_metadata_delete_raise_not_found(self):
        """Ensure AggregateMetadataNotFound is raised when deleting."""

View File

@@ -45,6 +45,7 @@ from nova import test
from nova.tests import fake_libvirt_utils
from nova.tests import fake_network
import nova.tests.image.fake
from nova.tests import matchers
from nova import utils
from nova.virt.disk import api as disk
from nova.virt import driver
@@ -630,7 +631,7 @@ class LibvirtConnTestCase(test.TestCase):
            'id': 'fake'
        }
        result = conn.get_volume_connector(volume)
        self.assertDictMatch(expected, result)
        self.assertThat(expected, matchers.DictMatches(result))

    def test_get_guest_config(self):
        conn = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), True)
@@ -1923,11 +1924,11 @@ class LibvirtConnTestCase(test.TestCase):
        self.mox.ReplayAll()
        return_value = conn.check_can_live_migrate_destination(self.context,
                instance_ref, compute_info, compute_info, True)
        self.assertDictMatch(return_value,
                             {"filename": "file",
                              'disk_available_mb': 409600,
                              "disk_over_commit": False,
                              "block_migration": True})
        self.assertThat({"filename": "file",
                         'disk_available_mb': 409600,
                         "disk_over_commit": False,
                         "block_migration": True},
                        matchers.DictMatches(return_value))

    def test_check_can_live_migrate_dest_all_pass_no_block_migration(self):
        instance_ref = db.instance_create(self.context, self.test_instance)
@@ -1949,11 +1950,11 @@ class LibvirtConnTestCase(test.TestCase):
        self.mox.ReplayAll()
        return_value = conn.check_can_live_migrate_destination(self.context,
                instance_ref, compute_info, compute_info, False)
        self.assertDictMatch(return_value,
                             {"filename": "file",
                              "block_migration": False,
                              "disk_over_commit": False,
                              "disk_available_mb": None})
        self.assertThat({"filename": "file",
                         "block_migration": False,
                         "disk_over_commit": False,
                         "disk_available_mb": None},
                        matchers.DictMatches(return_value))

    def test_check_can_live_migrate_dest_incompatible_cpu_raises(self):
        instance_ref = db.instance_create(self.context, self.test_instance)

View File

@@ -41,6 +41,7 @@ from nova.tests.db import fakes as db_fakes
from nova.tests import fake_network
from nova.tests import fake_utils
import nova.tests.image.fake as fake_image
from nova.tests import matchers
from nova.tests.xenapi import stubs
from nova.virt import fake
from nova.virt.xenapi import agent
@@ -359,7 +360,7 @@ class XenAPIVMTestCase(stubs.XenAPITestBase):
        }
        instance = self._create_instance()
        expected = self.conn.get_diagnostics(instance)
        self.assertDictMatch(fake_diagnostics, expected)
        self.assertThat(fake_diagnostics, matchers.DictMatches(expected))

    def test_instance_snapshot_fails_with_no_primary_vdi(self):
        def create_bad_vbd(session, vm_ref, vdi_ref, userdevice,
@@ -2090,7 +2091,8 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
        self.conn._pool.add_to_aggregate(self.context, aggregate, "host")
        result = db.aggregate_get(self.context, aggregate.id)
        self.assertTrue(fake_init_pool.called)
        self.assertDictMatch(self.fake_metadata, result.metadetails)
        self.assertThat(self.fake_metadata,
                        matchers.DictMatches(result.metadetails))

    def test_join_slave(self):
        """Ensure join_slave gets called when the request gets to master."""
@@ -2168,8 +2170,9 @@ class XenAPIAggregateTestCase(stubs.XenAPITestBase):
        self.conn._pool.remove_from_aggregate(self.context, aggregate, "host")
        result = db.aggregate_get(self.context, aggregate.id)
        self.assertTrue(fake_clear_pool.called)
        self.assertDictMatch({pool_states.POOL_FLAG: 'XenAPI',
            pool_states.KEY: pool_states.ACTIVE}, result.metadetails)
        self.assertThat({pool_states.POOL_FLAG: 'XenAPI',
                         pool_states.KEY: pool_states.ACTIVE},
                        matchers.DictMatches(result.metadetails))

    def test_remote_master_non_empty_pool(self):
        """Ensure AggregateError is raised if removing the master."""