heat/heat/tests/test_template.py

1104 lines
41 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import json
import fixtures
from oslotest import mockpatch
import six
from stevedore import extension
from heat.common import exception
from heat.common import template_format
from heat.engine.cfn import functions as cfn_funcs
from heat.engine.cfn import template as cfn_t
from heat.engine.clients.os import nova
from heat.engine import environment
from heat.engine import function
from heat.engine.hot import template as hot_t
from heat.engine import parameters
from heat.engine import resource
from heat.engine import rsrc_defn
from heat.engine import stack
from heat.engine import template
from heat.tests import common
from heat.tests import generic_resource as generic_rsrc
from heat.tests.nova import fakes as fakes_nova
from heat.tests import utils
mapping_template = template_format.parse('''{
"AWSTemplateFormatVersion" : "2010-09-09",
"Mappings" : {
"ValidMapping" : {
"TestKey" : { "TestValue" : "wibble" }
},
"InvalidMapping" : {
"ValueList" : [ "foo", "bar" ],
"ValueString" : "baz"
},
"MapList": [ "foo", { "bar" : "baz" } ],
"MapString": "foobar"
}
}''')
empty_template = template_format.parse('''{
"HeatTemplateFormatVersion" : "2012-12-12",
}''')
parameter_template = template_format.parse('''{
"HeatTemplateFormatVersion" : "2012-12-12",
"Parameters" : {
"foo" : { "Type" : "String" },
"blarg" : { "Type" : "String", "Default": "quux" }
}
}''')
resource_template = template_format.parse('''{
"HeatTemplateFormatVersion" : "2012-12-12",
"Resources" : {
"foo" : { "Type" : "GenericResourceType" },
"blarg" : { "Type" : "GenericResourceType" }
}
}''')
def join(raw):
tmpl = template.Template(mapping_template)
return function.resolve(tmpl.parse(None, raw))
class DummyClass(object):
metadata = None
def metadata_get(self):
return self.metadata
def metadata_set(self, metadata):
self.metadata = metadata
class TemplatePluginFixture(fixtures.Fixture):
def __init__(self, templates={}):
super(TemplatePluginFixture, self).__init__()
self.templates = [extension.Extension(k, None, v, None)
for (k, v) in templates.items()]
def _get_template_extension_manager(self):
return extension.ExtensionManager.make_test_instance(self.templates)
def setUp(self):
super(TemplatePluginFixture, self).setUp()
def clear_template_classes():
template._template_classes = None
clear_template_classes()
self.useFixture(mockpatch.PatchObject(
template,
'_get_template_extension_manager',
new=self._get_template_extension_manager))
self.addCleanup(clear_template_classes)
class TestTemplatePluginManager(common.HeatTestCase):
def test_template_NEW_good(self):
class NewTemplate(template.Template):
SECTIONS = (VERSION, MAPPINGS) = ('NEWTemplateFormatVersion',
'__undefined__')
RESOURCES = 'thingies'
def param_schemata(self):
pass
def get_section_name(self, section):
pass
def parameters(self, stack_identifier, user_params):
pass
def validate_resource_definitions(self, stack):
pass
def resource_definitions(self, stack):
pass
def add_resource(self, definition, name=None):
pass
def __getitem__(self, section):
return {}
def functions(self):
return {}
class NewTemplatePrint(function.Function):
def result(self):
return 'always this'
self.useFixture(TemplatePluginFixture(
{'NEWTemplateFormatVersion.2345-01-01': NewTemplate}))
t = {'NEWTemplateFormatVersion': '2345-01-01'}
tmpl = template.Template(t)
err = tmpl.validate()
self.assertIsNone(err)
class TestTemplateVersion(common.HeatTestCase):
versions = (('heat_template_version', '2013-05-23'),
('HeatTemplateFormatVersion', '2012-12-12'),
('AWSTemplateFormatVersion', '2010-09-09'))
def test_hot_version(self):
tmpl = {
'heat_template_version': '2013-05-23',
'foo': 'bar',
'parameters': {}
}
self.assertEqual(('heat_template_version', '2013-05-23'),
template.get_version(tmpl, self.versions))
def test_cfn_version(self):
tmpl = {
'AWSTemplateFormatVersion': '2010-09-09',
'foo': 'bar',
'Parameters': {}
}
self.assertEqual(('AWSTemplateFormatVersion', '2010-09-09'),
template.get_version(tmpl, self.versions))
def test_heat_cfn_version(self):
tmpl = {
'HeatTemplateFormatVersion': '2012-12-12',
'foo': 'bar',
'Parameters': {}
}
self.assertEqual(('HeatTemplateFormatVersion', '2012-12-12'),
template.get_version(tmpl, self.versions))
def test_missing_version(self):
tmpl = {
'foo': 'bar',
'Parameters': {}
}
ex = self.assertRaises(exception.InvalidTemplateVersion,
template.get_version, tmpl, self.versions)
self.assertEqual('The template version is invalid: Template version '
'was not provided', six.text_type(ex))
def test_ambiguous_version(self):
tmpl = {
'AWSTemplateFormatVersion': '2010-09-09',
'HeatTemplateFormatVersion': '2012-12-12',
'foo': 'bar',
'Parameters': {}
}
self.assertRaises(exception.InvalidTemplateVersion,
template.get_version, tmpl, self.versions)
class ParserTest(common.HeatTestCase):
def test_list(self):
raw = ['foo', 'bar', 'baz']
parsed = join(raw)
for i in six.moves.xrange(len(raw)):
self.assertEqual(raw[i], parsed[i])
self.assertIsNot(raw, parsed)
def test_dict(self):
raw = {'foo': 'bar', 'blarg': 'wibble'}
parsed = join(raw)
for k in raw:
self.assertEqual(raw[k], parsed[k])
self.assertIsNot(raw, parsed)
def test_dict_list(self):
raw = {'foo': ['bar', 'baz'], 'blarg': 'wibble'}
parsed = join(raw)
self.assertEqual(raw['blarg'], parsed['blarg'])
for i in six.moves.xrange(len(raw['foo'])):
self.assertEqual(raw['foo'][i], parsed['foo'][i])
self.assertIsNot(raw, parsed)
self.assertIsNot(raw['foo'], parsed['foo'])
def test_list_dict(self):
raw = [{'foo': 'bar', 'blarg': 'wibble'}, 'baz', 'quux']
parsed = join(raw)
for i in six.moves.xrange(1, len(raw)):
self.assertEqual(raw[i], parsed[i])
for k in raw[0]:
self.assertEqual(raw[0][k], parsed[0][k])
self.assertIsNot(raw, parsed)
self.assertIsNot(raw[0], parsed[0])
def test_join(self):
raw = {'Fn::Join': [' ', ['foo', 'bar', 'baz']]}
self.assertEqual('foo bar baz', join(raw))
def test_join_none(self):
raw = {'Fn::Join': [' ', ['foo', None, 'baz']]}
self.assertEqual('foo baz', join(raw))
def test_join_list(self):
raw = [{'Fn::Join': [' ', ['foo', 'bar', 'baz']]}, 'blarg', 'wibble']
parsed = join(raw)
self.assertEqual('foo bar baz', parsed[0])
for i in six.moves.xrange(1, len(raw)):
self.assertEqual(raw[i], parsed[i])
self.assertIsNot(raw, parsed)
def test_join_dict_val(self):
raw = {'quux': {'Fn::Join': [' ', ['foo', 'bar', 'baz']]},
'blarg': 'wibble'}
parsed = join(raw)
self.assertEqual('foo bar baz', parsed['quux'])
self.assertEqual(raw['blarg'], parsed['blarg'])
self.assertIsNot(raw, parsed)
class TestTemplateValidate(common.HeatTestCase):
def test_template_validate_cfn_good(self):
t = {
'AWSTemplateFormatVersion': '2010-09-09',
'Description': 'foo',
'Parameters': {},
'Mappings': {},
'Resources': {
'server': {
'Type': 'OS::Nova::Server'
}
},
'Outputs': {},
}
tmpl = template.Template(t)
err = tmpl.validate()
self.assertIsNone(err)
# test with alternate version key
t = {
'HeatTemplateFormatVersion': '2012-12-12',
'Description': 'foo',
'Parameters': {},
'Mappings': {},
'Resources': {
'server': {
'Type': 'OS::Nova::Server'
}
},
'Outputs': {},
}
tmpl = template.Template(t)
err = tmpl.validate()
self.assertIsNone(err)
def test_template_validate_cfn_bad_section(self):
t = {
'AWSTemplateFormatVersion': '2010-09-09',
'Description': 'foo',
'Parameteers': {},
'Mappings': {},
'Resources': {
'server': {
'Type': 'OS::Nova::Server'
}
},
'Outputs': {},
}
tmpl = template.Template(t)
err = self.assertRaises(exception.InvalidTemplateSection,
tmpl.validate)
self.assertIn('Parameteers', six.text_type(err))
def test_template_validate_cfn_empty(self):
t = template_format.parse('''
AWSTemplateFormatVersion: 2010-09-09
Parameters:
Resources:
Outputs:
''')
tmpl = template.Template(t)
err = tmpl.validate()
self.assertIsNone(err)
def test_template_validate_hot_good(self):
t = {
'heat_template_version': '2013-05-23',
'description': 'foo',
'parameters': {},
'resources': {
'server': {
'type': 'OS::Nova::Server'
}
},
'outputs': {},
}
tmpl = template.Template(t)
err = tmpl.validate()
self.assertIsNone(err)
def test_template_validate_hot_bad_section(self):
t = {
'heat_template_version': '2013-05-23',
'description': 'foo',
'parameteers': {},
'resources': {
'server': {
'type': 'OS::Nova::Server'
}
},
'outputs': {},
}
tmpl = template.Template(t)
err = self.assertRaises(exception.InvalidTemplateSection,
tmpl.validate)
self.assertIn('parameteers', six.text_type(err))
class TemplateTest(common.HeatTestCase):
def setUp(self):
super(TemplateTest, self).setUp()
self.ctx = utils.dummy_context()
resource._register_class('GenericResourceType',
generic_rsrc.GenericResource)
@staticmethod
def resolve(snippet, template, stack=None):
return function.resolve(template.parse(stack, snippet))
def test_defaults(self):
empty = template.Template(empty_template)
self.assertNotIn('AWSTemplateFormatVersion', empty)
self.assertEqual('No description', empty['Description'])
self.assertEqual({}, empty['Mappings'])
self.assertEqual({}, empty['Resources'])
self.assertEqual({}, empty['Outputs'])
def test_aws_version(self):
tmpl = template.Template(mapping_template)
self.assertEqual(('AWSTemplateFormatVersion', '2010-09-09'),
tmpl.version)
def test_heat_version(self):
tmpl = template.Template(resource_template)
self.assertEqual(('HeatTemplateFormatVersion', '2012-12-12'),
tmpl.version)
def test_invalid_hot_version(self):
invalid_hot_version_tmp = template_format.parse(
'''{
"heat_template_version" : "2012-12-12",
}''')
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
template.Template, invalid_hot_version_tmp)
valid_versions = ['2014-10-16', '2013-05-23', '2015-04-30']
ex_error_msg = ('The template version is invalid: '
'"heat_template_version: 2012-12-12". '
'"heat_template_version" should be one of: %s'
% ', '.join(valid_versions))
self.assertEqual(ex_error_msg, six.text_type(init_ex))
def test_invalid_version_not_in_hot_versions(self):
invalid_hot_version_tmp = template_format.parse(
'''{
"heat_template_version" : "2012-12-12",
}''')
versions = {
('heat_template_version', '2013-05-23'): hot_t.HOTemplate20130523,
('heat_template_version', '2013-06-23'): hot_t.HOTemplate20130523
}
temp_copy = copy.deepcopy(template._template_classes)
template._template_classes = versions
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
template.Template, invalid_hot_version_tmp)
ex_error_msg = ('The template version is invalid: '
'"heat_template_version: 2012-12-12". '
'"heat_template_version" should be '
'one of: 2013-05-23, 2013-06-23')
self.assertEqual(ex_error_msg, six.text_type(init_ex))
template._template_classes = temp_copy
def test_invalid_aws_version(self):
invalid_aws_version_tmp = template_format.parse(
'''{
"AWSTemplateFormatVersion" : "2012-12-12",
}''')
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
template.Template, invalid_aws_version_tmp)
ex_error_msg = ('The template version is invalid: '
'"AWSTemplateFormatVersion: 2012-12-12". '
'"AWSTemplateFormatVersion" should be: 2010-09-09')
self.assertEqual(ex_error_msg, six.text_type(init_ex))
def test_invalid_version_not_in_aws_versions(self):
invalid_aws_version_tmp = template_format.parse(
'''{
"AWSTemplateFormatVersion" : "2012-12-12",
}''')
versions = {
('AWSTemplateFormatVersion', '2010-09-09'): cfn_t.CfnTemplate,
('AWSTemplateFormatVersion', '2011-06-23'): cfn_t.CfnTemplate
}
temp_copy = copy.deepcopy(template._template_classes)
template._template_classes = versions
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
template.Template, invalid_aws_version_tmp)
ex_error_msg = ('The template version is invalid: '
'"AWSTemplateFormatVersion: 2012-12-12". '
'"AWSTemplateFormatVersion" should be '
'one of: 2010-09-09, 2011-06-23')
self.assertEqual(ex_error_msg, six.text_type(init_ex))
template._template_classes = temp_copy
def test_invalid_heat_version(self):
invalid_heat_version_tmp = template_format.parse(
'''{
"HeatTemplateFormatVersion" : "2010-09-09",
}''')
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
template.Template,
invalid_heat_version_tmp)
ex_error_msg = ('The template version is invalid: '
'"HeatTemplateFormatVersion: 2010-09-09". '
'"HeatTemplateFormatVersion" should be: 2012-12-12')
self.assertEqual(ex_error_msg, six.text_type(init_ex))
def test_invalid_version_not_in_heat_versions(self):
invalid_heat_version_tmp = template_format.parse(
'''{
"HeatTemplateFormatVersion" : "2010-09-09",
}''')
versions = {
('HeatTemplateFormatVersion', '2012-12-12'): cfn_t.CfnTemplate,
('HeatTemplateFormatVersion', '2014-12-12'): cfn_t.CfnTemplate
}
temp_copy = copy.deepcopy(template._template_classes)
template._template_classes = versions
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
template.Template,
invalid_heat_version_tmp)
ex_error_msg = ('The template version is invalid: '
'"HeatTemplateFormatVersion: 2010-09-09". '
'"HeatTemplateFormatVersion" should be '
'one of: 2012-12-12, 2014-12-12')
self.assertEqual(ex_error_msg, six.text_type(init_ex))
template._template_classes = temp_copy
def test_invalid_template(self):
scanner_error = '''
1
Mappings:
ValidMapping:
TestKey: TestValue
'''
parser_error = '''
Mappings:
ValidMapping:
TestKey: {TestKey1: "Value1" TestKey2: "Value2"}
'''
self.assertRaises(ValueError, template_format.parse, scanner_error)
self.assertRaises(ValueError, template_format.parse, parser_error)
def test_invalid_section(self):
tmpl = template.Template({'HeatTemplateFormatVersion': '2012-12-12',
'Foo': ['Bar']})
self.assertNotIn('Foo', tmpl)
def test_find_in_map(self):
tmpl = template.Template(mapping_template)
stk = stack.Stack(self.ctx, 'test', tmpl)
find = {'Fn::FindInMap': ["ValidMapping", "TestKey", "TestValue"]}
self.assertEqual("wibble", self.resolve(find, tmpl, stk))
def test_find_in_invalid_map(self):
tmpl = template.Template(mapping_template)
stk = stack.Stack(self.ctx, 'test', tmpl)
finds = ({'Fn::FindInMap': ["InvalidMapping", "ValueList", "foo"]},
{'Fn::FindInMap': ["InvalidMapping", "ValueString", "baz"]},
{'Fn::FindInMap': ["MapList", "foo", "bar"]},
{'Fn::FindInMap': ["MapString", "foo", "bar"]})
for find in finds:
self.assertRaises((KeyError, TypeError), self.resolve,
find, tmpl, stk)
def test_bad_find_in_map(self):
tmpl = template.Template(mapping_template)
stk = stack.Stack(self.ctx, 'test', tmpl)
finds = ({'Fn::FindInMap': "String"},
{'Fn::FindInMap': {"Dict": "String"}},
{'Fn::FindInMap': ["ShortList", "foo"]},
{'Fn::FindInMap': ["ReallyShortList"]})
for find in finds:
self.assertRaises(KeyError, self.resolve, find, tmpl, stk)
def test_param_refs(self):
env = environment.Environment({'foo': 'bar', 'blarg': 'wibble'})
tmpl = template.Template(parameter_template, env=env)
stk = stack.Stack(self.ctx, 'test', tmpl)
p_snippet = {"Ref": "foo"}
self.assertEqual("bar", self.resolve(p_snippet, tmpl, stk))
def test_param_ref_missing(self):
env = environment.Environment({'foo': 'bar'})
tmpl = template.Template(parameter_template, env=env)
stk = stack.Stack(self.ctx, 'test', tmpl)
tmpl.env = environment.Environment({})
stk.parameters = parameters.Parameters(stk.identifier(), tmpl)
snippet = {"Ref": "foo"}
self.assertRaises(exception.UserParameterMissing,
self.resolve,
snippet, tmpl, stk)
def test_resource_refs(self):
tmpl = template.Template(resource_template)
stk = stack.Stack(self.ctx, 'test', tmpl)
self.m.StubOutWithMock(stk['foo'], 'FnGetRefId')
stk['foo'].FnGetRefId().MultipleTimes().AndReturn('bar')
self.m.ReplayAll()
r_snippet = {"Ref": "foo"}
self.assertEqual("bar", self.resolve(r_snippet, tmpl, stk))
self.m.VerifyAll()
def test_resource_refs_param(self):
tmpl = template.Template(resource_template)
stk = stack.Stack(self.ctx, 'test', tmpl)
p_snippet = {"Ref": "baz"}
parsed = tmpl.parse(stk, p_snippet)
self.assertTrue(isinstance(parsed, cfn_funcs.ParamRef))
def test_select_from_list(self):
tmpl = template.Template(empty_template)
data = {"Fn::Select": ["1", ["foo", "bar"]]}
self.assertEqual("bar", self.resolve(data, tmpl))
def test_select_from_list_integer_index(self):
tmpl = template.Template(empty_template)
data = {"Fn::Select": [1, ["foo", "bar"]]}
self.assertEqual("bar", self.resolve(data, tmpl))
def test_select_from_list_out_of_bound(self):
tmpl = template.Template(empty_template)
data = {"Fn::Select": ["0", ["foo", "bar"]]}
self.assertEqual("foo", self.resolve(data, tmpl))
data = {"Fn::Select": ["1", ["foo", "bar"]]}
self.assertEqual("bar", self.resolve(data, tmpl))
data = {"Fn::Select": ["2", ["foo", "bar"]]}
self.assertEqual("", self.resolve(data, tmpl))
def test_select_from_dict(self):
tmpl = template.Template(empty_template)
data = {"Fn::Select": ["red", {"red": "robin", "re": "foo"}]}
self.assertEqual("robin", self.resolve(data, tmpl))
def test_select_from_none(self):
tmpl = template.Template(empty_template)
data = {"Fn::Select": ["red", None]}
self.assertEqual("", self.resolve(data, tmpl))
def test_select_from_dict_not_existing(self):
tmpl = template.Template(empty_template)
data = {"Fn::Select": ["green", {"red": "robin", "re": "foo"}]}
self.assertEqual("", self.resolve(data, tmpl))
def test_select_from_serialized_json_map(self):
tmpl = template.Template(empty_template)
js = json.dumps({"red": "robin", "re": "foo"})
data = {"Fn::Select": ["re", js]}
self.assertEqual("foo", self.resolve(data, tmpl))
def test_select_from_serialized_json_list(self):
tmpl = template.Template(empty_template)
js = json.dumps(["foo", "fee", "fum"])
data = {"Fn::Select": ["0", js]}
self.assertEqual("foo", self.resolve(data, tmpl))
def test_select_empty_string(self):
tmpl = template.Template(empty_template)
data = {"Fn::Select": ["0", '']}
self.assertEqual("", self.resolve(data, tmpl))
data = {"Fn::Select": ["1", '']}
self.assertEqual("", self.resolve(data, tmpl))
data = {"Fn::Select": ["one", '']}
self.assertEqual("", self.resolve(data, tmpl))
def test_join(self):
tmpl = template.Template(empty_template)
join = {"Fn::Join": [" ", ["foo", "bar"]]}
self.assertEqual("foo bar", self.resolve(join, tmpl))
def test_split_ok(self):
tmpl = template.Template(empty_template)
data = {"Fn::Split": [";", "foo; bar; achoo"]}
self.assertEqual(['foo', ' bar', ' achoo'], self.resolve(data, tmpl))
def test_split_no_delim_in_str(self):
tmpl = template.Template(empty_template)
data = {"Fn::Split": [";", "foo, bar, achoo"]}
self.assertEqual(['foo, bar, achoo'], self.resolve(data, tmpl))
def test_base64(self):
tmpl = template.Template(empty_template)
snippet = {"Fn::Base64": "foobar"}
# For now, the Base64 function just returns the original text, and
# does not convert to base64 (see issue #133)
self.assertEqual("foobar", self.resolve(snippet, tmpl))
def test_get_azs(self):
tmpl = template.Template(empty_template)
snippet = {"Fn::GetAZs": ""}
self.assertEqual(["nova"], self.resolve(snippet, tmpl))
def test_get_azs_with_stack(self):
tmpl = template.Template(empty_template)
snippet = {"Fn::GetAZs": ""}
stk = stack.Stack(self.ctx, 'test_stack',
template.Template(empty_template))
self.m.StubOutWithMock(nova.NovaClientPlugin, '_create')
fc = fakes_nova.FakeClient()
nova.NovaClientPlugin._create().AndReturn(fc)
self.m.ReplayAll()
self.assertEqual(["nova1"], self.resolve(snippet, tmpl, stk))
def test_replace_string_values(self):
tmpl = template.Template(empty_template)
snippet = {"Fn::Replace": [
{'$var1': 'foo', '%var2%': 'bar'},
'$var1 is %var2%'
]}
self.assertEqual('foo is bar', self.resolve(snippet, tmpl))
def test_replace_number_values(self):
tmpl = template.Template(empty_template)
snippet = {"Fn::Replace": [
{'$var1': 1, '%var2%': 2},
'$var1 is not %var2%'
]}
self.assertEqual('1 is not 2', self.resolve(snippet, tmpl))
snippet = {"Fn::Replace": [
{'$var1': 1.3, '%var2%': 2.5},
'$var1 is not %var2%'
]}
self.assertEqual('1.3 is not 2.5', self.resolve(snippet, tmpl))
def test_replace_none_values(self):
tmpl = template.Template(empty_template)
snippet = {"Fn::Replace": [
{'$var1': None, '${var2}': None},
'"$var1" is "${var2}"'
]}
self.assertEqual('"" is ""', self.resolve(snippet, tmpl))
def test_replace_missing_key(self):
tmpl = template.Template(empty_template)
snippet = {"Fn::Replace": [
{'$var1': 'foo', 'var2': 'bar'},
'"$var1" is "${var3}"'
]}
self.assertEqual('"foo" is "${var3}"', self.resolve(snippet, tmpl))
def test_replace_param_values(self):
env = environment.Environment({'foo': 'wibble'})
tmpl = template.Template(parameter_template, env=env)
stk = stack.Stack(self.ctx, 'test_stack', tmpl)
snippet = {"Fn::Replace": [
{'$var1': {'Ref': 'foo'}, '%var2%': {'Ref': 'blarg'}},
'$var1 is %var2%'
]}
self.assertEqual('wibble is quux', self.resolve(snippet, tmpl, stk))
def test_member_list2map_good(self):
tmpl = template.Template(empty_template)
snippet = {"Fn::MemberListToMap": [
'Name', 'Value', ['.member.0.Name=metric',
'.member.0.Value=cpu',
'.member.1.Name=size',
'.member.1.Value=56']]}
self.assertEqual({'metric': 'cpu', 'size': '56'},
self.resolve(snippet, tmpl))
def test_member_list2map_good2(self):
tmpl = template.Template(empty_template)
snippet = {"Fn::MemberListToMap": [
'Key', 'Value', ['.member.2.Key=metric',
'.member.2.Value=cpu',
'.member.5.Key=size',
'.member.5.Value=56']]}
self.assertEqual({'metric': 'cpu', 'size': '56'},
self.resolve(snippet, tmpl))
def test_resource_facade(self):
metadata_snippet = {'Fn::ResourceFacade': 'Metadata'}
deletion_policy_snippet = {'Fn::ResourceFacade': 'DeletionPolicy'}
update_policy_snippet = {'Fn::ResourceFacade': 'UpdatePolicy'}
parent_resource = DummyClass()
parent_resource.metadata_set({"foo": "bar"})
parent_resource.t = rsrc_defn.ResourceDefinition(
'parent', 'SomeType',
deletion_policy=rsrc_defn.ResourceDefinition.RETAIN,
update_policy={"blarg": "wibble"})
parent_resource.stack = stack.Stack(self.ctx, 'toplevel_stack',
template.Template(empty_template))
stk = stack.Stack(self.ctx, 'test_stack',
template.Template(empty_template),
parent_resource='parent', owner_id=45)
stk._parent_stack = dict(parent=parent_resource)
self.assertEqual({"foo": "bar"},
self.resolve(metadata_snippet, stk.t, stk))
self.assertEqual('Retain',
self.resolve(deletion_policy_snippet, stk.t, stk))
self.assertEqual({"blarg": "wibble"},
self.resolve(update_policy_snippet, stk.t, stk))
def test_resource_facade_function(self):
deletion_policy_snippet = {'Fn::ResourceFacade': 'DeletionPolicy'}
parent_resource = DummyClass()
parent_resource.metadata_set({"foo": "bar"})
parent_resource.stack = stack.Stack(self.ctx, 'toplevel_stack',
template.Template(empty_template))
del_policy = cfn_funcs.Join(parent_resource.stack,
'Fn::Join', ['eta', ['R', 'in']])
parent_resource.t = rsrc_defn.ResourceDefinition(
'parent', 'SomeType',
deletion_policy=del_policy)
stk = stack.Stack(self.ctx, 'test_stack',
template.Template(empty_template),
parent_resource='parent')
stk._parent_stack = dict(parent=parent_resource)
self.assertEqual('Retain',
self.resolve(deletion_policy_snippet, stk.t, stk))
def test_resource_facade_invalid_arg(self):
snippet = {'Fn::ResourceFacade': 'wibble'}
stk = stack.Stack(self.ctx, 'test_stack',
template.Template(empty_template))
error = self.assertRaises(ValueError,
self.resolve, snippet, stk.t, stk)
self.assertIn(list(six.iterkeys(snippet))[0], six.text_type(error))
def test_resource_facade_missing_deletion_policy(self):
snippet = {'Fn::ResourceFacade': 'DeletionPolicy'}
parent_resource = DummyClass()
parent_resource.metadata_set({"foo": "bar"})
parent_resource.t = rsrc_defn.ResourceDefinition('parent', 'SomeType')
parent_resource.stack = stack.Stack(self.ctx, 'toplevel_stack',
template.Template(empty_template))
stk = stack.Stack(self.ctx, 'test_stack',
template.Template(empty_template),
parent_resource='parent', owner_id=78)
stk._parent_stack = dict(parent=parent_resource)
self.assertEqual('Delete', self.resolve(snippet, stk.t, stk))
def test_prevent_parameters_access(self):
expected_description = "This can be accessed"
tmpl = template.Template({
'AWSTemplateFormatVersion': '2010-09-09',
'Description': expected_description,
'Parameters': {
'foo': {'Type': 'String', 'Required': True}
}
})
self.assertEqual(expected_description, tmpl['Description'])
keyError = self.assertRaises(KeyError, tmpl.__getitem__, 'Parameters')
self.assertIn("can not be accessed directly", six.text_type(keyError))
def test_parameters_section_not_iterable(self):
expected_description = "This can be accessed"
tmpl = template.Template({
'AWSTemplateFormatVersion': '2010-09-09',
'Description': expected_description,
'Parameters': {
'foo': {'Type': 'String', 'Required': True}
}
})
self.assertEqual(expected_description, tmpl['Description'])
self.assertNotIn('Parameters', six.iterkeys(tmpl))
def test_add_resource(self):
cfn_tpl = template_format.parse('''
AWSTemplateFormatVersion: 2010-09-09
Resources:
resource1:
Type: AWS::EC2::Instance
Properties:
property1: value1
Metadata:
foo: bar
DependsOn: dummy
DeletionPolicy: Retain
UpdatePolicy:
foo: bar
''')
source = template.Template(cfn_tpl)
empty = template.Template(copy.deepcopy(empty_template))
stk = stack.Stack(self.ctx, 'test_stack', source)
for defn in six.itervalues(source.resource_definitions(stk)):
empty.add_resource(defn)
self.assertEqual(cfn_tpl['Resources'], empty.t['Resources'])
class TemplateFnErrorTest(common.HeatTestCase):
scenarios = [
('select_from_list_not_int',
dict(expect=TypeError,
snippet={"Fn::Select": ["one", ["foo", "bar"]]})),
('select_from_dict_not_str',
dict(expect=TypeError,
snippet={"Fn::Select": ["1", {"red": "robin", "re": "foo"}]})),
('select_from_serialized_json_wrong',
dict(expect=ValueError,
snippet={"Fn::Select": ["not", "no json"]})),
('select_wrong_num_args_1',
dict(expect=ValueError,
snippet={"Fn::Select": []})),
('select_wrong_num_args_2',
dict(expect=ValueError,
snippet={"Fn::Select": ["4"]})),
('select_wrong_num_args_3',
dict(expect=ValueError,
snippet={"Fn::Select": ["foo", {"foo": "bar"}, ""]})),
('select_wrong_num_args_4',
dict(expect=TypeError,
snippet={'Fn::Select': [['f'], {'f': 'food'}]})),
('split_no_delim',
dict(expect=ValueError,
snippet={"Fn::Split": ["foo, bar, achoo"]})),
('split_no_list',
dict(expect=TypeError,
snippet={"Fn::Split": "foo, bar, achoo"})),
('base64_list',
dict(expect=TypeError,
snippet={"Fn::Base64": ["foobar"]})),
('base64_dict',
dict(expect=TypeError,
snippet={"Fn::Base64": {"foo": "bar"}})),
('replace_list_value',
dict(expect=TypeError,
snippet={"Fn::Replace": [
{'$var1': 'foo', '%var2%': ['bar']},
'$var1 is %var2%']})),
('replace_list_mapping',
dict(expect=TypeError,
snippet={"Fn::Replace": [
['var1', 'foo', 'var2', 'bar'],
'$var1 is ${var2}']})),
('replace_dict',
dict(expect=TypeError,
snippet={"Fn::Replace": {}})),
('replace_missing_template',
dict(expect=ValueError,
snippet={"Fn::Replace": [['var1', 'foo', 'var2', 'bar']]})),
('replace_none_template',
dict(expect=TypeError,
snippet={"Fn::Replace": [['var2', 'bar'], None]})),
('replace_list_string',
dict(expect=TypeError,
snippet={"Fn::Replace": [
{'var1': 'foo', 'var2': 'bar'},
['$var1 is ${var2}']]})),
('join_string',
dict(expect=TypeError,
snippet={"Fn::Join": [" ", "foo"]})),
('join_dict',
dict(expect=TypeError,
snippet={"Fn::Join": [" ", {"foo": "bar"}]})),
('join_wrong_num_args_1',
dict(expect=ValueError,
snippet={"Fn::Join": []})),
('join_wrong_num_args_2',
dict(expect=ValueError,
snippet={"Fn::Join": [" "]})),
('join_wrong_num_args_3',
dict(expect=ValueError,
snippet={"Fn::Join": [" ", {"foo": "bar"}, ""]})),
('join_string_nodelim',
dict(expect=TypeError,
snippet={"Fn::Join": "o"})),
('join_string_nodelim_1',
dict(expect=TypeError,
snippet={"Fn::Join": "oh"})),
('join_string_nodelim_2',
dict(expect=TypeError,
snippet={"Fn::Join": "ohh"})),
('join_dict_nodelim1',
dict(expect=TypeError,
snippet={"Fn::Join": {"foo": "bar"}})),
('join_dict_nodelim2',
dict(expect=TypeError,
snippet={"Fn::Join": {"foo": "bar", "blarg": "wibble"}})),
('join_dict_nodelim3',
dict(expect=TypeError,
snippet={"Fn::Join": {"foo": "bar", "blarg": "wibble",
"baz": "quux"}})),
('member_list2map_no_key_or_val',
dict(expect=TypeError,
snippet={"Fn::MemberListToMap": [
'Key', ['.member.2.Key=metric',
'.member.2.Value=cpu',
'.member.5.Key=size',
'.member.5.Value=56']]})),
('member_list2map_no_list',
dict(expect=TypeError,
snippet={"Fn::MemberListToMap": [
'Key', '.member.2.Key=metric']})),
('member_list2map_not_string',
dict(expect=TypeError,
snippet={"Fn::MemberListToMap": [
'Name', ['Value'], ['.member.0.Name=metric',
'.member.0.Value=cpu',
'.member.1.Name=size',
'.member.1.Value=56']]})),
]
def test_bad_input(self):
tmpl = template.Template(empty_template)
resolve = lambda s: TemplateTest.resolve(s, tmpl)
error = self.assertRaises(self.expect,
resolve,
self.snippet)
self.assertIn(list(six.iterkeys(self.snippet))[0],
six.text_type(error))
class ResolveDataTest(common.HeatTestCase):
def setUp(self):
super(ResolveDataTest, self).setUp()
self.username = 'parser_stack_test_user'
self.ctx = utils.dummy_context()
self.stack = stack.Stack(self.ctx, 'resolve_test_stack',
template.Template(empty_template))
def resolve(self, snippet):
return function.resolve(self.stack.t.parse(self.stack, snippet))
def test_join_split(self):
# join
snippet = {'Fn::Join': [';', ['one', 'two', 'three']]}
self.assertEqual('one;two;three', self.resolve(snippet))
# join then split
snippet = {'Fn::Split': [';', snippet]}
self.assertEqual(['one', 'two', 'three'], self.resolve(snippet))
def test_split_join_split_join(self):
# each snippet in this test encapsulates
# the snippet from the previous step, leading
# to increasingly nested function calls
# split
snippet = {'Fn::Split': [',', 'one,two,three']}
self.assertEqual(['one', 'two', 'three'], self.resolve(snippet))
# split then join
snippet = {'Fn::Join': [';', snippet]}
self.assertEqual('one;two;three', self.resolve(snippet))
# split then join then split
snippet = {'Fn::Split': [';', snippet]}
self.assertEqual(['one', 'two', 'three'], self.resolve(snippet))
# split then join then split then join
snippet = {'Fn::Join': ['-', snippet]}
self.assertEqual('one-two-three', self.resolve(snippet))
def test_join_recursive(self):
raw = {'Fn::Join': ['\n', [{'Fn::Join':
[' ', ['foo', 'bar']]}, 'baz']]}
self.assertEqual('foo bar\nbaz', self.resolve(raw))
def test_join_not_string(self):
snippet = {'Fn::Join': ['\n', [{'Fn::Join':
[' ', ['foo', 45]]}, 'baz']]}
error = self.assertRaises(TypeError,
self.resolve, snippet)
self.assertIn('45', six.text_type(error))
def test_base64_replace(self):
raw = {'Fn::Base64': {'Fn::Replace': [
{'foo': 'bar'}, 'Meet at the foo']}}
self.assertEqual('Meet at the bar', self.resolve(raw))
def test_replace_base64(self):
raw = {'Fn::Replace': [{'foo': 'bar'}, {
'Fn::Base64': 'Meet at the foo'}]}
self.assertEqual('Meet at the bar', self.resolve(raw))
def test_nested_selects(self):
data = {
'a': ['one', 'two', 'three'],
'b': ['een', 'twee', {'d': 'D', 'e': 'E'}]
}
raw = {'Fn::Select': ['a', data]}
self.assertEqual(data['a'], self.resolve(raw))
raw = {'Fn::Select': ['b', data]}
self.assertEqual(data['b'], self.resolve(raw))
raw = {
'Fn::Select': ['1', {
'Fn::Select': ['b', data]
}]
}
self.assertEqual('twee', self.resolve(raw))
raw = {
'Fn::Select': ['e', {
'Fn::Select': ['2', {
'Fn::Select': ['b', data]
}]
}]
}
self.assertEqual('E', self.resolve(raw))
def test_member_list_select(self):
snippet = {'Fn::Select': ['metric', {"Fn::MemberListToMap": [
'Name', 'Value', ['.member.0.Name=metric',
'.member.0.Value=cpu',
'.member.1.Name=size',
'.member.1.Value=56']]}]}
self.assertEqual('cpu', self.resolve(snippet))