Merge "Replace assert statements with assert methods"
This commit is contained in:
@@ -54,8 +54,8 @@ class TestComputeDurationByResource(FunctionalTest,
|
|||||||
|
|
||||||
def _patch_get_interval(self, start, end):
|
def _patch_get_interval(self, start, end):
|
||||||
def get_interval(event_filter, period, groupby, aggregate):
|
def get_interval(event_filter, period, groupby, aggregate):
|
||||||
assert event_filter.start
|
self.assertIsNotNone(event_filter.start)
|
||||||
assert event_filter.end
|
self.assertIsNotNone(event_filter.end)
|
||||||
if (event_filter.start > end or event_filter.end < start):
|
if (event_filter.start > end or event_filter.end < start):
|
||||||
return []
|
return []
|
||||||
duration_start = max(event_filter.start, start)
|
duration_start = max(event_filter.start, start)
|
||||||
|
|||||||
@@ -527,7 +527,7 @@ class TestListMeters(FunctionalTest,
|
|||||||
'value': 'test_source_doesnt_exist',
|
'value': 'test_source_doesnt_exist',
|
||||||
}],
|
}],
|
||||||
)
|
)
|
||||||
assert not data
|
self.assertIsEmpty(data)
|
||||||
|
|
||||||
def test_with_user(self):
|
def test_with_user(self):
|
||||||
data = self.get_json('/meters',
|
data = self.get_json('/meters',
|
||||||
|
|||||||
@@ -41,6 +41,20 @@ class BaseTestCase(test.BaseTestCase):
|
|||||||
0.0,
|
0.0,
|
||||||
places=5)
|
places=5)
|
||||||
|
|
||||||
|
def assertIsEmpty(self, obj):
|
||||||
|
try:
|
||||||
|
if len(obj) != 0:
|
||||||
|
self.fail("%s is not empty" % type(obj))
|
||||||
|
except (TypeError, AttributeError):
|
||||||
|
self.fail("%s doesn't have length" % type(obj))
|
||||||
|
|
||||||
|
def assertIsNotEmpty(self, obj):
|
||||||
|
try:
|
||||||
|
if len(obj) == 0:
|
||||||
|
self.fail("%s is empty" % type(obj))
|
||||||
|
except (TypeError, AttributeError):
|
||||||
|
self.fail("%s doesn't have length" % type(obj))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def path_get(project_file=None):
|
def path_get(project_file=None):
|
||||||
root = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
root = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||||
|
|||||||
@@ -20,10 +20,10 @@
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
from ceilometer.openstack.common import test
|
import ceilometer.tests.base as base
|
||||||
|
|
||||||
|
|
||||||
class TestPollsterBase(test.BaseTestCase):
|
class TestPollsterBase(base.BaseTestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestPollsterBase, self).setUp()
|
super(TestPollsterBase, self).setUp()
|
||||||
|
|||||||
@@ -46,7 +46,7 @@ class TestDiskPollsters(base.TestPollsterBase):
|
|||||||
mgr = manager.AgentManager()
|
mgr = manager.AgentManager()
|
||||||
cache = {}
|
cache = {}
|
||||||
samples = list(pollster.get_samples(mgr, cache, [self.instance]))
|
samples = list(pollster.get_samples(mgr, cache, [self.instance]))
|
||||||
assert samples
|
self.assertIsNotEmpty(samples)
|
||||||
self.assertIn(pollster.CACHE_KEY_DISK, cache)
|
self.assertIn(pollster.CACHE_KEY_DISK, cache)
|
||||||
self.assertIn(self.instance.name, cache[pollster.CACHE_KEY_DISK])
|
self.assertIn(self.instance.name, cache[pollster.CACHE_KEY_DISK])
|
||||||
|
|
||||||
@@ -96,7 +96,7 @@ class TestDiskRatePollsters(base.TestPollsterBase):
|
|||||||
mgr = manager.AgentManager()
|
mgr = manager.AgentManager()
|
||||||
cache = {}
|
cache = {}
|
||||||
samples = list(pollster.get_samples(mgr, cache, [self.instance]))
|
samples = list(pollster.get_samples(mgr, cache, [self.instance]))
|
||||||
assert samples
|
self.assertIsNotEmpty(samples)
|
||||||
self.assertIsNotNone(samples)
|
self.assertIsNotNone(samples)
|
||||||
self.assertIn(pollster.CACHE_KEY_DISK_RATE, cache)
|
self.assertIn(pollster.CACHE_KEY_DISK_RATE, cache)
|
||||||
self.assertIn(self.instance.id, cache[pollster.CACHE_KEY_DISK_RATE])
|
self.assertIn(self.instance.id, cache[pollster.CACHE_KEY_DISK_RATE])
|
||||||
|
|||||||
@@ -68,8 +68,8 @@ class TestDispatcherDB(test.BaseTestCase):
|
|||||||
|
|
||||||
self.dispatcher.record_metering_data(msg)
|
self.dispatcher.record_metering_data(msg)
|
||||||
|
|
||||||
assert not self.dispatcher.storage_conn.called, \
|
if self.dispatcher.storage_conn.called:
|
||||||
'Should not have called the storage connection'
|
self.fail('Should not have called the storage connection')
|
||||||
|
|
||||||
def test_timestamp_conversion(self):
|
def test_timestamp_conversion(self):
|
||||||
msg = {'counter_name': 'test',
|
msg = {'counter_name': 'test',
|
||||||
|
|||||||
@@ -19,7 +19,7 @@
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
from ceilometer.network import notifications
|
from ceilometer.network import notifications
|
||||||
from ceilometer.openstack.common import test
|
from ceilometer.tests import base as test
|
||||||
|
|
||||||
NOTIFICATION_NETWORK_CREATE = {
|
NOTIFICATION_NETWORK_CREATE = {
|
||||||
u'_context_roles': [u'anotherrole',
|
u'_context_roles': [u'anotherrole',
|
||||||
@@ -307,17 +307,17 @@ class TestEventTypes(test.BaseTestCase):
|
|||||||
def test_network(self):
|
def test_network(self):
|
||||||
v = notifications.Network()
|
v = notifications.Network()
|
||||||
events = v.event_types
|
events = v.event_types
|
||||||
assert events
|
self.assertIsNotEmpty(events)
|
||||||
|
|
||||||
def test_subnet(self):
|
def test_subnet(self):
|
||||||
v = notifications.Subnet()
|
v = notifications.Subnet()
|
||||||
events = v.event_types
|
events = v.event_types
|
||||||
assert events
|
self.assertIsNotEmpty(events)
|
||||||
|
|
||||||
def test_port(self):
|
def test_port(self):
|
||||||
v = notifications.Port()
|
v = notifications.Port()
|
||||||
events = v.event_types
|
events = v.event_types
|
||||||
assert events
|
self.assertIsNotEmpty(events)
|
||||||
|
|
||||||
def test_router(self):
|
def test_router(self):
|
||||||
self.assertTrue(notifications.Router().event_types)
|
self.assertTrue(notifications.Router().event_types)
|
||||||
|
|||||||
@@ -174,7 +174,7 @@ class ResourceTest(DBTestBase,
|
|||||||
self.assertEqual(resource.metadata['display_name'], 'test-server')
|
self.assertEqual(resource.metadata['display_name'], 'test-server')
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
assert False, 'Never found resource-id'
|
self.fail('Never found resource-id')
|
||||||
|
|
||||||
def test_get_resources_start_timestamp(self):
|
def test_get_resources_start_timestamp(self):
|
||||||
timestamp = datetime.datetime(2012, 7, 2, 10, 42)
|
timestamp = datetime.datetime(2012, 7, 2, 10, 42)
|
||||||
@@ -425,7 +425,7 @@ class MeterTest(DBTestBase,
|
|||||||
def test_get_meters_by_metaquery(self):
|
def test_get_meters_by_metaquery(self):
|
||||||
q = {'metadata.display_name': 'test-server'}
|
q = {'metadata.display_name': 'test-server'}
|
||||||
results = list(self.conn.get_meters(metaquery=q))
|
results = list(self.conn.get_meters(metaquery=q))
|
||||||
assert results
|
self.assertIsNotEmpty(results)
|
||||||
self.assertEqual(len(results), 9)
|
self.assertEqual(len(results), 9)
|
||||||
|
|
||||||
def test_get_meters_by_empty_metaquery(self):
|
def test_get_meters_by_empty_metaquery(self):
|
||||||
@@ -542,7 +542,7 @@ class RawSampleTest(DBTestBase,
|
|||||||
def test_get_samples_by_resource(self):
|
def test_get_samples_by_resource(self):
|
||||||
f = storage.SampleFilter(user='user-id', resource='resource-id')
|
f = storage.SampleFilter(user='user-id', resource='resource-id')
|
||||||
results = list(self.conn.get_samples(f))
|
results = list(self.conn.get_samples(f))
|
||||||
assert results
|
self.assertIsNotEmpty(results)
|
||||||
meter = results[1]
|
meter = results[1]
|
||||||
d = meter.as_dict()
|
d = meter.as_dict()
|
||||||
self.assertEqual(d['recorded_at'], timeutils.utcnow())
|
self.assertEqual(d['recorded_at'], timeutils.utcnow())
|
||||||
@@ -640,12 +640,12 @@ class RawSampleTest(DBTestBase,
|
|||||||
def test_get_samples_by_name(self):
|
def test_get_samples_by_name(self):
|
||||||
f = storage.SampleFilter(user='user-id', meter='no-such-meter')
|
f = storage.SampleFilter(user='user-id', meter='no-such-meter')
|
||||||
results = list(self.conn.get_samples(f))
|
results = list(self.conn.get_samples(f))
|
||||||
assert not results
|
self.assertIsEmpty(results)
|
||||||
|
|
||||||
def test_get_samples_by_name2(self):
|
def test_get_samples_by_name2(self):
|
||||||
f = storage.SampleFilter(user='user-id', meter='instance')
|
f = storage.SampleFilter(user='user-id', meter='instance')
|
||||||
results = list(self.conn.get_samples(f))
|
results = list(self.conn.get_samples(f))
|
||||||
assert results
|
self.assertIsNotEmpty(results)
|
||||||
|
|
||||||
def test_get_samples_by_source(self):
|
def test_get_samples_by_source(self):
|
||||||
f = storage.SampleFilter(source='test-1')
|
f = storage.SampleFilter(source='test-1')
|
||||||
|
|||||||
Reference in New Issue
Block a user