From 5189bbebabbbe2c672c8bb492c496e5780892d01 Mon Sep 17 00:00:00 2001 From: Rakesh H S Date: Sun, 8 Mar 2015 15:34:30 +0530 Subject: [PATCH] Convergence prepare traversal Generates the graph for traversal in convergence. * Updates current traversal for the stack * Deletes any sync_point entries of previous traversal * Generates the graph for traversal based on - resources loaded from db for the stack - resources that exist in present template * Stores resource.current_template_id and resource.requires * Stores the edges of graph in stack.current_deps * Creates sync_points for each node in graph * Creates sync_point for stack. blueprint convergence-prepare-traversal Change-Id: I507e67b39c820ed46d3b269fc76d6cf18d0ef2d7 --- heat/engine/dependencies.py | 39 ++- heat/engine/resource.py | 20 +- heat/engine/service.py | 54 ++-- heat/engine/stack.py | 162 ++++++++++- heat/engine/sync_point.py | 42 +++ heat/tests/engine/tools.py | 5 +- heat/tests/test_dependencies.py | 8 + heat/tests/test_engine_service.py | 439 +++++++++++++++++++++++++++++- heat/tests/test_stack.py | 4 +- 9 files changed, 737 insertions(+), 36 deletions(-) create mode 100644 heat/engine/sync_point.py diff --git a/heat/engine/dependencies.py b/heat/engine/dependencies.py index ca0a2bbeaf..c39be1e8fd 100644 --- a/heat/engine/dependencies.py +++ b/heat/engine/dependencies.py @@ -53,9 +53,14 @@ class Node(object): self.satisfy.add(source) return iter(self.satisfy) - def requires(self, target): - '''Add a key that this node requires.''' - self.require.add(target) + def requires(self, target=None): + ''' + Add a key that this node requires, and optionally add a + new one. + ''' + if target is not None: + self.require.add(target) + return iter(self.require) def __isub__(self, target): '''Remove a key that this node requires.''' @@ -153,6 +158,13 @@ class Graph(collections.defaultdict): text = '{%s}' % ', '.join(pairs) return encodeutils.safe_decode(text) + def leaves(self): + ''' + Return an iterator over all of the leaf nodes in the graph. + ''' + return (requirer for requirer, required in self.items() + if not required) + @staticmethod def toposort(graph): ''' @@ -207,6 +219,15 @@ class Dependencies(object): return self._graph[last].required_by() + def requires(self, target): + ''' + List the keys that require the specified node. + ''' + if target not in self._graph: + raise KeyError + + return self._graph[target].requires() + def __getitem__(self, last): ''' Return a partial dependency graph consisting of the specified node and @@ -235,6 +256,18 @@ class Dependencies(object): return Dependencies(edges) + def translate(self, transform): + ''' + Translate all of the nodes using a transform function. + + Returns a new Dependencies object. + ''' + def transform_key(key): + return transform(key) if key is not None else None + + edges = self._graph.edges() + return type(self)(tuple(map(transform_key, e)) for e in edges) + def __str__(self): ''' Return a human-readable string representation of the dependency graph diff --git a/heat/engine/resource.py b/heat/engine/resource.py index e82fad84b5..a10aef3f86 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -174,11 +174,11 @@ class Resource(object): self.created_time = None self.updated_time = None self._rpc_client = None - self.needed_by = None - self.requires = None + self.needed_by = [] + self.requires = [] self.replaces = None self.replaced_by = None - self.current_template_id = stack.t.id + self.current_template_id = None resource = stack.db_resource_get(name) if resource: @@ -268,6 +268,20 @@ class Resource(object): rs.update_and_save({'rsrc_metadata': metadata}) self._rsrc_metadata = metadata + @classmethod + def set_needed_by(cls, db_rsrc, needed_by): + if db_rsrc: + db_rsrc.update_and_save( + {'needed_by': needed_by} + ) + + @classmethod + def set_requires(cls, db_rsrc, requires): + if db_rsrc: + db_rsrc.update_and_save( + {'requires': requires} + ) + def _break_if_required(self, action, hook): '''Block the resource until the hook is cleared if there is one.''' if self.stack.env.registry.matches_hook(self.name, hook): diff --git a/heat/engine/service.py b/heat/engine/service.py index 33cee1c9bb..74192803f4 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -36,6 +36,7 @@ from heat.common.i18n import _LW from heat.common import identifier from heat.common import messaging as rpc_messaging from heat.common import service_utils +from heat.common import template_format from heat.engine import api from heat.engine import attributes from heat.engine import clients @@ -668,8 +669,7 @@ class EngineService(service.Service): """ LOG.info(_LI('Creating stack %s'), stack_name) - def _stack_create(stack): - + def _create_stack_user(stack): if not stack.stack_user_project_id: try: stack.create_stack_user_project_id() @@ -677,6 +677,8 @@ class EngineService(service.Service): stack.state_set(stack.action, stack.FAILED, six.text_type(ex)) + def _stack_create(stack): + _create_stack_user(stack) # Create/Adopt a stack, and create the periodic task if successful if stack.adopt_stack_data: stack.adopt() @@ -692,18 +694,22 @@ class EngineService(service.Service): LOG.info(_LI("Stack create failed, status %s"), stack.status) convergence = cfg.CONF.convergence_engine - if convergence: - raise exception.NotSupported(feature=_('Convergence engine')) stack = self._parse_template_and_validate_stack( cnxt, stack_name, template, params, files, args, owner_id, nested_depth, user_creds_id, stack_user_project_id, convergence, parent_resource_name) - stack.store() - - self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id, - _stack_create, stack) + # once validations are done + # if convergence is enabled, take convergence path + if convergence: + # TODO(later): call _create_stack_user(stack) + # call stack.converge_stack(template=stack.t, action=stack.CREATE) + raise exception.NotSupported(feature=_('Convergence engine')) + else: + stack.store() + self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id, + _stack_create, stack) return dict(stack.identifier()) @@ -765,14 +771,20 @@ class EngineService(service.Service): self._validate_deferred_auth_context(cnxt, updated_stack) updated_stack.validate() - event = eventlet.event.Event() - th = self.thread_group_mgr.start_with_lock(cnxt, current_stack, - self.engine_id, - current_stack.update, - updated_stack, - event=event) - th.link(self.thread_group_mgr.remove_event, current_stack.id, event) - self.thread_group_mgr.add_event(current_stack.id, event) + # Once all the validations are done + # if convergence is enabled, take the convergence path + if current_kwargs['convergence']: + current_stack.converge_stack(template=tmpl) + else: + event = eventlet.event.Event() + th = self.thread_group_mgr.start_with_lock(cnxt, current_stack, + self.engine_id, + current_stack.update, + updated_stack, + event=event) + th.link(self.thread_group_mgr.remove_event, + current_stack.id, event) + self.thread_group_mgr.add_event(current_stack.id, event) return dict(current_stack.identifier()) @context.request_context @@ -927,6 +939,16 @@ class EngineService(service.Service): LOG.info(_LI('Deleting stack %s'), st.name) stack = parser.Stack.load(cnxt, stack=st) + if stack.convergence: + empty_template = ''' + heat_template_version: 2013-05-23 + description: Empty Template + ''' + tmpl = template_format.parse(empty_template) + template = templatem.Template(tmpl) + stack.converge_stack(template=template, action=stack.DELETE) + return + lock = stack_lock.StackLock(cnxt, stack.id, self.engine_id) with lock.try_thread_lock() as acquire_result: diff --git a/heat/engine/stack.py b/heat/engine/stack.py index 4dca73d397..bdf2872c30 100755 --- a/heat/engine/stack.py +++ b/heat/engine/stack.py @@ -21,6 +21,7 @@ import warnings from oslo_config import cfg from oslo_log import log as logging from oslo_utils import encodeutils +from oslo_utils import uuidutils from osprofiler import profiler import six @@ -40,6 +41,7 @@ from heat.engine import parameter_groups as param_groups from heat.engine import resource from heat.engine import resources from heat.engine import scheduler +from heat.engine import sync_point from heat.engine import template as tmpl from heat.engine import update from heat.objects import resource as resource_objects @@ -85,7 +87,8 @@ class Stack(collections.Mapping): user_creds_id=None, tenant_id=None, use_stored_context=False, username=None, nested_depth=0, strict_validate=True, convergence=False, - current_traversal=None, tags=None): + current_traversal=None, tags=None, prev_raw_template_id=None, + current_deps=None): ''' Initialise from a context, name, Template object and (optionally) Environment object. The database ID may also be initialised, if the @@ -129,6 +132,8 @@ class Stack(collections.Mapping): self.convergence = convergence self.current_traversal = current_traversal self.tags = tags + self.prev_raw_template_id = prev_raw_template_id + self.current_deps = current_deps if use_stored_context: self.context = self.stored_context() @@ -399,7 +404,9 @@ class Stack(collections.Mapping): user_creds_id=stack.user_creds_id, tenant_id=stack.tenant, use_stored_context=use_stored_context, username=stack.username, convergence=stack.convergence, - current_traversal=stack.current_traversal, tags=tags) + current_traversal=stack.current_traversal, tags=tags, + prev_raw_template_id=stack.prev_raw_template_id, + current_deps=stack.current_deps) def get_kwargs_for_cloning(self, keep_status=False, only_db=False): """Get common kwargs for calling Stack() for cloning. @@ -425,6 +432,8 @@ class Stack(collections.Mapping): 'nested_depth': self.nested_depth, 'convergence': self.convergence, 'current_traversal': self.current_traversal, + 'prev_raw_template_id': self.prev_raw_template_id, + 'current_deps': self.current_deps } if keep_status: stack.update({ @@ -898,6 +907,155 @@ class Stack(collections.Mapping): event=event) updater() + @profiler.trace('Stack.converge_stack', hide_args=False) + def converge_stack(self, template, action=UPDATE): + """ + Updates the stack and triggers convergence for resources + """ + self.prev_raw_template_id = getattr(self.t, 'id', None) + self.t = template + previous_traversal = self.current_traversal + self.current_traversal = uuidutils.generate_uuid() + self.store() + + # TODO(later): lifecycle_plugin_utils.do_pre_ops + self.state_set(action, self.IN_PROGRESS, + 'Stack %s started' % action) + + # delete the prev traversal sync_points + sync_point.delete_all(self.context, self.id, previous_traversal) + self._converge_create_or_update() + + def _converge_create_or_update(self): + self._update_or_store_resources() + self.convergence_dependencies = self._convergence_dependencies( + self.ext_rsrcs_db, self.dependencies) + LOG.info(_LI('convergence_dependencies: %s'), + self.convergence_dependencies) + + # create sync_points for resources in DB + for rsrc_id, is_update in self.convergence_dependencies: + sync_point.create(self.context, rsrc_id, + self.current_traversal, is_update, + self.id) + # create sync_point entry for stack + sync_point.create( + self.context, self.id, self.current_traversal, + False if self.action in (self.DELETE, self.SUSPEND) else True, + self.id) + + # Store list of edges + self.current_deps = { + 'edges': [[rqr, rqd] for rqr, rqd in + self.convergence_dependencies.graph().edges()]} + self.store() + + leaves = (self.convergence_dependencies.graph(reverse=True).leaves() + if self.action in (self.DELETE, self.SUSPEND) + else self.convergence_dependencies.graph().leaves()) + + for rsrc_id, is_update in leaves: + LOG.info(_LI("Triggering resource %(rsrc_id)s " + "for update=%(is_update)s"), + {'rsrc_id': rsrc_id, 'is_update': is_update}) + self.temp_update_requires(self.convergence_dependencies) + + def _update_or_store_resources(self): + try: + ext_rsrcs_db = resource_objects.Resource.get_all_by_stack( + self.context, self.id) + except exception.NotFound: + self.ext_rsrcs_db = None + else: + self.ext_rsrcs_db = {res.id: res + for res_name, res in ext_rsrcs_db.items()} + + def get_existing_rsrc_db(rsrc_name): + candidate = None + if self.ext_rsrcs_db: + for id, ext_rsrc in self.ext_rsrcs_db.items(): + if ext_rsrc.name != rsrc_name: + continue + if ext_rsrc.current_template_id == self.t.id: + # Rollback where the previous resource still exists + candidate = ext_rsrc + break + elif (ext_rsrc.current_template_id == + self.prev_raw_template_id): + # Current resource is otherwise a good candidate + candidate = ext_rsrc + break + return candidate + + curr_name_translated_dep = self.dependencies.translate(lambda res: + res.name) + rsrcs = {} + + def update_needed_by(res): + new_requirers = set( + rsrcs[rsrc_name].id for rsrc_name in + curr_name_translated_dep.required_by(res.name) + ) + old_requirers = set(res.needed_by) if res.needed_by else set() + needed_by = old_requirers | new_requirers + res.needed_by = list(needed_by) + + for rsrc in reversed(self.dependencies): + existing_rsrc_db = get_existing_rsrc_db(rsrc.name) + if existing_rsrc_db is None: + update_needed_by(rsrc) + rsrc.current_template_id = self.t.id + rsrc._store() + rsrcs[rsrc.name] = rsrc + else: + update_needed_by(existing_rsrc_db) + resource.Resource.set_needed_by( + existing_rsrc_db, existing_rsrc_db.needed_by + ) + rsrcs[existing_rsrc_db.name] = existing_rsrc_db + + def _convergence_dependencies(self, existing_resources, + curr_template_dep): + dep = curr_template_dep.translate(lambda res: (res.id, True)) + if existing_resources: + for rsrc_id, rsrc in existing_resources.items(): + dep += (rsrc_id, False), None + + for requirement in rsrc.requires: + if requirement in existing_resources: + dep += (requirement, False), (rsrc_id, False) + if rsrc.replaces in existing_resources: + dep += (rsrc.replaces, False), (rsrc_id, False) + + if (rsrc.id, True) in dep: + dep += (rsrc_id, False), (rsrc_id, True) + return dep + + def temp_update_requires(self, conv_deps): + '''updates requires column of resources''' + # This functions should be removed once the dependent patches + # are implemented. + if self.action in (self.CREATE, self.UPDATE): + requires = dict() + for rsrc_id, is_update in conv_deps: + reqs = conv_deps.requires((rsrc_id, is_update)) + requires[rsrc_id] = list({id for id, is_update in reqs}) + + try: + rsrcs_db = resource_objects.Resource.get_all_by_stack( + self.context, self.id) + except exception.NotFound: + rsrcs_db = None + else: + rsrcs_db = {res.id: res for res_name, res in rsrcs_db.items()} + + if rsrcs_db: + for id, db_rsrc in rsrcs_db.items(): + if id in requires: + resource.Resource.set_requires( + db_rsrc, requires[id] + ) + @scheduler.wrappertask def update_task(self, newstack, action=UPDATE, event=None): if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE): diff --git a/heat/engine/sync_point.py b/heat/engine/sync_point.py new file mode 100644 index 0000000000..dbb66222c8 --- /dev/null +++ b/heat/engine/sync_point.py @@ -0,0 +1,42 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from heat.objects import sync_point as sync_point_object + + +def create(context, entity_id, traversal_id, is_update, stack_id): + """ + Creates an sync point entry in DB. + """ + values = {'entity_id': entity_id, 'traversal_id': traversal_id, + 'is_update': is_update, 'atomic_key': 0, + 'stack_id': stack_id, 'input_data': {}} + return sync_point_object.SyncPoint.create(context, values) + + +def get(context, entity_id, traversal_id, is_update): + """ + Retrieves a sync point entry from DB. + """ + return sync_point_object.SyncPoint.get_by_key(context, entity_id, + traversal_id, is_update) + + +def delete_all(context, stack_id, traversal_id): + """ + Deletes all sync points of a stack associated with a particular traversal. + """ + return sync_point_object.SyncPoint.delete_all_by_stack_and_traversal( + context, stack_id, traversal_id + ) diff --git a/heat/tests/engine/tools.py b/heat/tests/engine/tools.py index 4aaa5ae1d1..cbc8c42bff 100644 --- a/heat/tests/engine/tools.py +++ b/heat/tests/engine/tools.py @@ -46,7 +46,8 @@ resources: ''' -def get_stack(stack_name, ctx, template=None, with_params=True): +def get_stack(stack_name, ctx, template=None, with_params=True, + convergence=False): if template is None: t = template_format.parse(wp_template) if with_params: @@ -57,7 +58,7 @@ def get_stack(stack_name, ctx, template=None, with_params=True): else: t = template_format.parse(template) tmpl = templatem.Template(t) - stack = parser.Stack(ctx, stack_name, tmpl) + stack = parser.Stack(ctx, stack_name, tmpl, convergence=convergence) return stack diff --git a/heat/tests/test_dependencies.py b/heat/tests/test_dependencies.py index e5b57e971b..9159ff442e 100644 --- a/heat/tests/test_dependencies.py +++ b/heat/tests/test_dependencies.py @@ -226,3 +226,11 @@ class dependenciesTest(common.HeatTestCase): "'%s' not found in required_by" % n) self.assertRaises(KeyError, d.required_by, 'foo') + + def test_graph_leaves(self): + d = dependencies.Dependencies([('last1', 'mid'), ('last2', 'mid'), + ('mid', 'first1'), ('mid', 'first2')]) + + leaves = sorted(list(d._graph.leaves())) + + self.assertEqual(['first1', 'first2'], leaves) diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index c354557702..ca55f553cf 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -50,6 +50,7 @@ from heat.objects import service as service_objects from heat.objects import software_deployment as software_deployment_object from heat.objects import stack as stack_object from heat.objects import stack_lock as stack_lock_object +from heat.objects import sync_point as sync_point_object from heat.objects import watch_data as watch_data_object from heat.objects import watch_rule as watch_rule_object from heat.openstack.common import threadgroup @@ -65,6 +66,89 @@ cfg.CONF.import_opt('engine_life_check_timeout', 'heat.common.config') cfg.CONF.import_opt('enable_stack_abandon', 'heat.common.config') +string_template_five = ''' +heat_template_version: 2013-05-23 +description: Random String templates + +parameters: + salt: + type: string + default: "quickbrownfox" + +resources: + A: + type: OS::Heat::RandomString + properties: + salt: {get_param: salt} + + B: + type: OS::Heat::RandomString + properties: + salt: {get_param: salt} + + C: + type: OS::Heat::RandomString + depends_on: [A, B] + properties: + salt: {get_param: salt} + + D: + type: OS::Heat::RandomString + depends_on: C + properties: + salt: {get_param: salt} + + E: + type: OS::Heat::RandomString + depends_on: C + properties: + salt: {get_param: salt} +''' + +string_template_five_update = ''' +heat_template_version: 2013-05-23 +description: Random String templates + +parameters: + salt: + type: string + default: "quickbrownfox123" + +resources: + A: + type: OS::Heat::RandomString + properties: + salt: {get_param: salt} + + B: + type: OS::Heat::RandomString + properties: + salt: {get_param: salt} + + F: + type: OS::Heat::RandomString + depends_on: [A, B] + properties: + salt: {get_param: salt} + + G: + type: OS::Heat::RandomString + depends_on: F + properties: + salt: {get_param: salt} + + H: + type: OS::Heat::RandomString + depends_on: F + properties: + salt: {get_param: salt} +''' + +empty_template = ''' +heat_template_version: 2013-05-23 +description: Empty Template +''' + wp_template_no_default = ''' { "AWSTemplateFormatVersion" : "2010-09-09", @@ -152,6 +236,234 @@ resources: ''' +class StackConvergenceCreateUpdateTest(common.HeatTestCase): + def setUp(self): + super(StackConvergenceCreateUpdateTest, self).setUp() + cfg.CONF.set_override('convergence_engine', True) + + def test_conv_wordpress_single_instance_stack_create(self): + stack = tools.get_stack('test_stack', utils.dummy_context(), + convergence=True) + stack.converge_stack(template=stack.t, action=stack.CREATE) + self.assertIsNone(stack.ext_rsrcs_db) + self.assertEqual('Dependencies([((1, True), None)])', + repr(stack.convergence_dependencies)) + + stack_db = stack_object.Stack.get_by_id(stack.context, stack.id) + self.assertIsNotNone(stack_db.current_traversal) + self.assertIsNotNone(stack_db.raw_template_id) + + self.assertIsNone(stack_db.prev_raw_template_id) + + self.assertEqual(stack_db.convergence, True) + self.assertEqual({'edges': [[[1, True], None]]}, stack_db.current_deps) + + def test_conv_string_five_instance_stack_create(self): + stack = tools.get_stack('test_stack', utils.dummy_context(), + template=string_template_five, + convergence=True) + stack.converge_stack(template=stack.t, action=stack.CREATE) + self.assertIsNone(stack.ext_rsrcs_db) + self.assertEqual('Dependencies([' + '((3, True), (5, True)), ' + '((3, True), (4, True)), ' + '((1, True), (3, True)), ' + '((2, True), (3, True))])', + repr(stack.convergence_dependencies)) + + stack_db = stack_object.Stack.get_by_id(stack.context, stack.id) + self.assertIsNotNone(stack_db.current_traversal) + self.assertIsNotNone(stack_db.raw_template_id) + self.assertIsNone(stack_db.prev_raw_template_id) + self.assertEqual(stack_db.convergence, True) + self.assertEqual(sorted([[[3, True], [5, True]], # C, A + [[3, True], [4, True]], # C, B + [[1, True], [3, True]], # E, C + [[2, True], [3, True]]]), # D, C + sorted(stack_db.current_deps['edges'])) + + # check if needed_by is stored properly + expected_needed_by = {'A': [3], 'B': [3], + 'C': [1, 2], + 'D': [], 'E': []} + rsrcs_db = resource_objects.Resource.get_all_by_stack( + stack_db._context, stack_db.id + ) + self.assertEqual(5, len(rsrcs_db)) + for rsrc_name, rsrc_obj in rsrcs_db.items(): + self.assertEqual(sorted(expected_needed_by[rsrc_name]), + sorted(rsrc_obj.needed_by)) + self.assertEqual(stack_db.raw_template_id, + rsrc_obj.current_template_id) + + # check if sync_points were stored + for entity_id in [5, 4, 3, 2, 1, stack_db.id]: + sync_point = sync_point_object.SyncPoint.get_by_key( + stack_db._context, entity_id, stack_db.current_traversal, True + ) + self.assertIsNotNone(sync_point) + self.assertEqual(stack_db.id, sync_point.stack_id) + + def test_conv_string_five_instance_stack_update(self): + stack = tools.get_stack('test_stack', utils.dummy_context(), + template=string_template_five, + convergence=True) + # create stack + stack.converge_stack(template=stack.t, action=stack.CREATE) + + curr_stack_db = stack_object.Stack.get_by_id(stack.context, stack.id) + curr_stack = parser.Stack.load(curr_stack_db._context, + stack=curr_stack_db) + # update stack with new template + t2 = template_format.parse(string_template_five_update) + template2 = templatem.Template( + t2, env=environment.Environment({'KeyName2': 'test2'})) + curr_stack.converge_stack(template=template2, action=stack.UPDATE) + + self.assertIsNotNone(curr_stack.ext_rsrcs_db) + self.assertEqual('Dependencies([' + '((7, True), (8, True)), ' + '((8, True), (5, True)), ' + '((8, True), (4, True)), ' + '((6, True), (8, True)), ' + '((3, False), (2, False)), ' + '((3, False), (1, False)), ' + '((5, False), (3, False)), ' + '((5, False), (5, True)), ' + '((4, False), (3, False)), ' + '((4, False), (4, True))])', + repr(curr_stack.convergence_dependencies)) + + stack_db = stack_object.Stack.get_by_id(curr_stack.context, + curr_stack.id) + self.assertIsNotNone(stack_db.raw_template_id) + self.assertIsNotNone(stack_db.current_traversal) + self.assertIsNotNone(stack_db.prev_raw_template_id) + self.assertEqual(True, stack_db.convergence) + self.assertEqual(sorted([[[7, True], [8, True]], + [[8, True], [5, True]], + [[8, True], [4, True]], + [[6, True], [8, True]], + [[3, False], [2, False]], + [[3, False], [1, False]], + [[5, False], [3, False]], + [[5, False], [5, True]], + [[4, False], [3, False]], + [[4, False], [4, True]]]), + sorted(stack_db.current_deps['edges'])) + ''' + To visualize: + + G(7, True) H(6, True) + \ / + \ / B(4, False) A(5, False) + \ / / \ / / + \ / / / + F(8, True) / / \ / + / \ / / C(3, False) + / \ / / \ + / / \ / + / / \ / / \ + B(4, True) A(5, True) D(2, False) E(1, False) + + Leaves are at the bottom + ''' + + # check if needed_by are stored properly + # For A & B: + # needed_by=C, F + # TODO(later): when worker is implemented test for current_template_id + # Also test for requires + + expected_needed_by = {'A': [3, 8], 'B': [3, 8], + 'C': [1, 2], + 'D': [], 'E': [], + 'F': [6, 7], + 'G': [], 'H': []} + rsrcs_db = resource_objects.Resource.get_all_by_stack( + stack_db._context, stack_db.id + ) + self.assertEqual(8, len(rsrcs_db)) + for rsrc_name, rsrc_obj in rsrcs_db.items(): + self.assertEqual(sorted(expected_needed_by[rsrc_name]), + sorted(rsrc_obj.needed_by)) + + # check if sync_points are created for forward traversal + # [F, H, G, A, B, Stack] + for entity_id in [8, 7, 6, 5, 4, stack_db.id]: + sync_point = sync_point_object.SyncPoint.get_by_key( + stack_db._context, entity_id, stack_db.current_traversal, True + ) + self.assertIsNotNone(sync_point) + self.assertEqual(stack_db.id, sync_point.stack_id) + + # check if sync_points are created for cleanup traversal + # [A, B, C, D, E] + for entity_id in [5, 4, 3, 2, 1]: + sync_point = sync_point_object.SyncPoint.get_by_key( + stack_db._context, entity_id, stack_db.current_traversal, False + ) + self.assertIsNotNone(sync_point) + self.assertEqual(stack_db.id, sync_point.stack_id) + + def test_conv_empty_template_stack_update_delete(self): + stack = tools.get_stack('test_stack', utils.dummy_context(), + template=string_template_five, + convergence=True) + # create stack + stack.converge_stack(template=stack.t, action=stack.CREATE) + + # update stack with new template + t2 = template_format.parse(empty_template) + template2 = templatem.Template( + t2, env=environment.Environment({'KeyName2': 'test2'})) + + curr_stack_db = stack_object.Stack.get_by_id(stack.context, stack.id) + curr_stack = parser.Stack.load(curr_stack_db._context, + stack=curr_stack_db) + curr_stack.converge_stack(template=template2, action=stack.DELETE) + + self.assertIsNotNone(curr_stack.ext_rsrcs_db) + self.assertEqual('Dependencies([' + '((3, False), (2, False)), ' + '((3, False), (1, False)), ' + '((5, False), (3, False)), ' + '((4, False), (3, False))])', + repr(curr_stack.convergence_dependencies)) + + stack_db = stack_object.Stack.get_by_id(curr_stack.context, + curr_stack.id) + self.assertIsNotNone(stack_db.current_traversal) + self.assertIsNotNone(stack_db.prev_raw_template_id) + self.assertEqual(sorted([[[3, False], [2, False]], + [[3, False], [1, False]], + [[5, False], [3, False]], + [[4, False], [3, False]]]), + sorted(stack_db.current_deps['edges'])) + + # TODO(later): when worker is implemented test for current_template_id + # Also test for requires + expected_needed_by = {'A': [3], 'B': [3], + 'C': [1, 2], + 'D': [], 'E': []} + rsrcs_db = resource_objects.Resource.get_all_by_stack( + stack_db._context, stack_db.id + ) + self.assertEqual(5, len(rsrcs_db)) + for rsrc_name, rsrc_obj in rsrcs_db.items(): + self.assertEqual(sorted(expected_needed_by[rsrc_name]), + sorted(rsrc_obj.needed_by)) + + # check if sync_points are created for cleanup traversal + # [A, B, C, D, E, Stack] + for entity_id in [5, 4, 3, 2, 1, stack_db.id]: + sync_point = sync_point_object.SyncPoint.get_by_key( + stack_db._context, entity_id, stack_db.current_traversal, False + ) + self.assertIsNotNone(sync_point) + self.assertEqual(stack_db.id, sync_point.stack_id) + + class StackCreateTest(common.HeatTestCase): def setUp(self): super(StackCreateTest, self).setUp() @@ -444,15 +756,6 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase): self.man.create_stack, self.ctx, stack_name, stack.t.t, {}, None, {}) - def test_stack_create_enabled_convergence_engine(self): - cfg.CONF.set_override('convergence_engine', True) - ex = self.assertRaises(dispatcher.ExpectedException, - self.man.create_stack, self.ctx, 'test', - tools.wp_template, {}, None, {}) - self.assertEqual(exception.NotSupported, ex.exc_info[0]) - self.assertEqual('Convergence engine is not supported.', - six.text_type(ex.exc_info[1])) - def test_stack_create_invalid_resource_name(self): stack_name = 'service_create_test_stack_invalid_res' stack = tools.get_stack(stack_name, self.ctx) @@ -836,6 +1139,8 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase): stack.t, convergence=False, current_traversal=None, + prev_raw_template_id=None, + current_deps=None, disable_rollback=True, nested_depth=0, owner_id=None, @@ -895,6 +1200,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase): parser.Stack(self.ctx, stack.name, stack.t, convergence=False, current_traversal=None, + prev_raw_template_id=None, current_deps=None, disable_rollback=True, nested_depth=0, owner_id=None, parent_resource=None, stack_user_project_id='1234', @@ -947,6 +1253,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase): parser.Stack(self.ctx, stack.name, stack.t, convergence=False, current_traversal=None, + prev_raw_template_id=None, current_deps=None, disable_rollback=False, nested_depth=0, owner_id=None, parent_resource=None, stack_user_project_id='1234', @@ -1056,6 +1363,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase): parser.Stack(self.ctx, stack.name, stack.t, convergence=False, current_traversal=None, + prev_raw_template_id=None, current_deps=None, disable_rollback=True, nested_depth=0, owner_id=None, parent_resource=None, stack_user_project_id='1234', strict_validate=True, @@ -1185,6 +1493,7 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase): parser.Stack(self.ctx, stack.name, stack.t, convergence=False, current_traversal=None, + prev_raw_template_id=None, current_deps=None, disable_rollback=True, nested_depth=0, owner_id=None, parent_resource=None, stack_user_project_id='1234', strict_validate=True, @@ -1251,6 +1560,8 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase): old_stack.t, convergence=False, current_traversal=None, + prev_raw_template_id=None, + current_deps=None, disable_rollback=True, nested_depth=0, owner_id=None, @@ -1317,6 +1628,116 @@ class StackServiceCreateUpdateDeleteTest(common.HeatTestCase): six.text_type(ex)) +class StackConvergenceServiceCreateUpdateTest(common.HeatTestCase): + + def setUp(self): + super(StackConvergenceServiceCreateUpdateTest, self).setUp() + cfg.CONF.set_override('convergence_engine', True) + self.ctx = utils.dummy_context() + self.patch('heat.engine.service.warnings') + self.man = service.EngineService('a-host', 'a-topic') + self.man.create_periodic_tasks() + + def _stub_update_mocks(self, stack_to_load, stack_to_return): + self.m.StubOutWithMock(parser, 'Stack') + self.m.StubOutWithMock(parser.Stack, 'load') + parser.Stack.load(self.ctx, stack=stack_to_load + ).AndReturn(stack_to_return) + + self.m.StubOutWithMock(templatem, 'Template') + self.m.StubOutWithMock(environment, 'Environment') + + def _test_stack_create_convergence(self, stack_name): + params = {'foo': 'bar'} + template = '{ "Template": "data" }' + + stack = tools.get_stack(stack_name, self.ctx, + template=string_template_five, + convergence=True) + + self.m.StubOutWithMock(templatem, 'Template') + self.m.StubOutWithMock(environment, 'Environment') + self.m.StubOutWithMock(parser, 'Stack') + + templatem.Template(template, files=None, + env=stack.env).AndReturn(stack.t) + environment.Environment(params).AndReturn(stack.env) + parser.Stack(self.ctx, stack.name, + stack.t, owner_id=None, + parent_resource=None, + nested_depth=0, user_creds_id=None, + stack_user_project_id=None, + convergence=True).AndReturn(stack) + + self.m.StubOutWithMock(stack, 'validate') + stack.validate().AndReturn(None) + + self.m.ReplayAll() + + # TODO(later): Remove exception once convergence is supported. + ex = self.assertRaises(dispatcher.ExpectedException, + self.man.create_stack, self.ctx, stack_name, + template, params, None, {}) + self.assertEqual(exception.NotSupported, ex.exc_info[0]) + self.assertEqual('Convergence engine is not supported.', + six.text_type(ex.exc_info[1])) + self.m.VerifyAll() + + def test_stack_create_enabled_convergence_engine(self): + stack_name = 'service_create_test_stack' + self._test_stack_create_convergence(stack_name) + + def test_stack_update_enabled_convergence_engine(self): + stack_name = 'service_update_test_stack' + params = {'foo': 'bar'} + template = '{ "Template": "data" }' + old_stack = tools.get_stack(stack_name, self.ctx, + template=string_template_five, + convergence=True) + sid = old_stack.store() + s = stack_object.Stack.get_by_id(self.ctx, sid) + + stack = tools.get_stack(stack_name, self.ctx, + template=string_template_five_update, + convergence=True) + + self._stub_update_mocks(s, old_stack) + + templatem.Template(template, files=None, + env=stack.env).AndReturn(stack.t) + environment.Environment(params).AndReturn(stack.env) + parser.Stack(self.ctx, stack.name, + stack.t, + owner_id=old_stack.owner_id, + nested_depth=old_stack.nested_depth, + user_creds_id=old_stack.user_creds_id, + stack_user_project_id=old_stack.stack_user_project_id, + timeout_mins=60, + disable_rollback=True, + parent_resource=None, + strict_validate=True, + tenant_id=old_stack.tenant_id, + username=old_stack.username, + convergence=old_stack.convergence, + current_traversal=old_stack.current_traversal, + prev_raw_template_id=old_stack.prev_raw_template_id, + current_deps=old_stack.current_deps).AndReturn(stack) + + self.m.StubOutWithMock(stack, 'validate') + stack.validate().AndReturn(None) + + self.m.ReplayAll() + + api_args = {'timeout_mins': 60} + result = self.man.update_stack(self.ctx, old_stack.identifier(), + template, params, None, api_args) + self.assertEqual(old_stack.convergence, True) + self.assertEqual(old_stack.identifier(), result) + self.assertIsInstance(result, dict) + self.assertTrue(result['stack_id']) + self.m.VerifyAll() + + class StackServiceAuthorizeTest(common.HeatTestCase): def setUp(self): diff --git a/heat/tests/test_stack.py b/heat/tests/test_stack.py index 1933f085a3..561608e930 100644 --- a/heat/tests/test_stack.py +++ b/heat/tests/test_stack.py @@ -293,7 +293,9 @@ class StackTest(common.HeatTestCase): username=mox.IgnoreArg(), convergence=False, current_traversal=None, - tags=mox.IgnoreArg()) + tags=mox.IgnoreArg(), + prev_raw_template_id=None, + current_deps=None) self.m.ReplayAll() stack.Stack.load(self.ctx, stack_id=self.stack.id)