heat/heat/tests/test_engine_manager.py

717 lines
27 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import os
import nose
import unittest
import mox
import json
import sqlalchemy
from nose.plugins.attrib import attr
from nose import with_setup
from heat.common import context
from heat.tests.v1_1 import fakes
import heat.engine.api as engine_api
from heat.engine import instance as instances
import heat.db as db_api
from heat.engine import parser
from heat.engine import manager
from heat.engine import auth
from heat.engine import watchrule
tests_dir = os.path.dirname(os.path.realpath(__file__))
templates_dir = os.path.normpath(os.path.join(tests_dir,
os.path.pardir, os.path.pardir,
'templates'))
def create_context(mocks, user='stacks_test_user',
tenant='test_admin', ctx=None):
ctx = ctx or context.get_admin_context()
mocks.StubOutWithMock(ctx, 'username')
mocks.StubOutWithMock(ctx, 'tenant')
ctx.username = user
ctx.tenant = tenant
mocks.StubOutWithMock(auth, 'authenticate')
return ctx
def get_wordpress_stack(stack_name, ctx):
tmpl_path = os.path.join(templates_dir,
'WordPress_Single_Instance_gold.template')
with open(tmpl_path) as f:
t = json.load(f)
template = parser.Template(t)
parameters = parser.Parameters(stack_name, template,
{'KeyName': 'test'})
stack = parser.Stack(ctx, stack_name, template, parameters)
return stack
def setup_mocks(mocks, stack):
fc = fakes.FakeClient()
mocks.StubOutWithMock(instances.Instance, 'nova')
instances.Instance.nova().MultipleTimes().AndReturn(fc)
instance = stack.resources['WebServer']
instance.calculate_properties()
server_userdata = instance._build_userdata(instance.properties['UserData'])
mocks.StubOutWithMock(fc.servers, 'create')
fc.servers.create(image=744, flavor=3, key_name='test',
name='%s.WebServer' % stack.name, security_groups=None,
userdata=server_userdata, scheduler_hints=None,
meta=None).AndReturn(fc.servers.list()[-1])
@attr(tag=['unit', 'stack'])
@attr(speed='slow')
class stackCreateTest(unittest.TestCase):
def setUp(self):
self.m = mox.Mox()
def tearDown(self):
self.m.UnsetStubs()
print "stackTest teardown complete"
def test_wordpress_single_instance_stack_create(self):
stack = get_wordpress_stack('test_stack', create_context(self.m))
setup_mocks(self.m, stack)
self.m.ReplayAll()
stack.store()
stack.create()
self.assertNotEqual(stack.resources['WebServer'], None)
self.assertTrue(stack.resources['WebServer'].instance_id > 0)
self.assertNotEqual(stack.resources['WebServer'].ipaddress, '0.0.0.0')
def test_wordpress_single_instance_stack_delete(self):
ctx = create_context(self.m, tenant='test_delete_tenant')
stack = get_wordpress_stack('test_stack', ctx)
setup_mocks(self.m, stack)
self.m.ReplayAll()
stack_id = stack.store()
stack.create()
db_s = db_api.stack_get(ctx, stack_id)
self.assertNotEqual(db_s, None)
self.assertNotEqual(stack.resources['WebServer'], None)
self.assertTrue(stack.resources['WebServer'].instance_id > 0)
stack.delete()
self.assertEqual(stack.resources['WebServer'].state, 'DELETE_COMPLETE')
self.assertEqual(db_api.stack_get(ctx, stack_id), None)
self.assertEqual(db_s.status, 'DELETE_COMPLETE')
@attr(tag=['unit', 'engine-api', 'engine-manager'])
@attr(speed='fast')
class stackManagerCreateUpdateDeleteTest(unittest.TestCase):
def setUp(self):
self.m = mox.Mox()
self.username = 'stack_manager_create_test_user'
self.tenant = 'stack_manager_create_test_tenant'
self.ctx = create_context(self.m, self.username, self.tenant)
auth.authenticate(self.ctx).AndReturn(True)
self.man = manager.EngineManager()
def tearDown(self):
self.m.UnsetStubs()
def test_stack_create(self):
stack_name = 'manager_create_test_stack'
params = {'foo': 'bar'}
template = '{ "Template": "data" }'
stack = get_wordpress_stack(stack_name, self.ctx)
self.m.StubOutWithMock(parser, 'Template')
self.m.StubOutWithMock(parser, 'Parameters')
self.m.StubOutWithMock(parser, 'Stack')
parser.Template(template).AndReturn(stack.t)
parser.Parameters(stack_name,
stack.t,
params).AndReturn(stack.parameters)
parser.Stack(self.ctx, stack.name,
stack.t, stack.parameters).AndReturn(stack)
self.m.StubOutWithMock(stack, 'validate')
stack.validate().AndReturn({'Description': 'Successfully validated'})
self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
manager.greenpool.spawn_n(stack.create)
self.m.ReplayAll()
result = self.man.create_stack(self.ctx, stack_name,
template, params, {})
self.assertEqual(result, stack.identifier())
self.assertTrue(isinstance(result, dict))
self.assertTrue(result['stack_id'])
self.m.VerifyAll()
def test_stack_create_verify_err(self):
stack_name = 'manager_create_verify_err_test_stack'
params = {'foo': 'bar'}
template = '{ "Template": "data" }'
stack = get_wordpress_stack(stack_name, self.ctx)
self.m.StubOutWithMock(parser, 'Template')
self.m.StubOutWithMock(parser, 'Parameters')
self.m.StubOutWithMock(parser, 'Stack')
parser.Template(template).AndReturn(stack.t)
parser.Parameters(stack_name,
stack.t,
params).AndReturn(stack.parameters)
parser.Stack(self.ctx, stack.name,
stack.t, stack.parameters).AndReturn(stack)
self.m.StubOutWithMock(stack, 'validate')
error = {'Description': 'fubar'}
stack.validate().AndReturn(error)
self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
self.m.ReplayAll()
result = self.man.create_stack(self.ctx, stack_name,
template, params, {})
self.assertEqual(result, error)
self.m.VerifyAll()
def test_stack_delete(self):
stack_name = 'manager_delete_test_stack'
stack = get_wordpress_stack(stack_name, self.ctx)
stack.store()
self.m.StubOutWithMock(parser.Stack, 'load')
self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
parser.Stack.load(self.ctx, stack.id).AndReturn(stack)
manager.greenpool.spawn_n(stack.delete)
self.m.ReplayAll()
self.assertEqual(self.man.delete_stack(self.ctx,
stack.identifier(), {}),
None)
self.m.VerifyAll()
def test_stack_delete_nonexist(self):
stack_name = 'manager_delete_nonexist_test_stack'
stack = get_wordpress_stack(stack_name, self.ctx)
self.m.ReplayAll()
self.assertRaises(AttributeError,
self.man.delete_stack,
self.ctx, stack.identifier(), {})
self.m.VerifyAll()
def test_stack_update(self):
stack_name = 'manager_update_test_stack'
params = {'foo': 'bar'}
template = '{ "Template": "data" }'
old_stack = get_wordpress_stack(stack_name, self.ctx)
old_stack.store()
stack = get_wordpress_stack(stack_name, self.ctx)
self.m.StubOutWithMock(parser, 'Stack')
self.m.StubOutWithMock(parser.Stack, 'load')
parser.Stack.load(self.ctx, old_stack.id).AndReturn(old_stack)
self.m.StubOutWithMock(parser, 'Template')
self.m.StubOutWithMock(parser, 'Parameters')
parser.Template(template).AndReturn(stack.t)
parser.Parameters(stack_name,
stack.t,
params).AndReturn(stack.parameters)
parser.Stack(self.ctx, stack.name,
stack.t, stack.parameters).AndReturn(stack)
self.m.StubOutWithMock(stack, 'validate')
stack.validate().AndReturn({'Description': 'Successfully validated'})
self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
manager.greenpool.spawn_n(old_stack.update, stack)
self.m.ReplayAll()
result = self.man.update_stack(self.ctx, old_stack.identifier(),
template, params, {})
self.assertEqual(result, old_stack.identifier())
self.assertTrue(isinstance(result, dict))
self.assertTrue(result['stack_id'])
self.m.VerifyAll()
def test_stack_update_verify_err(self):
stack_name = 'manager_update_verify_err_test_stack'
params = {'foo': 'bar'}
template = '{ "Template": "data" }'
old_stack = get_wordpress_stack(stack_name, self.ctx)
old_stack.store()
stack = get_wordpress_stack(stack_name, self.ctx)
self.m.StubOutWithMock(parser, 'Stack')
self.m.StubOutWithMock(parser.Stack, 'load')
parser.Stack.load(self.ctx, old_stack.id).AndReturn(old_stack)
self.m.StubOutWithMock(parser, 'Template')
self.m.StubOutWithMock(parser, 'Parameters')
parser.Template(template).AndReturn(stack.t)
parser.Parameters(stack_name,
stack.t,
params).AndReturn(stack.parameters)
parser.Stack(self.ctx, stack.name,
stack.t, stack.parameters).AndReturn(stack)
self.m.StubOutWithMock(stack, 'validate')
error = {'Description': 'fubar'}
stack.validate().AndReturn(error)
self.m.StubOutWithMock(manager.greenpool, 'spawn_n')
self.m.ReplayAll()
result = self.man.update_stack(self.ctx, old_stack.identifier(),
template, params, {})
self.assertEqual(result, error)
self.m.VerifyAll()
def test_stack_update_nonexist(self):
stack_name = 'manager_update_nonexist_test_stack'
params = {'foo': 'bar'}
template = '{ "Template": "data" }'
stack = get_wordpress_stack(stack_name, self.ctx)
self.m.ReplayAll()
self.assertRaises(AttributeError,
self.man.update_stack,
self.ctx, stack.identifier(), template, params, {})
self.m.VerifyAll()
@attr(tag=['unit', 'engine-api', 'engine-manager'])
@attr(speed='fast')
class stackManagerTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
m = mox.Mox()
cls.username = 'stack_manager_test_user'
cls.tenant = 'stack_manager_test_tenant'
ctx = create_context(m, cls.username, cls.tenant)
cls.stack_name = 'manager_test_stack'
stack = get_wordpress_stack(cls.stack_name, ctx)
setup_mocks(m, stack)
m.ReplayAll()
stack.store()
stack.create()
cls.stack = stack
cls.stack_identity = stack.identifier()
m.UnsetStubs()
@classmethod
def tearDownClass(cls):
cls = cls
m = mox.Mox()
create_context(m, cls.username, cls.tenant, ctx=cls.stack.context)
setup_mocks(m, cls.stack)
m.ReplayAll()
cls.stack.delete()
m.UnsetStubs()
def setUp(self):
self.m = mox.Mox()
self.ctx = create_context(self.m, self.username, self.tenant)
auth.authenticate(self.ctx).AndReturn(True)
setup_mocks(self.m, self.stack)
self.m.ReplayAll()
self.man = manager.EngineManager()
def tearDown(self):
self.m.UnsetStubs()
def test_stack_identify(self):
identity = self.man.identify_stack(self.ctx, self.stack_name)
self.assertEqual(identity, self.stack_identity)
def test_stack_identify_nonexist(self):
self.assertRaises(AttributeError, self.man.identify_stack,
self.ctx, 'wibble')
def test_stack_create_existing(self):
self.assertRaises(AttributeError, self.man.create_stack,
self.ctx, self.stack_name, self.stack.t, {}, {})
def test_stack_event_list(self):
el = self.man.list_events(self.ctx, self.stack_identity, {})
self.assertTrue('events' in el)
events = el['events']
self.assertEqual(len(events), 2)
for ev in events:
self.assertTrue('event_id' in ev)
self.assertTrue(ev['event_id'] > 0)
self.assertTrue('logical_resource_id' in ev)
self.assertEqual(ev['logical_resource_id'], 'WebServer')
self.assertTrue('physical_resource_id' in ev)
self.assertTrue('resource_properties' in ev)
# Big long user data field.. it mentions 'wordpress'
# a few times so this should work.
user_data = ev['resource_properties']['UserData']
self.assertNotEqual(user_data.find('wordpress'), -1)
self.assertEqual(ev['resource_properties']['ImageId'],
'F17-x86_64-gold')
self.assertEqual(ev['resource_properties']['InstanceType'],
'm1.large')
self.assertTrue('resource_status' in ev)
self.assertTrue(ev['resource_status'] in ('IN_PROGRESS',
'CREATE_COMPLETE'))
self.assertTrue('resource_status_reason' in ev)
self.assertEqual(ev['resource_status_reason'], 'state changed')
self.assertTrue('resource_type' in ev)
self.assertEqual(ev['resource_type'], 'AWS::EC2::Instance')
self.assertTrue('stack_identity' in ev)
self.assertTrue('stack_name' in ev)
self.assertEqual(ev['stack_name'], self.stack_name)
self.assertTrue('event_time' in ev)
def test_stack_describe_all(self):
sl = self.man.show_stack(self.ctx, None, {})
self.assertEqual(len(sl['stacks']), 1)
for s in sl['stacks']:
self.assertNotEqual(s['stack_identity'], None)
self.assertNotEqual(s['description'].find('WordPress'), -1)
def test_stack_describe_all_empty(self):
self.tearDown()
self.tenant = 'stack_describe_all_empty_tenant'
self.setUp()
sl = self.man.show_stack(self.ctx, None, {})
self.assertEqual(len(sl['stacks']), 0)
def test_stack_describe_nonexistent(self):
nonexist = dict(self.stack_identity)
nonexist['stack_name'] = 'wibble'
self.assertRaises(AttributeError,
self.man.show_stack,
self.ctx, nonexist, {})
def test_stack_describe(self):
sl = self.man.show_stack(self.ctx, self.stack_identity, {})
self.assertEqual(len(sl['stacks']), 1)
s = sl['stacks'][0]
self.assertTrue('creation_time' in s)
self.assertTrue('updated_time' in s)
self.assertTrue('stack_identity' in s)
self.assertNotEqual(s['stack_identity'], None)
self.assertTrue('stack_name' in s)
self.assertEqual(s['stack_name'], self.stack_name)
self.assertTrue('stack_status' in s)
self.assertTrue('stack_status_reason' in s)
self.assertTrue('description' in s)
self.assertNotEqual(s['description'].find('WordPress'), -1)
self.assertTrue('parameters' in s)
def test_stack_resource_describe(self):
r = self.man.describe_stack_resource(self.ctx, self.stack_identity,
'WebServer')
self.assertTrue('description' in r)
self.assertTrue('updated_time' in r)
self.assertTrue('stack_identity' in r)
self.assertNotEqual(r['stack_identity'], None)
self.assertTrue('stack_name' in r)
self.assertEqual(r['stack_name'], self.stack_name)
self.assertTrue('metadata' in r)
self.assertTrue('resource_status' in r)
self.assertTrue('resource_status_reason' in r)
self.assertTrue('resource_type' in r)
self.assertTrue('physical_resource_id' in r)
self.assertTrue('logical_resource_id' in r)
self.assertEqual(r['logical_resource_id'], 'WebServer')
def test_stack_resource_describe_nonexist_stack(self):
nonexist = dict(self.stack_identity)
nonexist['stack_name'] = 'foo'
self.assertRaises(AttributeError,
self.man.describe_stack_resource,
self.ctx, nonexist, 'WebServer')
def test_stack_resource_describe_nonexist_resource(self):
self.assertRaises(AttributeError,
self.man.describe_stack_resource,
self.ctx, self.stack_identity, 'foo')
def test_stack_resources_describe(self):
resources = self.man.describe_stack_resources(self.ctx,
self.stack_identity,
None, 'WebServer')
self.assertEqual(len(resources), 1)
r = resources[0]
self.assertTrue('description' in r)
self.assertTrue('updated_time' in r)
self.assertTrue('stack_identity' in r)
self.assertNotEqual(r['stack_identity'], None)
self.assertTrue('stack_name' in r)
self.assertEqual(r['stack_name'], self.stack_name)
self.assertTrue('resource_status' in r)
self.assertTrue('resource_status_reason' in r)
self.assertTrue('resource_type' in r)
self.assertTrue('physical_resource_id' in r)
self.assertTrue('logical_resource_id' in r)
self.assertEqual(r['logical_resource_id'], 'WebServer')
def test_stack_resources_describe_no_filter(self):
resources = self.man.describe_stack_resources(self.ctx,
self.stack_identity,
None, None)
self.assertEqual(len(resources), 1)
r = resources[0]
self.assertTrue('logical_resource_id' in r)
self.assertEqual(r['logical_resource_id'], 'WebServer')
def test_stack_resources_describe_bad_lookup(self):
self.assertRaises(AttributeError,
self.man.describe_stack_resources,
self.ctx, None, None, 'WebServer')
def test_stack_resources_describe_nonexist_stack(self):
nonexist = dict(self.stack_identity)
nonexist['stack_name'] = 'foo'
self.assertRaises(AttributeError,
self.man.describe_stack_resources,
self.ctx, nonexist, None, 'WebServer')
def test_stack_resources_describe_nonexist_physid(self):
self.assertRaises(AttributeError,
self.man.describe_stack_resources,
self.ctx, None, 'foo', 'WebServer')
def test_stack_resources_list(self):
resources = self.man.list_stack_resources(self.ctx,
self.stack_identity)
self.assertEqual(len(resources), 1)
r = resources[0]
self.assertTrue('updated_time' in r)
self.assertTrue('physical_resource_id' in r)
self.assertTrue('logical_resource_id' in r)
self.assertEqual(r['logical_resource_id'], 'WebServer')
self.assertTrue('resource_status' in r)
self.assertTrue('resource_status_reason' in r)
self.assertTrue('resource_type' in r)
def test_stack_resources_list_nonexist_stack(self):
nonexist = dict(self.stack_identity)
nonexist['stack_name'] = 'foo'
self.assertRaises(AttributeError,
self.man.list_stack_resources,
self.ctx, nonexist)
def test_metadata(self):
err, metadata = self.man.metadata_get_resource(None,
self.stack_name,
'WebServer')
self.assertEqual(err, None)
self.assertTrue('AWS::CloudFormation::Init' in metadata)
test_metadata = {'foo': 'bar', 'baz': 'quux', 'blarg': 'wibble'}
err, result = self.man.metadata_update(None,
self.stack.id, 'WebServer',
test_metadata)
self.assertEqual(err, None)
self.assertEqual(result, test_metadata)
err, metadata = self.man.metadata_get_resource(None,
self.stack_name,
'WebServer')
self.assertEqual(err, None)
self.assertFalse('AWS::CloudFormation::Init' in metadata)
self.assertEqual(metadata, test_metadata)
def test_show_watch(self):
# Insert two dummy watch rules into the DB
values = {'stack_name': u'wordpress_ha', 'state': 'NORMAL',
'name': u'HttpFailureAlarm',
'rule': {
u'EvaluationPeriods': u'1',
u'AlarmActions': [u'WebServerRestartPolicy'],
u'AlarmDescription': u'Restart the WikiDatabase',
u'Namespace': u'system/linux',
u'Period': u'300',
u'ComparisonOperator': u'GreaterThanThreshold',
u'Statistic': u'SampleCount',
u'Threshold': u'2',
u'MetricName': u'ServiceFailure'}}
db_ret = db_api.watch_rule_create(self.ctx, values)
self.assertNotEqual(db_ret, None)
values['name'] = "AnotherWatch"
db_ret = db_api.watch_rule_create(self.ctx, values)
self.assertNotEqual(db_ret, None)
# watch_name=None should return both watches
result = self.man.show_watch(self.ctx, watch_name=None)
self.assertEqual(2, len(result))
# watch_name="HttpFailureAlarm" should return only one
result = self.man.show_watch(self.ctx, watch_name="HttpFailureAlarm")
self.assertEqual(1, len(result))
# watch_name="nonexistent" should raise an AttributeError
self.assertRaises(AttributeError,
self.man.show_watch,
self.ctx, watch_name="nonexistent")
# Check the response has all keys defined in the engine API
for key in engine_api.WATCH_KEYS:
self.assertTrue(key in result[0])
def test_show_watch_metric(self):
# Get one of the watch rules created in test_show_watch
# And add a metric datapoint
watch = db_api.watch_rule_get(self.ctx, "HttpFailureAlarm")
self.assertNotEqual(watch, None)
values = {'watch_rule_id': watch.id,
'data': {
u'Namespace': u'system/linux',
u'ServiceFailure': {
u'Units': u'Counter', u'Value': 1}}}
watch = db_api.watch_data_create(self.ctx, values)
# Check there is one result returned
result = self.man.show_watch_metric(self.ctx, namespace=None,
metric_name=None)
self.assertEqual(1, len(result))
# Create another metric datapoint and check we get two
watch = db_api.watch_data_create(self.ctx, values)
result = self.man.show_watch_metric(self.ctx, namespace=None,
metric_name=None)
self.assertEqual(2, len(result))
# Check the response has all keys defined in the engine API
for key in engine_api.WATCH_DATA_KEYS:
self.assertTrue(key in result[0])
def test_set_watch_state(self):
# Insert dummy watch rule into the DB
values = {'stack_name': u'wordpress_ha', 'state': 'NORMAL',
'name': u'OverrideAlarm',
'rule': {
u'EvaluationPeriods': u'1',
u'AlarmActions': [u'WebServerRestartPolicy'],
u'AlarmDescription': u'Restart the WikiDatabase',
u'Namespace': u'system/linux',
u'Period': u'300',
u'ComparisonOperator': u'GreaterThanThreshold',
u'Statistic': u'SampleCount',
u'Threshold': u'2',
u'MetricName': u'ServiceFailure'}}
db_ret = db_api.watch_rule_create(self.ctx, values)
self.assertNotEqual(db_ret, None)
for state in watchrule.WatchRule.WATCH_STATES:
result = self.man.set_watch_state(self.ctx,
watch_name="OverrideAlarm",
state=state)
self.assertNotEqual(result, None)
self.assertEqual(result[engine_api.WATCH_STATE_VALUE], state)
# Cleanup, delete the dummy rule
db_api.watch_rule_delete(self.ctx, "OverrideAlarm")
def test_set_watch_state_badstate(self):
# Insert dummy watch rule into the DB
values = {'stack_name': u'wordpress_ha', 'state': 'NORMAL',
'name': u'OverrideAlarm2',
'rule': {
u'EvaluationPeriods': u'1',
u'AlarmActions': [u'WebServerRestartPolicy'],
u'AlarmDescription': u'Restart the WikiDatabase',
u'Namespace': u'system/linux',
u'Period': u'300',
u'ComparisonOperator': u'GreaterThanThreshold',
u'Statistic': u'SampleCount',
u'Threshold': u'2',
u'MetricName': u'ServiceFailure'}}
db_ret = db_api.watch_rule_create(self.ctx, values)
self.assertNotEqual(db_ret, None)
for state in ["HGJHGJHG", "1234", "!\*(&%"]:
self.assertRaises(AttributeError,
self.man.set_watch_state,
self.ctx, watch_name="OverrideAlarm2",
state=state)
# Cleanup, delete the dummy rule
db_api.watch_rule_delete(self.ctx, "OverrideAlarm2")
def test_set_watch_state_noexist(self):
# watch_name="nonexistent" should raise an AttributeError
state = watchrule.WatchRule.ALARM # State valid
self.assertRaises(AttributeError,
self.man.set_watch_state,
self.ctx, watch_name="nonexistent", state=state)
# allows testing of the test directly
if __name__ == '__main__':
sys.argv.append(__file__)
nose.main()