Add more unit tests for the engine API

Verify the following endpoints:
 - list_stack_resources
 - describe_stack_resources
 - describe_stack_resource

Additionally, check for presence of the correct output keys from the
following endpoints:
 - list_stacks
 - show_stack

Note that some tests are commented out because they are currently failing.
This will be fixed in a future patch.

Change-Id: I1c8c46283c1217789e3d4dea7aeac11c991bcef4
Signed-off-by: Zane Bitter <zbitter@redhat.com>
This commit is contained in:
Zane Bitter
2012-07-04 16:16:16 +02:00
parent 9c7c0dea62
commit c4cb6bd395

View File

@@ -19,7 +19,7 @@ from heat.engine import auth
@attr(tag=['unit', 'resource'])
@attr(speed='fast')
@attr(speed='slow')
class stacksTest(unittest.TestCase):
def setUp(self):
self.m = mox.Mox()
@@ -140,7 +140,14 @@ class stacksTest(unittest.TestCase):
self.assertTrue(len(sl['stacks']) > 0)
for s in sl['stacks']:
self.assertTrue('CreationTime' in s)
#self.assertTrue('LastUpdatedTime' in s)
self.assertTrue('StackId' in s)
self.assertNotEqual(s['StackId'], None)
self.assertTrue('StackName' in s)
self.assertTrue('StackStatus' in s)
#self.assertTrue('StackStatusReason' in s)
self.assertTrue('TemplateDescription' in s)
self.assertNotEqual(s['TemplateDescription'].find('WordPress'), -1)
def test_stack_describe_all(self):
@@ -196,14 +203,184 @@ class stacksTest(unittest.TestCase):
man = manager.EngineManager()
sl = man.show_stack(ctx, 'test_stack_desc', {})
self.assertTrue(len(sl['stacks']) > 0)
for s in sl['stacks']:
self.assertEqual(s['StackName'], 'test_stack_desc')
self.assertTrue('CreationTime' in s)
self.assertNotEqual(s['StackId'], None)
self.assertNotEqual(s['Description'].find('WordPress'), -1)
self.assertEqual(len(sl['stacks']), 1)
# allows testing of the test directly
if __name__ == '__main__':
sys.argv.append(__file__)
nose.main()
s = sl['stacks'][0]
self.assertTrue('CreationTime' in s)
#self.assertTrue('LastUpdatedTime' in s)
self.assertTrue('StackId' in s)
self.assertNotEqual(s['StackId'], None)
self.assertTrue('StackName' in s)
self.assertEqual(s['StackName'], 'test_stack_desc')
self.assertTrue('StackStatus' in s)
self.assertTrue('StackStatusReason' in s)
self.assertTrue('Description' in s)
self.assertNotEqual(s['Description'].find('WordPress'), -1)
self.assertTrue('Parameters' in s)
def test_stack_resource_describe(self):
ctx = self.create_context('stack_res_describe')
auth.authenticate(ctx).AndReturn(True)
stack = self.get_wordpress_stack('test_stack_res_desc', ctx)
self.m.ReplayAll()
stack.store()
stack.create()
man = manager.EngineManager()
r = man.describe_stack_resource(ctx, 'test_stack_res_desc',
'WebServer')
#self.assertTrue('Description' in r)
self.assertTrue('LastUpdatedTimestamp' in r)
self.assertTrue('StackId' in r)
self.assertNotEqual(r['StackId'], None)
self.assertTrue('StackName' in r)
self.assertEqual(r['StackName'], 'test_stack_res_desc')
self.assertTrue('Metadata' in r)
self.assertTrue('ResourceStatus' in r)
self.assertTrue('ResourceStatusReason' in r)
self.assertTrue('ResourceType' in r)
self.assertTrue('PhysicalResourceId' in r)
self.assertTrue('LogicalResourceId' in r)
self.assertEqual(r['LogicalResourceId'], 'WebServer')
def test_stack_resource_describe_nonexist_stack(self):
ctx = self.create_context()
auth.authenticate(ctx).AndReturn(True)
man = manager.EngineManager()
self.assertRaises(AttributeError,
man.describe_stack_resource,
ctx, 'foo', 'WebServer')
def test_stack_resource_describe_nonexist_resource(self):
ctx = self.create_context('stack_res_describe_bad_rsrc')
auth.authenticate(ctx).AndReturn(True)
stack = self.get_wordpress_stack('test_stack_res_desc', ctx)
self.m.ReplayAll()
stack.store()
stack.create()
man = manager.EngineManager()
self.assertRaises(AttributeError,
man.describe_stack_resource,
ctx, 'test_stack_res_desc', 'foo')
def test_stack_resources_describe(self):
ctx = self.create_context('stack_res_describe')
auth.authenticate(ctx).AndReturn(True)
stack = self.get_wordpress_stack('test_stack_ress_desc', ctx)
self.m.ReplayAll()
stack.store()
stack.create()
man = manager.EngineManager()
resources = man.describe_stack_resources(ctx, 'test_stack_ress_desc',
None, 'WebServer')
self.assertEqual(len(resources), 1)
r = resources[0]
#self.assertTrue('Description' in r)
self.assertTrue('Timestamp' in r)
self.assertTrue('StackId' in r)
self.assertNotEqual(r['StackId'], None)
self.assertTrue('StackName' in r)
self.assertEqual(r['StackName'], 'test_stack_ress_desc')
self.assertTrue('ResourceStatus' in r)
self.assertTrue('ResourceStatusReason' in r)
self.assertTrue('ResourceType' in r)
self.assertTrue('PhysicalResourceId' in r)
self.assertTrue('LogicalResourceId' in r)
self.assertEqual(r['LogicalResourceId'], 'WebServer')
def test_stack_resources_describe_no_filter(self):
ctx = self.create_context('stack_res_describe_nf')
auth.authenticate(ctx).AndReturn(True)
stack = self.get_wordpress_stack('test_stack_ress_desc_nf', ctx)
self.m.ReplayAll()
stack.store()
stack.create()
man = manager.EngineManager()
resources = man.describe_stack_resources(ctx,
'test_stack_ress_desc_nf',
None, None)
self.assertEqual(len(resources), 1)
r = resources[0]
self.assertTrue('LogicalResourceId' in r)
self.assertEqual(r['LogicalResourceId'], 'WebServer')
def test_stack_resources_describe_bad_lookup(self):
ctx = self.create_context()
auth.authenticate(ctx).AndReturn(True)
man = manager.EngineManager()
self.assertRaises(AttributeError,
man.describe_stack_resources,
ctx, None, None, 'WebServer')
def test_stack_resources_describe_nonexist_stack(self):
ctx = self.create_context()
auth.authenticate(ctx).AndReturn(True)
man = manager.EngineManager()
self.assertRaises(AttributeError,
man.describe_stack_resources,
ctx, 'foo', None, 'WebServer')
def test_stack_resources_describe_nonexist_physid(self):
ctx = self.create_context()
auth.authenticate(ctx).AndReturn(True)
man = manager.EngineManager()
self.assertRaises(AttributeError,
man.describe_stack_resources,
ctx, None, 'foo', 'WebServer')
def test_stack_resources_list(self):
ctx = self.create_context('stack_res_describe')
auth.authenticate(ctx).AndReturn(True)
stack = self.get_wordpress_stack('test_stack_ress_list', ctx)
self.m.ReplayAll()
stack.store()
stack.create()
man = manager.EngineManager()
resources = man.list_stack_resources(ctx, 'test_stack_ress_list')
self.assertEqual(len(resources), 1)
r = resources[0]
self.assertTrue('LastUpdatedTimestamp' in r)
self.assertTrue('PhysicalResourceId' in r)
self.assertTrue('LogicalResourceId' in r)
self.assertEqual(r['LogicalResourceId'], 'WebServer')
self.assertTrue('ResourceStatus' in r)
#self.assertTrue('ResourceStatusReason' in r)
self.assertTrue('ResourceType' in r)
def test_stack_resources_list_nonexist_stack(self):
ctx = self.create_context()
auth.authenticate(ctx).AndReturn(True)
man = manager.EngineManager()
self.assertRaises(AttributeError,
man.list_stack_resources,
ctx, 'foo')
# allows testing of the test directly
if __name__ == '__main__':
sys.argv.append(__file__)
nose.main()