Convergence prepare traversal

Generates the graph for traversal in convergence.

* Updates current traversal for the stack
* Deletes any sync_point entries of previous traversal
* Generates the graph for traversal based on
  - resources loaded from db for the stack
  - resources that exist in present template
* Stores resource.current_template_id and resource.requires
* Stores the edges of graph in stack.current_deps
* Creates sync_points for each node in graph
* Creates sync_point for stack.

blueprint convergence-prepare-traversal

Change-Id: I507e67b39c820ed46d3b269fc76d6cf18d0ef2d7
This commit is contained in:
Rakesh H S 2015-03-08 15:34:30 +05:30
parent 7fcc40b19a
commit 5189bbebab
9 changed files with 737 additions and 36 deletions

View File

@ -53,9 +53,14 @@ class Node(object):
self.satisfy.add(source)
return iter(self.satisfy)
def requires(self, target):
'''Add a key that this node requires.'''
self.require.add(target)
def requires(self, target=None):
'''
Add a key that this node requires, and optionally add a
new one.
'''
if target is not None:
self.require.add(target)
return iter(self.require)
def __isub__(self, target):
'''Remove a key that this node requires.'''
@ -153,6 +158,13 @@ class Graph(collections.defaultdict):
text = '{%s}' % ', '.join(pairs)
return encodeutils.safe_decode(text)
def leaves(self):
'''
Return an iterator over all of the leaf nodes in the graph.
'''
return (requirer for requirer, required in self.items()
if not required)
@staticmethod
def toposort(graph):
'''
@ -207,6 +219,15 @@ class Dependencies(object):
return self._graph[last].required_by()
def requires(self, target):
'''
List the keys that require the specified node.
'''
if target not in self._graph:
raise KeyError
return self._graph[target].requires()
def __getitem__(self, last):
'''
Return a partial dependency graph consisting of the specified node and
@ -235,6 +256,18 @@ class Dependencies(object):
return Dependencies(edges)
def translate(self, transform):
'''
Translate all of the nodes using a transform function.
Returns a new Dependencies object.
'''
def transform_key(key):
return transform(key) if key is not None else None
edges = self._graph.edges()
return type(self)(tuple(map(transform_key, e)) for e in edges)
def __str__(self):
'''
Return a human-readable string representation of the dependency graph

View File

@ -174,11 +174,11 @@ class Resource(object):
self.created_time = None
self.updated_time = None
self._rpc_client = None
self.needed_by = None
self.requires = None
self.needed_by = []
self.requires = []
self.replaces = None
self.replaced_by = None
self.current_template_id = stack.t.id
self.current_template_id = None
resource = stack.db_resource_get(name)
if resource:
@ -268,6 +268,20 @@ class Resource(object):
rs.update_and_save({'rsrc_metadata': metadata})
self._rsrc_metadata = metadata
@classmethod
def set_needed_by(cls, db_rsrc, needed_by):
if db_rsrc:
db_rsrc.update_and_save(
{'needed_by': needed_by}
)
@classmethod
def set_requires(cls, db_rsrc, requires):
if db_rsrc:
db_rsrc.update_and_save(
{'requires': requires}
)
def _break_if_required(self, action, hook):
'''Block the resource until the hook is cleared if there is one.'''
if self.stack.env.registry.matches_hook(self.name, hook):

View File

@ -36,6 +36,7 @@ from heat.common.i18n import _LW
from heat.common import identifier
from heat.common import messaging as rpc_messaging
from heat.common import service_utils
from heat.common import template_format
from heat.engine import api
from heat.engine import attributes
from heat.engine import clients
@ -668,8 +669,7 @@ class EngineService(service.Service):
"""
LOG.info(_LI('Creating stack %s'), stack_name)
def _stack_create(stack):
def _create_stack_user(stack):
if not stack.stack_user_project_id:
try:
stack.create_stack_user_project_id()
@ -677,6 +677,8 @@ class EngineService(service.Service):
stack.state_set(stack.action, stack.FAILED,
six.text_type(ex))
def _stack_create(stack):
_create_stack_user(stack)
# Create/Adopt a stack, and create the periodic task if successful
if stack.adopt_stack_data:
stack.adopt()
@ -692,18 +694,22 @@ class EngineService(service.Service):
LOG.info(_LI("Stack create failed, status %s"), stack.status)
convergence = cfg.CONF.convergence_engine
if convergence:
raise exception.NotSupported(feature=_('Convergence engine'))
stack = self._parse_template_and_validate_stack(
cnxt, stack_name, template, params, files, args, owner_id,
nested_depth, user_creds_id, stack_user_project_id, convergence,
parent_resource_name)
stack.store()
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
_stack_create, stack)
# once validations are done
# if convergence is enabled, take convergence path
if convergence:
# TODO(later): call _create_stack_user(stack)
# call stack.converge_stack(template=stack.t, action=stack.CREATE)
raise exception.NotSupported(feature=_('Convergence engine'))
else:
stack.store()
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
_stack_create, stack)
return dict(stack.identifier())
@ -765,14 +771,20 @@ class EngineService(service.Service):
self._validate_deferred_auth_context(cnxt, updated_stack)
updated_stack.validate()
event = eventlet.event.Event()
th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
self.engine_id,
current_stack.update,
updated_stack,
event=event)
th.link(self.thread_group_mgr.remove_event, current_stack.id, event)
self.thread_group_mgr.add_event(current_stack.id, event)
# Once all the validations are done
# if convergence is enabled, take the convergence path
if current_kwargs['convergence']:
current_stack.converge_stack(template=tmpl)
else:
event = eventlet.event.Event()
th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
self.engine_id,
current_stack.update,
updated_stack,
event=event)
th.link(self.thread_group_mgr.remove_event,
current_stack.id, event)
self.thread_group_mgr.add_event(current_stack.id, event)
return dict(current_stack.identifier())
@context.request_context
@ -927,6 +939,16 @@ class EngineService(service.Service):
LOG.info(_LI('Deleting stack %s'), st.name)
stack = parser.Stack.load(cnxt, stack=st)
if stack.convergence:
empty_template = '''
heat_template_version: 2013-05-23
description: Empty Template
'''
tmpl = template_format.parse(empty_template)
template = templatem.Template(tmpl)
stack.converge_stack(template=template, action=stack.DELETE)
return
lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id)
with lock.try_thread_lock() as acquire_result:

View File

@ -21,6 +21,7 @@ import warnings
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import encodeutils
from oslo_utils import uuidutils
from osprofiler import profiler
import six
@ -40,6 +41,7 @@ from heat.engine import parameter_groups as param_groups
from heat.engine import resource
from heat.engine import resources
from heat.engine import scheduler
from heat.engine import sync_point
from heat.engine import template as tmpl
from heat.engine import update
from heat.objects import resource as resource_objects
@ -85,7 +87,8 @@ class Stack(collections.Mapping):
user_creds_id=None, tenant_id=None,
use_stored_context=False, username=None,
nested_depth=0, strict_validate=True, convergence=False,
current_traversal=None, tags=None):
current_traversal=None, tags=None, prev_raw_template_id=None,
current_deps=None):
'''
Initialise from a context, name, Template object and (optionally)
Environment object. The database ID may also be initialised, if the
@ -129,6 +132,8 @@ class Stack(collections.Mapping):
self.convergence = convergence
self.current_traversal = current_traversal
self.tags = tags
self.prev_raw_template_id = prev_raw_template_id
self.current_deps = current_deps
if use_stored_context:
self.context = self.stored_context()
@ -399,7 +404,9 @@ class Stack(collections.Mapping):
user_creds_id=stack.user_creds_id, tenant_id=stack.tenant,
use_stored_context=use_stored_context,
username=stack.username, convergence=stack.convergence,
current_traversal=stack.current_traversal, tags=tags)
current_traversal=stack.current_traversal, tags=tags,
prev_raw_template_id=stack.prev_raw_template_id,
current_deps=stack.current_deps)
def get_kwargs_for_cloning(self, keep_status=False, only_db=False):
"""Get common kwargs for calling Stack() for cloning.
@ -425,6 +432,8 @@ class Stack(collections.Mapping):
'nested_depth': self.nested_depth,
'convergence': self.convergence,
'current_traversal': self.current_traversal,
'prev_raw_template_id': self.prev_raw_template_id,
'current_deps': self.current_deps
}
if keep_status:
stack.update({
@ -898,6 +907,155 @@ class Stack(collections.Mapping):
event=event)
updater()
@profiler.trace('Stack.converge_stack', hide_args=False)
def converge_stack(self, template, action=UPDATE):
"""
Updates the stack and triggers convergence for resources
"""
self.prev_raw_template_id = getattr(self.t, 'id', None)
self.t = template
previous_traversal = self.current_traversal
self.current_traversal = uuidutils.generate_uuid()
self.store()
# TODO(later): lifecycle_plugin_utils.do_pre_ops
self.state_set(action, self.IN_PROGRESS,
'Stack %s started' % action)
# delete the prev traversal sync_points
sync_point.delete_all(self.context, self.id, previous_traversal)
self._converge_create_or_update()
def _converge_create_or_update(self):
self._update_or_store_resources()
self.convergence_dependencies = self._convergence_dependencies(
self.ext_rsrcs_db, self.dependencies)
LOG.info(_LI('convergence_dependencies: %s'),
self.convergence_dependencies)
# create sync_points for resources in DB
for rsrc_id, is_update in self.convergence_dependencies:
sync_point.create(self.context, rsrc_id,
self.current_traversal, is_update,
self.id)
# create sync_point entry for stack
sync_point.create(
self.context, self.id, self.current_traversal,
False if self.action in (self.DELETE, self.SUSPEND) else True,
self.id)
# Store list of edges
self.current_deps = {
'edges': [[rqr, rqd] for rqr, rqd in
self.convergence_dependencies.graph().edges()]}
self.store()
leaves = (self.convergence_dependencies.graph(reverse=True).leaves()
if self.action in (self.DELETE, self.SUSPEND)
else self.convergence_dependencies.graph().leaves())
for rsrc_id, is_update in leaves:
LOG.info(_LI("Triggering resource %(rsrc_id)s "
"for update=%(is_update)s"),
{'rsrc_id': rsrc_id, 'is_update': is_update})
self.temp_update_requires(self.convergence_dependencies)
def _update_or_store_resources(self):
try:
ext_rsrcs_db = resource_objects.Resource.get_all_by_stack(
self.context, self.id)
except exception.NotFound:
self.ext_rsrcs_db = None
else:
self.ext_rsrcs_db = {res.id: res
for res_name, res in ext_rsrcs_db.items()}
def get_existing_rsrc_db(rsrc_name):
candidate = None
if self.ext_rsrcs_db:
for id, ext_rsrc in self.ext_rsrcs_db.items():
if ext_rsrc.name != rsrc_name:
continue
if ext_rsrc.current_template_id == self.t.id:
# Rollback where the previous resource still exists
candidate = ext_rsrc
break
elif (ext_rsrc.current_template_id ==
self.prev_raw_template_id):
# Current resource is otherwise a good candidate
candidate = ext_rsrc
break
return candidate
curr_name_translated_dep = self.dependencies.translate(lambda res:
res.name)
rsrcs = {}
def update_needed_by(res):
new_requirers = set(
rsrcs[rsrc_name].id for rsrc_name in
curr_name_translated_dep.required_by(res.name)
)
old_requirers = set(res.needed_by) if res.needed_by else set()
needed_by = old_requirers | new_requirers
res.needed_by = list(needed_by)
for rsrc in reversed(self.dependencies):
existing_rsrc_db = get_existing_rsrc_db(rsrc.name)
if existing_rsrc_db is None:
update_needed_by(rsrc)
rsrc.current_template_id = self.t.id
rsrc._store()
rsrcs[rsrc.name] = rsrc
else:
update_needed_by(existing_rsrc_db)
resource.Resource.set_needed_by(
existing_rsrc_db, existing_rsrc_db.needed_by
)
rsrcs[existing_rsrc_db.name] = existing_rsrc_db
def _convergence_dependencies(self, existing_resources,
curr_template_dep):
dep = curr_template_dep.translate(lambda res: (res.id, True))
if existing_resources:
for rsrc_id, rsrc in existing_resources.items():
dep += (rsrc_id, False), None
for requirement in rsrc.requires:
if requirement in existing_resources:
dep += (requirement, False), (rsrc_id, False)
if rsrc.replaces in existing_resources:
dep += (rsrc.replaces, False), (rsrc_id, False)
if (rsrc.id, True) in dep:
dep += (rsrc_id, False), (rsrc_id, True)
return dep
def temp_update_requires(self, conv_deps):
'''updates requires column of resources'''
# This functions should be removed once the dependent patches
# are implemented.
if self.action in (self.CREATE, self.UPDATE):
requires = dict()
for rsrc_id, is_update in conv_deps:
reqs = conv_deps.requires((rsrc_id, is_update))
requires[rsrc_id] = list({id for id, is_update in reqs})
try:
rsrcs_db = resource_objects.Resource.get_all_by_stack(
self.context, self.id)
except exception.NotFound:
rsrcs_db = None
else:
rsrcs_db = {res.id: res for res_name, res in rsrcs_db.items()}
if rsrcs_db:
for id, db_rsrc in rsrcs_db.items():
if id in requires:
resource.Resource.set_requires(
db_rsrc, requires[id]
)
@scheduler.wrappertask
def update_task(self, newstack, action=UPDATE, event=None):
if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):

42
heat/engine/sync_point.py Normal file
View File

@ -0,0 +1,42 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from heat.objects import sync_point as sync_point_object
def create(context, entity_id, traversal_id, is_update, stack_id):
"""
Creates an sync point entry in DB.
"""
values = {'entity_id': entity_id, 'traversal_id': traversal_id,
'is_update': is_update, 'atomic_key': 0,
'stack_id': stack_id, 'input_data': {}}
return sync_point_object.SyncPoint.create(context, values)
def get(context, entity_id, traversal_id, is_update):
"""
Retrieves a sync point entry from DB.
"""
return sync_point_object.SyncPoint.get_by_key(context, entity_id,
traversal_id, is_update)
def delete_all(context, stack_id, traversal_id):
"""
Deletes all sync points of a stack associated with a particular traversal.
"""
return sync_point_object.SyncPoint.delete_all_by_stack_and_traversal(
context, stack_id, traversal_id
)

View File

@ -46,7 +46,8 @@ resources:
'''
def get_stack(stack_name, ctx, template=None, with_params=True):
def get_stack(stack_name, ctx, template=None, with_params=True,
convergence=False):
if template is None:
t = template_format.parse(wp_template)
if with_params:
@ -57,7 +58,7 @@ def get_stack(stack_name, ctx, template=None, with_params=True):
else:
t = template_format.parse(template)
tmpl = templatem.Template(t)
stack = parser.Stack(ctx, stack_name, tmpl)
stack = parser.Stack(ctx, stack_name, tmpl, convergence=convergence)
return stack

View File

@ -226,3 +226,11 @@ class dependenciesTest(common.HeatTestCase):
"'%s' not found in required_by" % n)
self.assertRaises(KeyError, d.required_by, 'foo')
def test_graph_leaves(self):
d = dependencies.Dependencies([('last1', 'mid'), ('last2', 'mid'),
('mid', 'first1'), ('mid', 'first2')])
leaves = sorted(list(d._graph.leaves()))
self.assertEqual(['first1', 'first2'], leaves)

View File

@ -50,6 +50,7 @@ from heat.objects import service as service_objects
from heat.objects import software_deployment as software_deployment_object
from heat.objects import stack as stack_object
from heat.objects import stack_lock as stack_lock_object
from heat.objects import sync_point as sync_point_object
from heat.objects import watch_data as watch_data_object
from heat.objects import watch_rule as watch_rule_object
from heat.openstack.common import threadgroup
@ -65,6 +66,89 @@ cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config')
cfg.CONF.import_opt('enable_stack_abandon', 'heat.common.config')
string_template_five = '''
heat_template_version: 2013-05-23
description: Random String templates
parameters:
salt:
type: string
default: "quickbrownfox"
resources:
A:
type: OS::Heat::RandomString
properties:
salt: {get_param: salt}
B:
type: OS::Heat::RandomString
properties:
salt: {get_param: salt}
C:
type: OS::Heat::RandomString
depends_on: [A, B]
properties:
salt: {get_param: salt}
D:
type: OS::Heat::RandomString
depends_on: C
properties:
salt: {get_param: salt}
E:
type: OS::Heat::RandomString
depends_on: C
properties:
salt: {get_param: salt}
'''
string_template_five_update = '''
heat_template_version: 2013-05-23
description: Random String templates
parameters:
salt:
type: string
default: "quickbrownfox123"
resources:
A:
type: OS::Heat::RandomString
properties:
salt: {get_param: salt}
B:
type: OS::Heat::RandomString
properties:
salt: {get_param: salt}
F:
type: OS::Heat::RandomString
depends_on: [A, B]
properties:
salt: {get_param: salt}
G:
type: OS::Heat::RandomString
depends_on: F
properties:
salt: {get_param: salt}
H:
type: OS::Heat::RandomString
depends_on: F
properties:
salt: {get_param: salt}
'''
empty_template = '''
heat_template_version: 2013-05-23
description: Empty Template
'''
wp_template_no_default = '''
{
"AWSTemplateFormatVersion" : "2010-09-09",
@ -152,6 +236,234 @@ resources:
'''
class StackConvergenceCreateUpdateTest(common.HeatTestCase):
def setUp(self):
super(StackConvergenceCreateUpdateTest, self).setUp()
cfg.CONF.set_override('convergence_engine', True)
def test_conv_wordpress_single_instance_stack_create(self):
stack = tools.get_stack('test_stack', utils.dummy_context(),
convergence=True)
stack.converge_stack(template=stack.t, action=stack.CREATE)
self.assertIsNone(stack.ext_rsrcs_db)
self.assertEqual('Dependencies([((1, True), None)])',
repr(stack.convergence_dependencies))
stack_db = stack_object.Stack.get_by_id(stack.context, stack.id)
self.assertIsNotNone(stack_db.current_traversal)
self.assertIsNotNone(stack_db.raw_template_id)
self.assertIsNone(stack_db.prev_raw_template_id)
self.assertEqual(stack_db.convergence, True)
self.assertEqual({'edges': [[[1, True], None]]}, stack_db.current_deps)
def test_conv_string_five_instance_stack_create(self):
stack = tools.get_stack('test_stack', utils.dummy_context(),
template=string_template_five,
convergence=True)
stack.converge_stack(template=stack.t, action=stack.CREATE)
self.assertIsNone(stack.ext_rsrcs_db)
self.assertEqual('Dependencies(['
'((3, True), (5, True)), '
'((3, True), (4, True)), '
'((1, True), (3, True)), '
'((2, True), (3, True))])',
repr(stack.convergence_dependencies))
stack_db = stack_object.Stack.get_by_id(stack.context, stack.id)
self.assertIsNotNone(stack_db.current_traversal)
self.assertIsNotNone(stack_db.raw_template_id)
self.assertIsNone(stack_db.prev_raw_template_id)
self.assertEqual(stack_db.convergence, True)
self.assertEqual(sorted([[[3, True], [5, True]], # C, A
[[3, True], [4, True]], # C, B
[[1, True], [3, True]], # E, C
[[2, True], [3, True]]]), # D, C
sorted(stack_db.current_deps['edges']))
# check if needed_by is stored properly
expected_needed_by = {'A': [3], 'B': [3],
'C': [1, 2],
'D': [], 'E': []}
rsrcs_db = resource_objects.Resource.get_all_by_stack(
stack_db._context, stack_db.id
)
self.assertEqual(5, len(rsrcs_db))
for rsrc_name, rsrc_obj in rsrcs_db.items():
self.assertEqual(sorted(expected_needed_by[rsrc_name]),
sorted(rsrc_obj.needed_by))
self.assertEqual(stack_db.raw_template_id,
rsrc_obj.current_template_id)
# check if sync_points were stored
for entity_id in [5, 4, 3, 2, 1, stack_db.id]:
sync_point = sync_point_object.SyncPoint.get_by_key(
stack_db._context, entity_id, stack_db.current_traversal, True
)
self.assertIsNotNone(sync_point)
self.assertEqual(stack_db.id, sync_point.stack_id)
def test_conv_string_five_instance_stack_update(self):
stack = tools.get_stack('test_stack', utils.dummy_context(),
template=string_template_five,
convergence=True)
# create stack
stack.converge_stack(template=stack.t, action=stack.CREATE)
curr_stack_db = stack_object.Stack.get_by_id(stack.context, stack.id)
curr_stack = parser.Stack.load(curr_stack_db._context,
stack=curr_stack_db)
# update stack with new template
t2 = template_format.parse(string_template_five_update)
template2 = templatem.Template(
t2, env=environment.Environment({'KeyName2': 'test2'}))
curr_stack.converge_stack(template=template2, action=stack.UPDATE)
self.assertIsNotNone(curr_stack.ext_rsrcs_db)
self.assertEqual('Dependencies(['
'((7, True), (8, True)), '
'((8, True), (5, True)), '
'((8, True), (4, True)), '
'((6, True), (8, True)), '
'((3, False), (2, False)), '
'((3, False), (1, False)), '
'((5, False), (3, False)), '
'((5, False), (5, True)), '
'((4, False), (3, False)), '
'((4, False), (4, True))])',
repr(curr_stack.convergence_dependencies))
stack_db = stack_object.Stack.get_by_id(curr_stack.context,
curr_stack.id)
self.assertIsNotNone(stack_db.raw_template_id)
self.assertIsNotNone(stack_db.current_traversal)
self.assertIsNotNone(stack_db.prev_raw_template_id)
self.assertEqual(True, stack_db.convergence)
self.assertEqual(sorted([[[7, True], [8, True]],
[[8, True], [5, True]],
[[8, True], [4, True]],
[[6, True], [8, True]],
[[3, False], [2, False]],
[[3, False], [1, False]],
[[5, False], [3, False]],
[[5, False], [5, True]],
[[4, False], [3, False]],
[[4, False], [4, True]]]),
sorted(stack_db.current_deps['edges']))
'''
To visualize:
G(7, True) H(6, True)
\ /
\ / B(4, False) A(5, False)
\ / / \ / /
\ / / /
F(8, True) / / \ /
/ \ / / C(3, False)
/ \ / / \
/ / \ /
/ / \ / / \
B(4, True) A(5, True) D(2, False) E(1, False)
Leaves are at the bottom
'''
# check if needed_by are stored properly
# For A & B:
# needed_by=C, F
# TODO(later): when worker is implemented test for current_template_id
# Also test for requires
expected_needed_by = {'A': [3, 8], 'B': [3, 8],
'C': [1, 2],
'D': [], 'E': [],
'F': [6, 7],
'G': [], 'H': []}
rsrcs_db = resource_objects.Resource.get_all_by_stack(
stack_db._context, stack_db.id
)
self.assertEqual(8, len(rsrcs_db))
for rsrc_name, rsrc_obj in rsrcs_db.items():
self.assertEqual(sorted(expected_needed_by[rsrc_name]),
sorted(rsrc_obj.needed_by))
# check if sync_points are created for forward traversal
# [F, H, G, A, B, Stack]
for entity_id in [8, 7, 6, 5, 4, stack_db.id]:
sync_point = sync_point_object.SyncPoint.get_by_key(
stack_db._context, entity_id, stack_db.current_traversal, True
)
self.assertIsNotNone(sync_point)
self.assertEqual(stack_db.id, sync_point.stack_id)
# check if sync_points are created for cleanup traversal
# [A, B, C, D, E]
for entity_id in [5, 4, 3, 2, 1]:
sync_point = sync_point_object.SyncPoint.get_by_key(
stack_db._context, entity_id, stack_db.current_traversal, False
)
self.assertIsNotNone(sync_point)
self.assertEqual(stack_db.id, sync_point.stack_id)
def test_conv_empty_template_stack_update_delete(self):
stack = tools.get_stack('test_stack', utils.dummy_context(),
template=string_template_five,
convergence=True)
# create stack
stack.converge_stack(template=stack.t, action=stack.CREATE)
# update stack with new template
t2 = template_format.parse(empty_template)
template2 = templatem.Template(
t2, env=environment.Environment({'KeyName2': 'test2'}))
curr_stack_db = stack_object.Stack.get_by_id(stack.context, stack.id)
curr_stack = parser.Stack.load(curr_stack_db._context,
stack=curr_stack_db)
curr_stack.converge_stack(template=template2, action=stack.DELETE)
self.assertIsNotNone(curr_stack.ext_rsrcs_db)
self.assertEqual('Dependencies(['
'((3, False), (2, False)), '
'((3, False), (1, False)), '
'((5, False), (3, False)), '
'((4, False), (3, False))])',
repr(curr_stack.convergence_dependencies))
stack_db = stack_object.Stack.get_by_id(curr_stack.context,
curr_stack.id)
self.assertIsNotNone(stack_db.current_traversal)
self.assertIsNotNone(stack_db.prev_raw_template_id)
self.assertEqual(sorted([[[3, False], [2, False]],
[[3, False], [1, False]],
[[5, False], [3, False]],
[[4, False], [3, False]]]),
sorted(stack_db.current_deps['edges']))
# TODO(later): when worker is implemented test for current_template_id
# Also test for requires
expected_needed_by = {'A': [3], 'B': [3],
'C': [1, 2],
'D': [], 'E': []}
rsrcs_db = resource_objects.Resource.get_all_by_stack(
stack_db._context, stack_db.id
)
self.assertEqual(5, len(rsrcs_db))
for rsrc_name, rsrc_obj in rsrcs_db.items():
self.assertEqual(sorted(expected_needed_by[rsrc_name]),
sorted(rsrc_obj.needed_by))
# check if sync_points are created for cleanup traversal
# [A, B, C, D, E, Stack]
for entity_id in [5, 4, 3, 2, 1, stack_db.id]:
sync_point = sync_point_object.SyncPoint.get_by_key(
stack_db._context, entity_id, stack_db.current_traversal, False
)
self.assertIsNotNone(sync_point)
self.assertEqual(stack_db.id, sync_point.stack_id)
class StackCreateTest(common.HeatTestCase):
def setUp(self):
super(StackCreateTest, self).setUp()
@ -444,15 +756,6 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
self.man.create_stack,
self.ctx, stack_name, stack.t.t, {}, None, {})
def test_stack_create_enabled_convergence_engine(self):
cfg.CONF.set_override('convergence_engine', True)
ex = self.assertRaises(dispatcher.ExpectedException,
self.man.create_stack, self.ctx, 'test',
tools.wp_template, {}, None, {})
self.assertEqual(exception.NotSupported, ex.exc_info[0])
self.assertEqual('Convergence engine is not supported.',
six.text_type(ex.exc_info[1]))
def test_stack_create_invalid_resource_name(self):
stack_name = 'service_create_test_stack_invalid_res'
stack = tools.get_stack(stack_name, self.ctx)
@ -836,6 +1139,8 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
stack.t,
convergence=False,
current_traversal=None,
prev_raw_template_id=None,
current_deps=None,
disable_rollback=True,
nested_depth=0,
owner_id=None,
@ -895,6 +1200,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
parser.Stack(self.ctx, stack.name,
stack.t,
convergence=False, current_traversal=None,
prev_raw_template_id=None, current_deps=None,
disable_rollback=True, nested_depth=0,
owner_id=None, parent_resource=None,
stack_user_project_id='1234',
@ -947,6 +1253,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
parser.Stack(self.ctx, stack.name,
stack.t,
convergence=False, current_traversal=None,
prev_raw_template_id=None, current_deps=None,
disable_rollback=False, nested_depth=0,
owner_id=None, parent_resource=None,
stack_user_project_id='1234',
@ -1056,6 +1363,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
parser.Stack(self.ctx, stack.name,
stack.t,
convergence=False, current_traversal=None,
prev_raw_template_id=None, current_deps=None,
disable_rollback=True, nested_depth=0,
owner_id=None, parent_resource=None,
stack_user_project_id='1234', strict_validate=True,
@ -1185,6 +1493,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
parser.Stack(self.ctx, stack.name,
stack.t,
convergence=False, current_traversal=None,
prev_raw_template_id=None, current_deps=None,
disable_rollback=True, nested_depth=0,
owner_id=None, parent_resource=None,
stack_user_project_id='1234', strict_validate=True,
@ -1251,6 +1560,8 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
old_stack.t,
convergence=False,
current_traversal=None,
prev_raw_template_id=None,
current_deps=None,
disable_rollback=True,
nested_depth=0,
owner_id=None,
@ -1317,6 +1628,116 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase):
six.text_type(ex))
class StackConvergenceServiceCreateUpdateTest(common.HeatTestCase):
def setUp(self):
super(StackConvergenceServiceCreateUpdateTest, self).setUp()
cfg.CONF.set_override('convergence_engine', True)
self.ctx = utils.dummy_context()
self.patch('heat.engine.service.warnings')
self.man = service.EngineService('a-host', 'a-topic')
self.man.create_periodic_tasks()
def _stub_update_mocks(self, stack_to_load, stack_to_return):
self.m.StubOutWithMock(parser, 'Stack')
self.m.StubOutWithMock(parser.Stack, 'load')
parser.Stack.load(self.ctx, stack=stack_to_load
).AndReturn(stack_to_return)
self.m.StubOutWithMock(templatem, 'Template')
self.m.StubOutWithMock(environment, 'Environment')
def _test_stack_create_convergence(self, stack_name):
params = {'foo': 'bar'}
template = '{ "Template": "data" }'
stack = tools.get_stack(stack_name, self.ctx,
template=string_template_five,
convergence=True)
self.m.StubOutWithMock(templatem, 'Template')
self.m.StubOutWithMock(environment, 'Environment')
self.m.StubOutWithMock(parser, 'Stack')
templatem.Template(template, files=None,
env=stack.env).AndReturn(stack.t)
environment.Environment(params).AndReturn(stack.env)
parser.Stack(self.ctx, stack.name,
stack.t, owner_id=None,
parent_resource=None,
nested_depth=0, user_creds_id=None,
stack_user_project_id=None,
convergence=True).AndReturn(stack)
self.m.StubOutWithMock(stack, 'validate')
stack.validate().AndReturn(None)
self.m.ReplayAll()
# TODO(later): Remove exception once convergence is supported.
ex = self.assertRaises(dispatcher.ExpectedException,
self.man.create_stack, self.ctx, stack_name,
template, params, None, {})
self.assertEqual(exception.NotSupported, ex.exc_info[0])
self.assertEqual('Convergence engine is not supported.',
six.text_type(ex.exc_info[1]))
self.m.VerifyAll()
def test_stack_create_enabled_convergence_engine(self):
stack_name = 'service_create_test_stack'
self._test_stack_create_convergence(stack_name)
def test_stack_update_enabled_convergence_engine(self):
stack_name = 'service_update_test_stack'
params = {'foo': 'bar'}
template = '{ "Template": "data" }'
old_stack = tools.get_stack(stack_name, self.ctx,
template=string_template_five,
convergence=True)
sid = old_stack.store()
s = stack_object.Stack.get_by_id(self.ctx, sid)
stack = tools.get_stack(stack_name, self.ctx,
template=string_template_five_update,
convergence=True)
self._stub_update_mocks(s, old_stack)
templatem.Template(template, files=None,
env=stack.env).AndReturn(stack.t)
environment.Environment(params).AndReturn(stack.env)
parser.Stack(self.ctx, stack.name,
stack.t,
owner_id=old_stack.owner_id,
nested_depth=old_stack.nested_depth,
user_creds_id=old_stack.user_creds_id,
stack_user_project_id=old_stack.stack_user_project_id,
timeout_mins=60,
disable_rollback=True,
parent_resource=None,
strict_validate=True,
tenant_id=old_stack.tenant_id,
username=old_stack.username,
convergence=old_stack.convergence,
current_traversal=old_stack.current_traversal,
prev_raw_template_id=old_stack.prev_raw_template_id,
current_deps=old_stack.current_deps).AndReturn(stack)
self.m.StubOutWithMock(stack, 'validate')
stack.validate().AndReturn(None)
self.m.ReplayAll()
api_args = {'timeout_mins': 60}
result = self.man.update_stack(self.ctx, old_stack.identifier(),
template, params, None, api_args)
self.assertEqual(old_stack.convergence, True)
self.assertEqual(old_stack.identifier(), result)
self.assertIsInstance(result, dict)
self.assertTrue(result['stack_id'])
self.m.VerifyAll()
class StackServiceAuthorizeTest(common.HeatTestCase):
def setUp(self):

View File

@ -293,7 +293,9 @@ class StackTest(common.HeatTestCase):
username=mox.IgnoreArg(),
convergence=False,
current_traversal=None,
tags=mox.IgnoreArg())
tags=mox.IgnoreArg(),
prev_raw_template_id=None,
current_deps=None)
self.m.ReplayAll()
stack.Stack.load(self.ctx, stack_id=self.stack.id)