Merge "Refactor test_parser module"
This commit is contained in:
commit
a26d61357c
|
@ -31,9 +31,11 @@ from heat.engine import rsrc_defn
|
|||
from heat.engine import template
|
||||
from heat.tests import common
|
||||
from heat.tests import generic_resource as generic_rsrc
|
||||
from heat.tests import test_parser
|
||||
from heat.tests import utils
|
||||
|
||||
empty_template = template_format.parse('''{
|
||||
"HeatTemplateFormatVersion" : "2012-12-12",
|
||||
}''')
|
||||
|
||||
hot_tpl_empty = template_format.parse('''
|
||||
heat_template_version: 2013-05-23
|
||||
|
@ -844,8 +846,22 @@ class HOTemplateTest(common.HeatTestCase):
|
|||
self.assertEqual(hot_tpl['resources'], empty.t['resources'])
|
||||
|
||||
|
||||
class StackTest(test_parser.StackTest):
|
||||
class HotStackTest(common.HeatTestCase):
|
||||
"""Test stack function when stack was created from HOT template."""
|
||||
def setUp(self):
|
||||
super(HotStackTest, self).setUp()
|
||||
|
||||
self.tmpl = template.Template(copy.deepcopy(empty_template))
|
||||
self.ctx = utils.dummy_context()
|
||||
|
||||
resource._register_class('GenericResourceType',
|
||||
generic_rsrc.GenericResource)
|
||||
resource._register_class('ResourceWithPropsType',
|
||||
generic_rsrc.ResourceWithProps)
|
||||
resource._register_class('ResourceWithComplexAttributesType',
|
||||
generic_rsrc.ResourceWithComplexAttributes)
|
||||
resource._register_class('ResWithComplexPropsAndAttrs',
|
||||
generic_rsrc.ResWithComplexPropsAndAttrs)
|
||||
|
||||
def resolve(self, snippet):
|
||||
return function.resolve(self.stack.t.parse(self.stack, snippet))
|
||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,429 @@
|
|||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import time
|
||||
|
||||
from keystoneclient import exceptions as kc_exceptions
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from heat.common import exception
|
||||
from heat.common import heat_keystoneclient as hkc
|
||||
from heat.common import template_format
|
||||
import heat.db.api as db_api
|
||||
from heat.engine.clients.os import keystone
|
||||
from heat.engine import resource
|
||||
from heat.engine import scheduler
|
||||
from heat.engine import stack
|
||||
from heat.engine import template
|
||||
from heat.tests import common
|
||||
from heat.tests import fakes
|
||||
from heat.tests import generic_resource as generic_rsrc
|
||||
from heat.tests import utils
|
||||
|
||||
empty_template = template_format.parse('''{
|
||||
"HeatTemplateFormatVersion" : "2012-12-12",
|
||||
}''')
|
||||
|
||||
|
||||
class StackTest(common.HeatTestCase):
|
||||
def setUp(self):
|
||||
super(StackTest, self).setUp()
|
||||
|
||||
self.tmpl = template.Template(copy.deepcopy(empty_template))
|
||||
self.ctx = utils.dummy_context()
|
||||
resource._register_class('GenericResourceType',
|
||||
generic_rsrc.GenericResource)
|
||||
|
||||
def test_delete(self):
|
||||
self.stack = stack.Stack(self.ctx, 'delete_test', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
|
||||
self.stack.state)
|
||||
|
||||
def test_delete_user_creds(self):
|
||||
self.stack = stack.Stack(self.ctx, 'delete_test', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
self.assertIsNotNone(db_s.user_creds_id)
|
||||
user_creds_id = db_s.user_creds_id
|
||||
db_creds = db_api.user_creds_get(db_s.user_creds_id)
|
||||
self.assertIsNotNone(db_creds)
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
db_creds = db_api.user_creds_get(user_creds_id)
|
||||
self.assertIsNone(db_creds)
|
||||
del_db_s = db_api.stack_get(self.ctx, stack_id, show_deleted=True)
|
||||
self.assertIsNone(del_db_s.user_creds_id)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
|
||||
self.stack.state)
|
||||
|
||||
def test_delete_user_creds_gone_missing(self):
|
||||
'''Do not block stack deletion if user_creds is missing.
|
||||
|
||||
It may happen that user_creds were deleted when a delete operation was
|
||||
stopped. We should be resilient to this and still complete the delete
|
||||
operation.
|
||||
'''
|
||||
self.stack = stack.Stack(self.ctx, 'delete_test', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
self.assertIsNotNone(db_s.user_creds_id)
|
||||
user_creds_id = db_s.user_creds_id
|
||||
db_creds = db_api.user_creds_get(db_s.user_creds_id)
|
||||
self.assertIsNotNone(db_creds)
|
||||
|
||||
db_api.user_creds_delete(self.ctx, user_creds_id)
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
db_creds = db_api.user_creds_get(user_creds_id)
|
||||
self.assertIsNone(db_creds)
|
||||
del_db_s = db_api.stack_get(self.ctx, stack_id, show_deleted=True)
|
||||
self.assertIsNone(del_db_s.user_creds_id)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
|
||||
self.stack.state)
|
||||
|
||||
def test_delete_user_creds_fail(self):
|
||||
'''Do not stop deleting stacks even failed deleting user_creds.
|
||||
|
||||
It may happen that user_creds were incorrectly saved (truncated) and
|
||||
thus cannot be correctly retrieved (and decrypted). In this case,
|
||||
stack delete should not be stopped.
|
||||
'''
|
||||
self.stack = stack.Stack(self.ctx, 'delete_test', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
self.assertIsNotNone(db_s.user_creds_id)
|
||||
exc = exception.Error('Cannot get user credentials')
|
||||
self.patchobject(db_api, 'user_creds_get').side_effect = exc
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
|
||||
self.stack.state)
|
||||
|
||||
def test_delete_trust(self):
|
||||
cfg.CONF.set_override('deferred_auth_method', 'trusts')
|
||||
self.stub_keystoneclient()
|
||||
|
||||
self.stack = stack.Stack(self.ctx, 'delete_trust', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
|
||||
self.stack.state)
|
||||
|
||||
def test_delete_trust_trustor(self):
|
||||
cfg.CONF.set_override('deferred_auth_method', 'trusts')
|
||||
|
||||
trustor_ctx = utils.dummy_context(user_id='thetrustor')
|
||||
self.m.StubOutWithMock(hkc, 'KeystoneClient')
|
||||
hkc.KeystoneClient(trustor_ctx).AndReturn(
|
||||
fakes.FakeKeystoneClient(user_id='thetrustor'))
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.stack = stack.Stack(trustor_ctx, 'delete_trust_nt', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
user_creds_id = db_s.user_creds_id
|
||||
self.assertIsNotNone(user_creds_id)
|
||||
user_creds = db_api.user_creds_get(user_creds_id)
|
||||
self.assertEqual('thetrustor', user_creds.get('trustor_user_id'))
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(trustor_ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
|
||||
self.stack.state)
|
||||
|
||||
def test_delete_trust_not_trustor(self):
|
||||
cfg.CONF.set_override('deferred_auth_method', 'trusts')
|
||||
|
||||
# Stack gets created with trustor_ctx, deleted with other_ctx
|
||||
# then the trust delete should be with stored_ctx
|
||||
trustor_ctx = utils.dummy_context(user_id='thetrustor')
|
||||
other_ctx = utils.dummy_context(user_id='nottrustor')
|
||||
stored_ctx = utils.dummy_context(trust_id='thetrust')
|
||||
|
||||
self.m.StubOutWithMock(hkc, 'KeystoneClient')
|
||||
hkc.KeystoneClient(trustor_ctx).AndReturn(
|
||||
fakes.FakeKeystoneClient(user_id='thetrustor'))
|
||||
self.m.StubOutWithMock(stack.Stack, 'stored_context')
|
||||
stack.Stack.stored_context().AndReturn(stored_ctx)
|
||||
hkc.KeystoneClient(stored_ctx).AndReturn(
|
||||
fakes.FakeKeystoneClient(user_id='nottrustor'))
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.stack = stack.Stack(trustor_ctx, 'delete_trust_nt', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
user_creds_id = db_s.user_creds_id
|
||||
self.assertIsNotNone(user_creds_id)
|
||||
user_creds = db_api.user_creds_get(user_creds_id)
|
||||
self.assertEqual('thetrustor', user_creds.get('trustor_user_id'))
|
||||
|
||||
loaded_stack = stack.Stack.load(other_ctx, self.stack.id)
|
||||
loaded_stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(other_ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
|
||||
loaded_stack.state)
|
||||
|
||||
def test_delete_trust_backup(self):
|
||||
cfg.CONF.set_override('deferred_auth_method', 'trusts')
|
||||
|
||||
class FakeKeystoneClientFail(fakes.FakeKeystoneClient):
|
||||
def delete_trust(self, trust_id):
|
||||
raise Exception("Shouldn't delete")
|
||||
|
||||
self.m.StubOutWithMock(keystone.KeystoneClientPlugin, '_create')
|
||||
keystone.KeystoneClientPlugin._create().AndReturn(
|
||||
FakeKeystoneClientFail())
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.stack = stack.Stack(self.ctx, 'delete_trust', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
self.stack.delete(backup=True)
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
self.assertEqual(self.stack.state,
|
||||
(stack.Stack.DELETE, stack.Stack.COMPLETE))
|
||||
|
||||
def test_delete_trust_nested(self):
|
||||
cfg.CONF.set_override('deferred_auth_method', 'trusts')
|
||||
|
||||
class FakeKeystoneClientFail(fakes.FakeKeystoneClient):
|
||||
def delete_trust(self, trust_id):
|
||||
raise Exception("Shouldn't delete")
|
||||
|
||||
self.stub_keystoneclient(fake_client=FakeKeystoneClientFail())
|
||||
|
||||
self.stack = stack.Stack(self.ctx, 'delete_trust_nested', self.tmpl,
|
||||
owner_id='owner123')
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
user_creds_id = db_s.user_creds_id
|
||||
self.assertIsNotNone(user_creds_id)
|
||||
user_creds = db_api.user_creds_get(user_creds_id)
|
||||
self.assertIsNotNone(user_creds)
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
user_creds = db_api.user_creds_get(user_creds_id)
|
||||
self.assertIsNotNone(user_creds)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
|
||||
self.stack.state)
|
||||
|
||||
def test_delete_trust_fail(self):
|
||||
cfg.CONF.set_override('deferred_auth_method', 'trusts')
|
||||
|
||||
class FakeKeystoneClientFail(fakes.FakeKeystoneClient):
|
||||
def delete_trust(self, trust_id):
|
||||
raise kc_exceptions.Forbidden("Denied!")
|
||||
|
||||
self.m.StubOutWithMock(keystone.KeystoneClientPlugin, '_create')
|
||||
keystone.KeystoneClientPlugin._create().AndReturn(
|
||||
FakeKeystoneClientFail())
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.stack = stack.Stack(self.ctx, 'delete_trust', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.FAILED),
|
||||
self.stack.state)
|
||||
self.assertIn('Error deleting trust', self.stack.status_reason)
|
||||
|
||||
def test_delete_deletes_project(self):
|
||||
fkc = fakes.FakeKeystoneClient()
|
||||
fkc.delete_stack_domain_project = mock.Mock()
|
||||
|
||||
self.m.StubOutWithMock(keystone.KeystoneClientPlugin, '_create')
|
||||
keystone.KeystoneClientPlugin._create().AndReturn(fkc)
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.stack = stack.Stack(self.ctx, 'delete_trust', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
self.stack.set_stack_user_project_id(project_id='aproject456')
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.COMPLETE),
|
||||
self.stack.state)
|
||||
fkc.delete_stack_domain_project.assert_called_once_with(
|
||||
project_id='aproject456')
|
||||
|
||||
def test_delete_rollback(self):
|
||||
self.stack = stack.Stack(self.ctx, 'delete_rollback_test',
|
||||
self.tmpl, disable_rollback=False)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
self.stack.delete(action=self.stack.ROLLBACK)
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNone(db_s)
|
||||
self.assertEqual((stack.Stack.ROLLBACK, stack.Stack.COMPLETE),
|
||||
self.stack.state)
|
||||
|
||||
def test_delete_badaction(self):
|
||||
self.stack = stack.Stack(self.ctx, 'delete_badaction_test', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
self.stack.delete(action="wibble")
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.FAILED),
|
||||
self.stack.state)
|
||||
|
||||
def test_stack_delete_timeout(self):
|
||||
self.stack = stack.Stack(self.ctx, 'delete_test', self.tmpl)
|
||||
stack_id = self.stack.store()
|
||||
|
||||
db_s = db_api.stack_get(self.ctx, stack_id)
|
||||
self.assertIsNotNone(db_s)
|
||||
|
||||
self.m.StubOutWithMock(scheduler.DependencyTaskGroup, '__call__')
|
||||
self.m.StubOutWithMock(scheduler, 'wallclock')
|
||||
|
||||
def dummy_task():
|
||||
while True:
|
||||
yield
|
||||
|
||||
start_time = time.time()
|
||||
scheduler.wallclock().AndReturn(start_time)
|
||||
scheduler.wallclock().AndReturn(start_time + 1)
|
||||
scheduler.DependencyTaskGroup.__call__().AndReturn(dummy_task())
|
||||
scheduler.wallclock().AndReturn(
|
||||
start_time + self.stack.timeout_secs() + 1)
|
||||
self.m.ReplayAll()
|
||||
self.stack.delete()
|
||||
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.FAILED),
|
||||
self.stack.state)
|
||||
self.assertEqual('Delete timed out', self.stack.status_reason)
|
||||
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_stack_delete_resourcefailure(self):
|
||||
tmpl = {'HeatTemplateFormatVersion': '2012-12-12',
|
||||
'Resources': {'AResource': {'Type': 'GenericResourceType'}}}
|
||||
self.m.StubOutWithMock(generic_rsrc.GenericResource, 'handle_delete')
|
||||
exc = Exception('foo')
|
||||
generic_rsrc.GenericResource.handle_delete().AndRaise(exc)
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.stack = stack.Stack(self.ctx, 'delete_test_fail',
|
||||
template.Template(tmpl))
|
||||
|
||||
self.stack.store()
|
||||
self.stack.create()
|
||||
self.assertEqual((self.stack.CREATE, self.stack.COMPLETE),
|
||||
self.stack.state)
|
||||
|
||||
self.stack.delete()
|
||||
|
||||
self.assertEqual((self.stack.DELETE, self.stack.FAILED),
|
||||
self.stack.state)
|
||||
self.assertEqual('Resource DELETE failed: Exception: foo',
|
||||
self.stack.status_reason)
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_stack_user_project_id_delete_fail(self):
|
||||
|
||||
class FakeKeystoneClientFail(fakes.FakeKeystoneClient):
|
||||
def delete_stack_domain_project(self, project_id):
|
||||
raise kc_exceptions.Forbidden("Denied!")
|
||||
|
||||
self.m.StubOutWithMock(keystone.KeystoneClientPlugin, '_create')
|
||||
keystone.KeystoneClientPlugin._create().AndReturn(
|
||||
FakeKeystoneClientFail())
|
||||
self.m.ReplayAll()
|
||||
|
||||
self.stack = stack.Stack(self.ctx, 'user_project_init',
|
||||
self.tmpl,
|
||||
stack_user_project_id='aproject1234')
|
||||
self.stack.store()
|
||||
self.assertEqual('aproject1234', self.stack.stack_user_project_id)
|
||||
db_stack = db_api.stack_get(self.ctx, self.stack.id)
|
||||
self.assertEqual('aproject1234', db_stack.stack_user_project_id)
|
||||
|
||||
self.stack.delete()
|
||||
self.assertEqual((stack.Stack.DELETE, stack.Stack.FAILED),
|
||||
self.stack.state)
|
||||
self.assertIn('Error deleting project', self.stack.status_reason)
|
||||
self.m.VerifyAll()
|
File diff suppressed because it is too large
Load Diff
|
@ -11,6 +11,10 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import json
|
||||
import warnings
|
||||
|
||||
import fixtures
|
||||
from oslotest import mockpatch
|
||||
import six
|
||||
|
@ -18,9 +22,73 @@ from stevedore import extension
|
|||
|
||||
from heat.common import exception
|
||||
from heat.common import template_format
|
||||
from heat.engine.cfn import functions as cfn_funcs
|
||||
from heat.engine.cfn import template as cfn_t
|
||||
from heat.engine.clients.os import nova
|
||||
from heat.engine import environment
|
||||
from heat.engine import function
|
||||
from heat.engine.hot import template as hot_t
|
||||
from heat.engine import parameters
|
||||
from heat.engine import resource
|
||||
from heat.engine import rsrc_defn
|
||||
from heat.engine import stack
|
||||
from heat.engine import template
|
||||
from heat.tests import common
|
||||
from heat.tests import generic_resource as generic_rsrc
|
||||
from heat.tests import utils
|
||||
from heat.tests.v1_1 import fakes as fakes_v1_1
|
||||
|
||||
|
||||
mapping_template = template_format.parse('''{
|
||||
"AWSTemplateFormatVersion" : "2010-09-09",
|
||||
"Mappings" : {
|
||||
"ValidMapping" : {
|
||||
"TestKey" : { "TestValue" : "wibble" }
|
||||
},
|
||||
"InvalidMapping" : {
|
||||
"ValueList" : [ "foo", "bar" ],
|
||||
"ValueString" : "baz"
|
||||
},
|
||||
"MapList": [ "foo", { "bar" : "baz" } ],
|
||||
"MapString": "foobar"
|
||||
}
|
||||
}''')
|
||||
|
||||
empty_template = template_format.parse('''{
|
||||
"HeatTemplateFormatVersion" : "2012-12-12",
|
||||
}''')
|
||||
|
||||
parameter_template = template_format.parse('''{
|
||||
"HeatTemplateFormatVersion" : "2012-12-12",
|
||||
"Parameters" : {
|
||||
"foo" : { "Type" : "String" },
|
||||
"blarg" : { "Type" : "String", "Default": "quux" }
|
||||
}
|
||||
}''')
|
||||
|
||||
|
||||
resource_template = template_format.parse('''{
|
||||
"HeatTemplateFormatVersion" : "2012-12-12",
|
||||
"Resources" : {
|
||||
"foo" : { "Type" : "GenericResourceType" },
|
||||
"blarg" : { "Type" : "GenericResourceType" }
|
||||
}
|
||||
}''')
|
||||
|
||||
|
||||
def join(raw):
|
||||
tmpl = template.Template(mapping_template)
|
||||
return function.resolve(tmpl.parse(None, raw))
|
||||
|
||||
|
||||
class DummyClass(object):
|
||||
metadata = None
|
||||
|
||||
def metadata_get(self):
|
||||
return self.metadata
|
||||
|
||||
def metadata_set(self, metadata):
|
||||
self.metadata = metadata
|
||||
|
||||
|
||||
class TemplatePluginFixture(fixtures.Fixture):
|
||||
|
@ -141,6 +209,66 @@ class TestTemplateVersion(common.HeatTestCase):
|
|||
template.get_version, tmpl, self.versions)
|
||||
|
||||
|
||||
class ParserTest(common.HeatTestCase):
|
||||
|
||||
def test_list(self):
|
||||
raw = ['foo', 'bar', 'baz']
|
||||
parsed = join(raw)
|
||||
for i in six.moves.xrange(len(raw)):
|
||||
self.assertEqual(raw[i], parsed[i])
|
||||
self.assertIsNot(raw, parsed)
|
||||
|
||||
def test_dict(self):
|
||||
raw = {'foo': 'bar', 'blarg': 'wibble'}
|
||||
parsed = join(raw)
|
||||
for k in raw:
|
||||
self.assertEqual(raw[k], parsed[k])
|
||||
self.assertIsNot(raw, parsed)
|
||||
|
||||
def test_dict_list(self):
|
||||
raw = {'foo': ['bar', 'baz'], 'blarg': 'wibble'}
|
||||
parsed = join(raw)
|
||||
self.assertEqual(raw['blarg'], parsed['blarg'])
|
||||
for i in six.moves.xrange(len(raw['foo'])):
|
||||
self.assertEqual(raw['foo'][i], parsed['foo'][i])
|
||||
self.assertIsNot(raw, parsed)
|
||||
self.assertIsNot(raw['foo'], parsed['foo'])
|
||||
|
||||
def test_list_dict(self):
|
||||
raw = [{'foo': 'bar', 'blarg': 'wibble'}, 'baz', 'quux']
|
||||
parsed = join(raw)
|
||||
for i in six.moves.xrange(1, len(raw)):
|
||||
self.assertEqual(raw[i], parsed[i])
|
||||
for k in raw[0]:
|
||||
self.assertEqual(raw[0][k], parsed[0][k])
|
||||
self.assertIsNot(raw, parsed)
|
||||
self.assertIsNot(raw[0], parsed[0])
|
||||
|
||||
def test_join(self):
|
||||
raw = {'Fn::Join': [' ', ['foo', 'bar', 'baz']]}
|
||||
self.assertEqual('foo bar baz', join(raw))
|
||||
|
||||
def test_join_none(self):
|
||||
raw = {'Fn::Join': [' ', ['foo', None, 'baz']]}
|
||||
self.assertEqual('foo baz', join(raw))
|
||||
|
||||
def test_join_list(self):
|
||||
raw = [{'Fn::Join': [' ', ['foo', 'bar', 'baz']]}, 'blarg', 'wibble']
|
||||
parsed = join(raw)
|
||||
self.assertEqual('foo bar baz', parsed[0])
|
||||
for i in six.moves.xrange(1, len(raw)):
|
||||
self.assertEqual(raw[i], parsed[i])
|
||||
self.assertIsNot(raw, parsed)
|
||||
|
||||
def test_join_dict_val(self):
|
||||
raw = {'quux': {'Fn::Join': [' ', ['foo', 'bar', 'baz']]},
|
||||
'blarg': 'wibble'}
|
||||
parsed = join(raw)
|
||||
self.assertEqual('foo bar baz', parsed['quux'])
|
||||
self.assertEqual(raw['blarg'], parsed['blarg'])
|
||||
self.assertIsNot(raw, parsed)
|
||||
|
||||
|
||||
class TestTemplateValidate(common.HeatTestCase):
|
||||
|
||||
def test_template_validate_cfn_good(self):
|
||||
|
@ -243,3 +371,750 @@ Outputs:
|
|||
err = self.assertRaises(exception.InvalidTemplateSection,
|
||||
tmpl.validate)
|
||||
self.assertIn('parameteers', six.text_type(err))
|
||||
|
||||
|
||||
class TemplateTest(common.HeatTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TemplateTest, self).setUp()
|
||||
self.ctx = utils.dummy_context()
|
||||
|
||||
resource._register_class('GenericResourceType',
|
||||
generic_rsrc.GenericResource)
|
||||
|
||||
@staticmethod
|
||||
def resolve(snippet, template, stack=None):
|
||||
return function.resolve(template.parse(stack, snippet))
|
||||
|
||||
def test_defaults(self):
|
||||
empty = template.Template(empty_template)
|
||||
self.assertNotIn('AWSTemplateFormatVersion', empty)
|
||||
self.assertEqual('No description', empty['Description'])
|
||||
self.assertEqual({}, empty['Mappings'])
|
||||
self.assertEqual({}, empty['Resources'])
|
||||
self.assertEqual({}, empty['Outputs'])
|
||||
|
||||
def test_aws_version(self):
|
||||
tmpl = template.Template(mapping_template)
|
||||
self.assertEqual(('AWSTemplateFormatVersion', '2010-09-09'),
|
||||
tmpl.version)
|
||||
|
||||
def test_heat_version(self):
|
||||
tmpl = template.Template(resource_template)
|
||||
self.assertEqual(('HeatTemplateFormatVersion', '2012-12-12'),
|
||||
tmpl.version)
|
||||
|
||||
def test_invalid_hot_version(self):
|
||||
invalid_hot_version_tmp = template_format.parse(
|
||||
'''{
|
||||
"heat_template_version" : "2012-12-12",
|
||||
}''')
|
||||
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
|
||||
template.Template, invalid_hot_version_tmp)
|
||||
valid_versions = ['2014-10-16', '2013-05-23', '2015-04-30']
|
||||
ex_error_msg = ('The template version is invalid: '
|
||||
'"heat_template_version: 2012-12-12". '
|
||||
'"heat_template_version" should be one of: %s'
|
||||
% ', '.join(valid_versions))
|
||||
self.assertEqual(ex_error_msg, six.text_type(init_ex))
|
||||
|
||||
def test_invalid_version_not_in_hot_versions(self):
|
||||
invalid_hot_version_tmp = template_format.parse(
|
||||
'''{
|
||||
"heat_template_version" : "2012-12-12",
|
||||
}''')
|
||||
versions = {
|
||||
('heat_template_version', '2013-05-23'): hot_t.HOTemplate20130523,
|
||||
('heat_template_version', '2013-06-23'): hot_t.HOTemplate20130523
|
||||
}
|
||||
|
||||
temp_copy = copy.deepcopy(template._template_classes)
|
||||
template._template_classes = versions
|
||||
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
|
||||
template.Template, invalid_hot_version_tmp)
|
||||
ex_error_msg = ('The template version is invalid: '
|
||||
'"heat_template_version: 2012-12-12". '
|
||||
'"heat_template_version" should be '
|
||||
'one of: 2013-05-23, 2013-06-23')
|
||||
self.assertEqual(ex_error_msg, six.text_type(init_ex))
|
||||
template._template_classes = temp_copy
|
||||
|
||||
def test_invalid_aws_version(self):
|
||||
invalid_aws_version_tmp = template_format.parse(
|
||||
'''{
|
||||
"AWSTemplateFormatVersion" : "2012-12-12",
|
||||
}''')
|
||||
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
|
||||
template.Template, invalid_aws_version_tmp)
|
||||
ex_error_msg = ('The template version is invalid: '
|
||||
'"AWSTemplateFormatVersion: 2012-12-12". '
|
||||
'"AWSTemplateFormatVersion" should be: 2010-09-09')
|
||||
self.assertEqual(ex_error_msg, six.text_type(init_ex))
|
||||
|
||||
def test_invalid_version_not_in_aws_versions(self):
|
||||
invalid_aws_version_tmp = template_format.parse(
|
||||
'''{
|
||||
"AWSTemplateFormatVersion" : "2012-12-12",
|
||||
}''')
|
||||
versions = {
|
||||
('AWSTemplateFormatVersion', '2010-09-09'): cfn_t.CfnTemplate,
|
||||
('AWSTemplateFormatVersion', '2011-06-23'): cfn_t.CfnTemplate
|
||||
}
|
||||
temp_copy = copy.deepcopy(template._template_classes)
|
||||
template._template_classes = versions
|
||||
|
||||
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
|
||||
template.Template, invalid_aws_version_tmp)
|
||||
ex_error_msg = ('The template version is invalid: '
|
||||
'"AWSTemplateFormatVersion: 2012-12-12". '
|
||||
'"AWSTemplateFormatVersion" should be '
|
||||
'one of: 2010-09-09, 2011-06-23')
|
||||
self.assertEqual(ex_error_msg, six.text_type(init_ex))
|
||||
template._template_classes = temp_copy
|
||||
|
||||
def test_invalid_heat_version(self):
|
||||
invalid_heat_version_tmp = template_format.parse(
|
||||
'''{
|
||||
"HeatTemplateFormatVersion" : "2010-09-09",
|
||||
}''')
|
||||
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
|
||||
template.Template,
|
||||
invalid_heat_version_tmp)
|
||||
ex_error_msg = ('The template version is invalid: '
|
||||
'"HeatTemplateFormatVersion: 2010-09-09". '
|
||||
'"HeatTemplateFormatVersion" should be: 2012-12-12')
|
||||
self.assertEqual(ex_error_msg, six.text_type(init_ex))
|
||||
|
||||
def test_invalid_version_not_in_heat_versions(self):
|
||||
invalid_heat_version_tmp = template_format.parse(
|
||||
'''{
|
||||
"HeatTemplateFormatVersion" : "2010-09-09",
|
||||
}''')
|
||||
versions = {
|
||||
('HeatTemplateFormatVersion', '2012-12-12'): cfn_t.CfnTemplate,
|
||||
('HeatTemplateFormatVersion', '2014-12-12'): cfn_t.CfnTemplate
|
||||
}
|
||||
temp_copy = copy.deepcopy(template._template_classes)
|
||||
template._template_classes = versions
|
||||
|
||||
init_ex = self.assertRaises(exception.InvalidTemplateVersion,
|
||||
template.Template,
|
||||
invalid_heat_version_tmp)
|
||||
ex_error_msg = ('The template version is invalid: '
|
||||
'"HeatTemplateFormatVersion: 2010-09-09". '
|
||||
'"HeatTemplateFormatVersion" should be '
|
||||
'one of: 2012-12-12, 2014-12-12')
|
||||
self.assertEqual(ex_error_msg, six.text_type(init_ex))
|
||||
|
||||
template._template_classes = temp_copy
|
||||
|
||||
def test_invalid_template(self):
|
||||
scanner_error = '''
|
||||
1
|
||||
Mappings:
|
||||
ValidMapping:
|
||||
TestKey: TestValue
|
||||
'''
|
||||
parser_error = '''
|
||||
Mappings:
|
||||
ValidMapping:
|
||||
TestKey: {TestKey1: "Value1" TestKey2: "Value2"}
|
||||
'''
|
||||
|
||||
self.assertRaises(ValueError, template_format.parse, scanner_error)
|
||||
self.assertRaises(ValueError, template_format.parse, parser_error)
|
||||
|
||||
def test_invalid_section(self):
|
||||
tmpl = template.Template({'HeatTemplateFormatVersion': '2012-12-12',
|
||||
'Foo': ['Bar']})
|
||||
self.assertNotIn('Foo', tmpl)
|
||||
|
||||
def test_find_in_map(self):
|
||||
tmpl = template.Template(mapping_template)
|
||||
stk = stack.Stack(self.ctx, 'test', tmpl)
|
||||
find = {'Fn::FindInMap': ["ValidMapping", "TestKey", "TestValue"]}
|
||||
self.assertEqual("wibble", self.resolve(find, tmpl, stk))
|
||||
|
||||
def test_find_in_invalid_map(self):
|
||||
tmpl = template.Template(mapping_template)
|
||||
stk = stack.Stack(self.ctx, 'test', tmpl)
|
||||
finds = ({'Fn::FindInMap': ["InvalidMapping", "ValueList", "foo"]},
|
||||
{'Fn::FindInMap': ["InvalidMapping", "ValueString", "baz"]},
|
||||
{'Fn::FindInMap': ["MapList", "foo", "bar"]},
|
||||
{'Fn::FindInMap': ["MapString", "foo", "bar"]})
|
||||
|
||||
for find in finds:
|
||||
self.assertRaises((KeyError, TypeError), self.resolve,
|
||||
find, tmpl, stk)
|
||||
|
||||
def test_bad_find_in_map(self):
|
||||
tmpl = template.Template(mapping_template)
|
||||
stk = stack.Stack(self.ctx, 'test', tmpl)
|
||||
finds = ({'Fn::FindInMap': "String"},
|
||||
{'Fn::FindInMap': {"Dict": "String"}},
|
||||
{'Fn::FindInMap': ["ShortList", "foo"]},
|
||||
{'Fn::FindInMap': ["ReallyShortList"]})
|
||||
|
||||
for find in finds:
|
||||
self.assertRaises(KeyError, self.resolve, find, tmpl, stk)
|
||||
|
||||
def test_param_refs(self):
|
||||
tmpl = template.Template(parameter_template)
|
||||
env = environment.Environment({'foo': 'bar', 'blarg': 'wibble'})
|
||||
stk = stack.Stack(self.ctx, 'test', tmpl, env)
|
||||
p_snippet = {"Ref": "foo"}
|
||||
self.assertEqual("bar", self.resolve(p_snippet, tmpl, stk))
|
||||
|
||||
def test_param_ref_missing(self):
|
||||
tmpl = template.Template(parameter_template)
|
||||
env = environment.Environment({'foo': 'bar'})
|
||||
stk = stack.Stack(self.ctx, 'test', tmpl, env)
|
||||
stk.env = environment.Environment({})
|
||||
stk.parameters = parameters.Parameters(stk.identifier(), tmpl)
|
||||
snippet = {"Ref": "foo"}
|
||||
self.assertRaises(exception.UserParameterMissing,
|
||||
self.resolve,
|
||||
snippet, tmpl, stk)
|
||||
|
||||
def test_resource_refs(self):
|
||||
tmpl = template.Template(resource_template)
|
||||
stk = stack.Stack(self.ctx, 'test', tmpl)
|
||||
|
||||
self.m.StubOutWithMock(stk['foo'], 'FnGetRefId')
|
||||
stk['foo'].FnGetRefId().MultipleTimes().AndReturn('bar')
|
||||
self.m.ReplayAll()
|
||||
|
||||
r_snippet = {"Ref": "foo"}
|
||||
self.assertEqual("bar", self.resolve(r_snippet, tmpl, stk))
|
||||
self.m.VerifyAll()
|
||||
|
||||
def test_resource_refs_param(self):
|
||||
tmpl = template.Template(resource_template)
|
||||
stk = stack.Stack(self.ctx, 'test', tmpl)
|
||||
|
||||
p_snippet = {"Ref": "baz"}
|
||||
parsed = tmpl.parse(stk, p_snippet)
|
||||
self.assertTrue(isinstance(parsed, cfn_funcs.ParamRef))
|
||||
|
||||
def test_select_from_list(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
data = {"Fn::Select": ["1", ["foo", "bar"]]}
|
||||
self.assertEqual("bar", self.resolve(data, tmpl))
|
||||
|
||||
def test_select_from_list_integer_index(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
data = {"Fn::Select": [1, ["foo", "bar"]]}
|
||||
self.assertEqual("bar", self.resolve(data, tmpl))
|
||||
|
||||
def test_select_from_list_out_of_bound(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
data = {"Fn::Select": ["0", ["foo", "bar"]]}
|
||||
self.assertEqual("foo", self.resolve(data, tmpl))
|
||||
data = {"Fn::Select": ["1", ["foo", "bar"]]}
|
||||
self.assertEqual("bar", self.resolve(data, tmpl))
|
||||
data = {"Fn::Select": ["2", ["foo", "bar"]]}
|
||||
self.assertEqual("", self.resolve(data, tmpl))
|
||||
|
||||
def test_select_from_dict(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
data = {"Fn::Select": ["red", {"red": "robin", "re": "foo"}]}
|
||||
self.assertEqual("robin", self.resolve(data, tmpl))
|
||||
|
||||
def test_select_from_none(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
data = {"Fn::Select": ["red", None]}
|
||||
self.assertEqual("", self.resolve(data, tmpl))
|
||||
|
||||
def test_select_from_dict_not_existing(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
data = {"Fn::Select": ["green", {"red": "robin", "re": "foo"}]}
|
||||
self.assertEqual("", self.resolve(data, tmpl))
|
||||
|
||||
def test_select_from_serialized_json_map(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
js = json.dumps({"red": "robin", "re": "foo"})
|
||||
data = {"Fn::Select": ["re", js]}
|
||||
self.assertEqual("foo", self.resolve(data, tmpl))
|
||||
|
||||
def test_select_from_serialized_json_list(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
js = json.dumps(["foo", "fee", "fum"])
|
||||
data = {"Fn::Select": ["0", js]}
|
||||
self.assertEqual("foo", self.resolve(data, tmpl))
|
||||
|
||||
def test_select_empty_string(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
data = {"Fn::Select": ["0", '']}
|
||||
self.assertEqual("", self.resolve(data, tmpl))
|
||||
data = {"Fn::Select": ["1", '']}
|
||||
self.assertEqual("", self.resolve(data, tmpl))
|
||||
data = {"Fn::Select": ["one", '']}
|
||||
self.assertEqual("", self.resolve(data, tmpl))
|
||||
|
||||
def test_join(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
join = {"Fn::Join": [" ", ["foo", "bar"]]}
|
||||
self.assertEqual("foo bar", self.resolve(join, tmpl))
|
||||
|
||||
def test_split_ok(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
data = {"Fn::Split": [";", "foo; bar; achoo"]}
|
||||
self.assertEqual(['foo', ' bar', ' achoo'], self.resolve(data, tmpl))
|
||||
|
||||
def test_split_no_delim_in_str(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
data = {"Fn::Split": [";", "foo, bar, achoo"]}
|
||||
self.assertEqual(['foo, bar, achoo'], self.resolve(data, tmpl))
|
||||
|
||||
def test_base64(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
snippet = {"Fn::Base64": "foobar"}
|
||||
# For now, the Base64 function just returns the original text, and
|
||||
# does not convert to base64 (see issue #133)
|
||||
self.assertEqual("foobar", self.resolve(snippet, tmpl))
|
||||
|
||||
def test_get_azs(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
snippet = {"Fn::GetAZs": ""}
|
||||
self.assertEqual(["nova"], self.resolve(snippet, tmpl))
|
||||
|
||||
def test_get_azs_with_stack(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
snippet = {"Fn::GetAZs": ""}
|
||||
stk = stack.Stack(self.ctx, 'test_stack',
|
||||
template.Template(empty_template))
|
||||
self.m.StubOutWithMock(nova.NovaClientPlugin, '_create')
|
||||
fc = fakes_v1_1.FakeClient()
|
||||
nova.NovaClientPlugin._create().AndReturn(fc)
|
||||
self.m.ReplayAll()
|
||||
self.assertEqual(["nova1"], self.resolve(snippet, tmpl, stk))
|
||||
|
||||
def test_replace_string_values(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
snippet = {"Fn::Replace": [
|
||||
{'$var1': 'foo', '%var2%': 'bar'},
|
||||
'$var1 is %var2%'
|
||||
]}
|
||||
self.assertEqual('foo is bar', self.resolve(snippet, tmpl))
|
||||
|
||||
def test_replace_number_values(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
snippet = {"Fn::Replace": [
|
||||
{'$var1': 1, '%var2%': 2},
|
||||
'$var1 is not %var2%'
|
||||
]}
|
||||
self.assertEqual('1 is not 2', self.resolve(snippet, tmpl))
|
||||
|
||||
snippet = {"Fn::Replace": [
|
||||
{'$var1': 1.3, '%var2%': 2.5},
|
||||
'$var1 is not %var2%'
|
||||
]}
|
||||
self.assertEqual('1.3 is not 2.5', self.resolve(snippet, tmpl))
|
||||
|
||||
def test_replace_none_values(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
snippet = {"Fn::Replace": [
|
||||
{'$var1': None, '${var2}': None},
|
||||
'"$var1" is "${var2}"'
|
||||
]}
|
||||
self.assertEqual('"" is ""', self.resolve(snippet, tmpl))
|
||||
|
||||
def test_replace_missing_key(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
snippet = {"Fn::Replace": [
|
||||
{'$var1': 'foo', 'var2': 'bar'},
|
||||
'"$var1" is "${var3}"'
|
||||
]}
|
||||
self.assertEqual('"foo" is "${var3}"', self.resolve(snippet, tmpl))
|
||||
|
||||
def test_replace_param_values(self):
|
||||
tmpl = template.Template(parameter_template)
|
||||
env = environment.Environment({'foo': 'wibble'})
|
||||
stk = stack.Stack(self.ctx, 'test_stack', tmpl, env)
|
||||
snippet = {"Fn::Replace": [
|
||||
{'$var1': {'Ref': 'foo'}, '%var2%': {'Ref': 'blarg'}},
|
||||
'$var1 is %var2%'
|
||||
]}
|
||||
self.assertEqual('wibble is quux', self.resolve(snippet, tmpl, stk))
|
||||
|
||||
def test_member_list2map_good(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
snippet = {"Fn::MemberListToMap": [
|
||||
'Name', 'Value', ['.member.0.Name=metric',
|
||||
'.member.0.Value=cpu',
|
||||
'.member.1.Name=size',
|
||||
'.member.1.Value=56']]}
|
||||
self.assertEqual({'metric': 'cpu', 'size': '56'},
|
||||
self.resolve(snippet, tmpl))
|
||||
|
||||
def test_member_list2map_good2(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
snippet = {"Fn::MemberListToMap": [
|
||||
'Key', 'Value', ['.member.2.Key=metric',
|
||||
'.member.2.Value=cpu',
|
||||
'.member.5.Key=size',
|
||||
'.member.5.Value=56']]}
|
||||
self.assertEqual({'metric': 'cpu', 'size': '56'},
|
||||
self.resolve(snippet, tmpl))
|
||||
|
||||
def test_resource_facade(self):
|
||||
metadata_snippet = {'Fn::ResourceFacade': 'Metadata'}
|
||||
deletion_policy_snippet = {'Fn::ResourceFacade': 'DeletionPolicy'}
|
||||
update_policy_snippet = {'Fn::ResourceFacade': 'UpdatePolicy'}
|
||||
|
||||
parent_resource = DummyClass()
|
||||
parent_resource.metadata_set({"foo": "bar"})
|
||||
|
||||
parent_resource.t = rsrc_defn.ResourceDefinition(
|
||||
'parent', 'SomeType',
|
||||
deletion_policy=rsrc_defn.ResourceDefinition.RETAIN,
|
||||
update_policy={"blarg": "wibble"})
|
||||
|
||||
parent_resource.stack = stack.Stack(self.ctx, 'toplevel_stack',
|
||||
template.Template(empty_template))
|
||||
stk = stack.Stack(self.ctx, 'test_stack',
|
||||
template.Template(empty_template),
|
||||
parent_resource='parent', owner_id=45)
|
||||
stk._parent_resource = parent_resource
|
||||
self.assertEqual({"foo": "bar"},
|
||||
self.resolve(metadata_snippet, stk.t, stk))
|
||||
self.assertEqual('Retain',
|
||||
self.resolve(deletion_policy_snippet, stk.t, stk))
|
||||
self.assertEqual({"blarg": "wibble"},
|
||||
self.resolve(update_policy_snippet, stk.t, stk))
|
||||
|
||||
def test_resource_facade_function(self):
|
||||
deletion_policy_snippet = {'Fn::ResourceFacade': 'DeletionPolicy'}
|
||||
|
||||
parent_resource = DummyClass()
|
||||
parent_resource.metadata_set({"foo": "bar"})
|
||||
parent_resource.stack = stack.Stack(self.ctx, 'toplevel_stack',
|
||||
template.Template(empty_template))
|
||||
del_policy = cfn_funcs.Join(parent_resource.stack,
|
||||
'Fn::Join', ['eta', ['R', 'in']])
|
||||
parent_resource.t = rsrc_defn.ResourceDefinition(
|
||||
'parent', 'SomeType',
|
||||
deletion_policy=del_policy)
|
||||
|
||||
stk = stack.Stack(self.ctx, 'test_stack',
|
||||
template.Template(empty_template),
|
||||
parent_resource='parent')
|
||||
stk._parent_resource = parent_resource
|
||||
self.assertEqual('Retain',
|
||||
self.resolve(deletion_policy_snippet, stk.t, stk))
|
||||
|
||||
def test_resource_facade_invalid_arg(self):
|
||||
snippet = {'Fn::ResourceFacade': 'wibble'}
|
||||
stk = stack.Stack(self.ctx, 'test_stack',
|
||||
template.Template(empty_template))
|
||||
error = self.assertRaises(ValueError,
|
||||
self.resolve, snippet, stk.t, stk)
|
||||
self.assertIn(snippet.keys()[0], six.text_type(error))
|
||||
|
||||
def test_resource_facade_missing_deletion_policy(self):
|
||||
snippet = {'Fn::ResourceFacade': 'DeletionPolicy'}
|
||||
|
||||
parent_resource = DummyClass()
|
||||
parent_resource.metadata_set({"foo": "bar"})
|
||||
parent_resource.t = rsrc_defn.ResourceDefinition('parent', 'SomeType')
|
||||
|
||||
parent_resource.stack = stack.Stack(self.ctx, 'toplevel_stack',
|
||||
template.Template(empty_template))
|
||||
stk = stack.Stack(self.ctx, 'test_stack',
|
||||
template.Template(empty_template),
|
||||
parent_resource='parent', owner_id=78)
|
||||
stk._parent_resource = parent_resource
|
||||
self.assertEqual('Delete', self.resolve(snippet, stk.t, stk))
|
||||
|
||||
def test_prevent_parameters_access(self):
|
||||
expected_description = "This can be accessed"
|
||||
tmpl = template.Template({
|
||||
'AWSTemplateFormatVersion': '2010-09-09',
|
||||
'Description': expected_description,
|
||||
'Parameters': {
|
||||
'foo': {'Type': 'String', 'Required': True}
|
||||
}
|
||||
})
|
||||
self.assertEqual(expected_description, tmpl['Description'])
|
||||
keyError = self.assertRaises(KeyError, tmpl.__getitem__, 'Parameters')
|
||||
self.assertIn("can not be accessed directly", six.text_type(keyError))
|
||||
|
||||
def test_parameters_section_not_iterable(self):
|
||||
expected_description = "This can be accessed"
|
||||
tmpl = template.Template({
|
||||
'AWSTemplateFormatVersion': '2010-09-09',
|
||||
'Description': expected_description,
|
||||
'Parameters': {
|
||||
'foo': {'Type': 'String', 'Required': True}
|
||||
}
|
||||
})
|
||||
self.assertEqual(expected_description, tmpl['Description'])
|
||||
self.assertNotIn('Parameters', tmpl.keys())
|
||||
|
||||
def test_add_resource(self):
|
||||
cfn_tpl = template_format.parse('''
|
||||
AWSTemplateFormatVersion: 2010-09-09
|
||||
Resources:
|
||||
resource1:
|
||||
Type: AWS::EC2::Instance
|
||||
Properties:
|
||||
property1: value1
|
||||
Metadata:
|
||||
foo: bar
|
||||
DependsOn: dummy
|
||||
DeletionPolicy: Retain
|
||||
UpdatePolicy:
|
||||
foo: bar
|
||||
''')
|
||||
source = template.Template(cfn_tpl)
|
||||
empty = template.Template(copy.deepcopy(empty_template))
|
||||
stk = stack.Stack(self.ctx, 'test_stack', source)
|
||||
|
||||
for defn in source.resource_definitions(stk).values():
|
||||
empty.add_resource(defn)
|
||||
|
||||
self.assertEqual(cfn_tpl['Resources'], empty.t['Resources'])
|
||||
|
||||
|
||||
class TemplateFnErrorTest(common.HeatTestCase):
|
||||
scenarios = [
|
||||
('select_from_list_not_int',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Select": ["one", ["foo", "bar"]]})),
|
||||
('select_from_dict_not_str',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Select": ["1", {"red": "robin", "re": "foo"}]})),
|
||||
('select_from_serialized_json_wrong',
|
||||
dict(expect=ValueError,
|
||||
snippet={"Fn::Select": ["not", "no json"]})),
|
||||
('select_wrong_num_args_1',
|
||||
dict(expect=ValueError,
|
||||
snippet={"Fn::Select": []})),
|
||||
('select_wrong_num_args_2',
|
||||
dict(expect=ValueError,
|
||||
snippet={"Fn::Select": ["4"]})),
|
||||
('select_wrong_num_args_3',
|
||||
dict(expect=ValueError,
|
||||
snippet={"Fn::Select": ["foo", {"foo": "bar"}, ""]})),
|
||||
('select_wrong_num_args_4',
|
||||
dict(expect=TypeError,
|
||||
snippet={'Fn::Select': [['f'], {'f': 'food'}]})),
|
||||
('split_no_delim',
|
||||
dict(expect=ValueError,
|
||||
snippet={"Fn::Split": ["foo, bar, achoo"]})),
|
||||
('split_no_list',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Split": "foo, bar, achoo"})),
|
||||
('base64_list',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Base64": ["foobar"]})),
|
||||
('base64_dict',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Base64": {"foo": "bar"}})),
|
||||
('replace_list_value',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Replace": [
|
||||
{'$var1': 'foo', '%var2%': ['bar']},
|
||||
'$var1 is %var2%']})),
|
||||
('replace_list_mapping',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Replace": [
|
||||
['var1', 'foo', 'var2', 'bar'],
|
||||
'$var1 is ${var2}']})),
|
||||
('replace_dict',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Replace": {}})),
|
||||
('replace_missing_template',
|
||||
dict(expect=ValueError,
|
||||
snippet={"Fn::Replace": [['var1', 'foo', 'var2', 'bar']]})),
|
||||
('replace_none_template',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Replace": [['var2', 'bar'], None]})),
|
||||
('replace_list_string',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Replace": [
|
||||
{'var1': 'foo', 'var2': 'bar'},
|
||||
['$var1 is ${var2}']]})),
|
||||
('join_string',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Join": [" ", "foo"]})),
|
||||
('join_dict',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Join": [" ", {"foo": "bar"}]})),
|
||||
('join_wrong_num_args_1',
|
||||
dict(expect=ValueError,
|
||||
snippet={"Fn::Join": []})),
|
||||
('join_wrong_num_args_2',
|
||||
dict(expect=ValueError,
|
||||
snippet={"Fn::Join": [" "]})),
|
||||
('join_wrong_num_args_3',
|
||||
dict(expect=ValueError,
|
||||
snippet={"Fn::Join": [" ", {"foo": "bar"}, ""]})),
|
||||
('join_string_nodelim',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Join": "o"})),
|
||||
('join_string_nodelim_1',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Join": "oh"})),
|
||||
('join_string_nodelim_2',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Join": "ohh"})),
|
||||
('join_dict_nodelim1',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Join": {"foo": "bar"}})),
|
||||
('join_dict_nodelim2',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Join": {"foo": "bar", "blarg": "wibble"}})),
|
||||
('join_dict_nodelim3',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::Join": {"foo": "bar", "blarg": "wibble",
|
||||
"baz": "quux"}})),
|
||||
('member_list2map_no_key_or_val',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::MemberListToMap": [
|
||||
'Key', ['.member.2.Key=metric',
|
||||
'.member.2.Value=cpu',
|
||||
'.member.5.Key=size',
|
||||
'.member.5.Value=56']]})),
|
||||
('member_list2map_no_list',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::MemberListToMap": [
|
||||
'Key', '.member.2.Key=metric']})),
|
||||
('member_list2map_not_string',
|
||||
dict(expect=TypeError,
|
||||
snippet={"Fn::MemberListToMap": [
|
||||
'Name', ['Value'], ['.member.0.Name=metric',
|
||||
'.member.0.Value=cpu',
|
||||
'.member.1.Name=size',
|
||||
'.member.1.Value=56']]})),
|
||||
]
|
||||
|
||||
def test_bad_input(self):
|
||||
tmpl = template.Template(empty_template)
|
||||
resolve = lambda s: TemplateTest.resolve(s, tmpl)
|
||||
error = self.assertRaises(self.expect,
|
||||
resolve,
|
||||
self.snippet)
|
||||
self.assertIn(self.snippet.keys()[0], six.text_type(error))
|
||||
|
||||
|
||||
class ResolveDataTest(common.HeatTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(ResolveDataTest, self).setUp()
|
||||
self.username = 'parser_stack_test_user'
|
||||
|
||||
self.ctx = utils.dummy_context()
|
||||
|
||||
self.stack = stack.Stack(self.ctx, 'resolve_test_stack',
|
||||
template.Template(empty_template),
|
||||
environment.Environment({}))
|
||||
|
||||
def resolve(self, snippet):
|
||||
return function.resolve(self.stack.t.parse(self.stack, snippet))
|
||||
|
||||
def test_stack_resolve_runtime_data_deprecated(self):
|
||||
stk = stack.Stack(self.ctx, 'test_stack',
|
||||
template.Template(empty_template),
|
||||
tenant_id='bar')
|
||||
|
||||
with warnings.catch_warnings(record=True) as ws:
|
||||
warnings.filterwarnings('always')
|
||||
|
||||
# Work around http://bugs.python.org/issue4180
|
||||
getattr(stack, '__warningregistry__', {}).clear()
|
||||
|
||||
test_data = {'foo': 'bar'}
|
||||
resolved = stk.resolve_runtime_data(test_data)
|
||||
|
||||
self.assertTrue(ws)
|
||||
self.assertTrue(issubclass(ws[0].category, DeprecationWarning))
|
||||
|
||||
self.assertEqual(test_data, resolved)
|
||||
|
||||
def test_join_split(self):
|
||||
# join
|
||||
snippet = {'Fn::Join': [';', ['one', 'two', 'three']]}
|
||||
self.assertEqual('one;two;three', self.resolve(snippet))
|
||||
|
||||
# join then split
|
||||
snippet = {'Fn::Split': [';', snippet]}
|
||||
self.assertEqual(['one', 'two', 'three'], self.resolve(snippet))
|
||||
|
||||
def test_split_join_split_join(self):
|
||||
# each snippet in this test encapsulates
|
||||
# the snippet from the previous step, leading
|
||||
# to increasingly nested function calls
|
||||
|
||||
# split
|
||||
snippet = {'Fn::Split': [',', 'one,two,three']}
|
||||
self.assertEqual(['one', 'two', 'three'], self.resolve(snippet))
|
||||
|
||||
# split then join
|
||||
snippet = {'Fn::Join': [';', snippet]}
|
||||
self.assertEqual('one;two;three', self.resolve(snippet))
|
||||
|
||||
# split then join then split
|
||||
snippet = {'Fn::Split': [';', snippet]}
|
||||
self.assertEqual(['one', 'two', 'three'], self.resolve(snippet))
|
||||
|
||||
# split then join then split then join
|
||||
snippet = {'Fn::Join': ['-', snippet]}
|
||||
self.assertEqual('one-two-three', self.resolve(snippet))
|
||||
|
||||
def test_join_recursive(self):
|
||||
raw = {'Fn::Join': ['\n', [{'Fn::Join':
|
||||
[' ', ['foo', 'bar']]}, 'baz']]}
|
||||
self.assertEqual('foo bar\nbaz', self.resolve(raw))
|
||||
|
||||
def test_join_not_string(self):
|
||||
snippet = {'Fn::Join': ['\n', [{'Fn::Join':
|
||||
[' ', ['foo', 45]]}, 'baz']]}
|
||||
error = self.assertRaises(TypeError,
|
||||
self.resolve, snippet)
|
||||
self.assertIn('45', six.text_type(error))
|
||||
|
||||
def test_base64_replace(self):
|
||||
raw = {'Fn::Base64': {'Fn::Replace': [
|
||||
{'foo': 'bar'}, 'Meet at the foo']}}
|
||||
self.assertEqual('Meet at the bar', self.resolve(raw))
|
||||
|
||||
def test_replace_base64(self):
|
||||
raw = {'Fn::Replace': [{'foo': 'bar'}, {
|
||||
'Fn::Base64': 'Meet at the foo'}]}
|
||||
self.assertEqual('Meet at the bar', self.resolve(raw))
|
||||
|
||||
def test_nested_selects(self):
|
||||
data = {
|
||||
'a': ['one', 'two', 'three'],
|
||||
'b': ['een', 'twee', {'d': 'D', 'e': 'E'}]
|
||||
}
|
||||
raw = {'Fn::Select': ['a', data]}
|
||||
self.assertEqual(data['a'], self.resolve(raw))
|
||||
|
||||
raw = {'Fn::Select': ['b', data]}
|
||||
self.assertEqual(data['b'], self.resolve(raw))
|
||||
|
||||
raw = {
|
||||
'Fn::Select': ['1', {
|
||||
'Fn::Select': ['b', data]
|
||||
}]
|
||||
}
|
||||
self.assertEqual('twee', self.resolve(raw))
|
||||
|
||||
raw = {
|
||||
'Fn::Select': ['e', {
|
||||
'Fn::Select': ['2', {
|
||||
'Fn::Select': ['b', data]
|
||||
}]
|
||||
}]
|
||||
}
|
||||
self.assertEqual('E', self.resolve(raw))
|
||||
|
||||
def test_member_list_select(self):
|
||||
snippet = {'Fn::Select': ['metric', {"Fn::MemberListToMap": [
|
||||
'Name', 'Value', ['.member.0.Name=metric',
|
||||
'.member.0.Value=cpu',
|
||||
'.member.1.Name=size',
|
||||
'.member.1.Value=56']]}]}
|
||||
self.assertEqual('cpu', self.resolve(snippet))
|
||||
|
|
Loading…
Reference in New Issue