From ad104c51bf53337a7e1974e5e1c1f5df149a47ae Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Wed, 17 Jun 2015 13:27:06 +1000 Subject: [PATCH] convergence: sync_point fixes This is a merge of 4 reviews: I52f1611d34def3474acba0e5eee054e11c5fc5ad Ic374a38c9d76763be341d3a80f53fa396c9c2256 Iecd21ccb4392369f66fa1b3a0cf55aad754aeac4 I77b81097d2dcf01efa540237ed5ae14896ed1670 - make sure sender is a tuple (otherwise the serialization function in sync_point breaks.) - Update updated_time on any lifecycle operation(CREATE/UPDATE/DELETE) over a stack. - adjust sync_point logic to account for deletes Done by having only a single stack sync point for both updates and deletes. - Serialize/deserialize input_data for RPC - Make GraphKey's the norm in convergence worker - move temp_update_requires functionality to tests During intial stages of convergence to simulate the entire cycle some part of worker code was written in stack.py. Now that the convergence worker is implemented, this code needs to be executed only in tests. - Fix dictionary structure that's passed to resoure.(create/update) - Temporarily disable loading cache_data for stack to help fix other issues. Change-Id: Iecd21ccb4392369f66fa1b3a0cf55aad754aeac4 Co-Authored-by: Sirushti Murugesan Co-Authored-by: Rakesh H S --- heat/engine/dependencies.py | 7 +++++ heat/engine/resource.py | 3 +- heat/engine/stack.py | 37 ++++-------------------- heat/engine/sync_point.py | 2 +- heat/engine/worker.py | 34 +++++++++++----------- heat/tests/test_dependencies.py | 8 ++++++ heat/tests/test_engine_service.py | 47 +++++++++++++++++++++++++------ heat/tests/test_engine_worker.py | 6 ++-- heat/tests/test_stack.py | 2 +- heat/tests/test_sync_point.py | 4 +++ 10 files changed, 87 insertions(+), 63 deletions(-) diff --git a/heat/engine/dependencies.py b/heat/engine/dependencies.py index 0b7f394398..d488b5a390 100644 --- a/heat/engine/dependencies.py +++ b/heat/engine/dependencies.py @@ -256,6 +256,13 @@ class Dependencies(object): return (requirer for requirer, required in self._graph.items() if not required) + def roots(self): + ''' + Return an iterator over all of the root nodes in the graph. + ''' + return (requirer for requirer, required in self.graph( + reverse=True).items() if not required) + def translate(self, transform): ''' Translate all of the nodes using a transform function. diff --git a/heat/engine/resource.py b/heat/engine/resource.py index 8c54608a07..e85cd7128c 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -243,7 +243,8 @@ class Resource(object): # FIXME(sirushtim): Import this in global space. from heat.engine import stack as stack_mod db_res = resource_objects.Resource.get_obj(context, resource_id) - stack = stack_mod.Stack.load(context, db_res.stack_id, cache_data=data) + # TODO(sirushtim): Load stack from cache + stack = stack_mod.Stack.load(context, db_res.stack_id) # NOTE(sirushtim): Because on delete/cleanup operations, we simply # update with another template, the stack object won't have the # template of the previous stack-run. diff --git a/heat/engine/stack.py b/heat/engine/stack.py index ed42d44d77..177121c525 100755 --- a/heat/engine/stack.py +++ b/heat/engine/stack.py @@ -944,6 +944,7 @@ class Stack(collections.Mapping): self.t = template previous_traversal = self.current_traversal self.current_traversal = uuidutils.generate_uuid() + self.updated_time = datetime.datetime.utcnow() self.store() # TODO(later): lifecycle_plugin_utils.do_pre_ops @@ -968,9 +969,7 @@ class Stack(collections.Mapping): self.id) # create sync_point entry for stack sync_point.create( - self.context, self.id, self.current_traversal, - False if self.action in (self.DELETE, self.SUSPEND) else True, - self.id) + self.context, self.id, self.current_traversal, True, self.id) # Store list of edges self.current_deps = { @@ -980,14 +979,12 @@ class Stack(collections.Mapping): for rsrc_id, is_update in self.convergence_dependencies.leaves(): LOG.info(_LI("Triggering resource %(rsrc_id)s " - "for update=%(is_update)s"), + "for %(is_update)s update"), {'rsrc_id': rsrc_id, 'is_update': is_update}) self.worker_client.check_resource(self.context, rsrc_id, self.current_traversal, {}, is_update) - self.temp_update_requires(self.convergence_dependencies) - def _update_or_store_resources(self): try: ext_rsrcs_db = resource_objects.Resource.get_all_by_stack( @@ -1059,31 +1056,6 @@ class Stack(collections.Mapping): dep += (rsrc_id, False), (rsrc_id, True) return dep - def temp_update_requires(self, conv_deps): - '''updates requires column of resources''' - # This functions should be removed once the dependent patches - # are implemented. - if self.action in (self.CREATE, self.UPDATE): - requires = dict() - for rsrc_id, is_update in conv_deps: - reqs = conv_deps.requires((rsrc_id, is_update)) - requires[rsrc_id] = list({id for id, is_update in reqs}) - - try: - rsrcs_db = resource_objects.Resource.get_all_by_stack( - self.context, self.id) - except exception.NotFound: - rsrcs_db = None - else: - rsrcs_db = {res.id: res for res_name, res in rsrcs_db.items()} - - if rsrcs_db: - for id, db_rsrc in rsrcs_db.items(): - if id in requires: - resource.Resource.set_requires( - db_rsrc, requires[id] - ) - @scheduler.wrappertask def update_task(self, newstack, action=UPDATE, event=None): if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE): @@ -1592,7 +1564,8 @@ class Stack(collections.Mapping): return False def cache_data_resource_id(self, resource_name): - return self.cache_data.get(resource_name, {}).get('id') + return self.cache_data.get( + resource_name, {}).get('physical_resource_id') def cache_data_resource_attribute(self, resource_name, attribute_key): return self.cache_data.get( diff --git a/heat/engine/sync_point.py b/heat/engine/sync_point.py index c26d6de667..b2f7c3f64c 100644 --- a/heat/engine/sync_point.py +++ b/heat/engine/sync_point.py @@ -105,7 +105,7 @@ def sync(cnxt, entity_id, current_traversal, is_update, propagate, else: LOG.debug('[%s] Ready %s: Got %s', key, entity_id, _dump_list(input_data)) - propagate(entity_id, input_data) + propagate(entity_id, serialize_input_data(input_data)) class SyncPointNotFound(Exception): diff --git a/heat/engine/worker.py b/heat/engine/worker.py index 73209d7f72..0029b094a5 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -92,9 +92,13 @@ class WorkerService(service.Service): The node may be associated with either an update or a cleanup of its associated resource. ''' + data = dict(sync_point.deserialize_input_data(data)) try: - rsrc, stack = resource.Resource.load(cnxt, resource_id, data) - except exception.NotFound: + cache_data = {in_data.get( + 'name'): in_data for in_data in data.values() + if in_data is not None} + rsrc, stack = resource.Resource.load(cnxt, resource_id, cache_data) + except (exception.ResourceNotFound, exception.NotFound): return tmpl = stack.t @@ -144,10 +148,10 @@ class WorkerService(service.Service): propagate_check_resource( cnxt, self._rpc_client, req, current_traversal, set(graph[(req, fwd)]), graph_key, - input_data if fwd else rsrc.id, fwd) + input_data if fwd else None, fwd) check_stack_complete(cnxt, rsrc.stack, current_traversal, - rsrc.id, graph, is_update) + rsrc.id, deps, is_update) except sync_point.SyncPointNotFound: # NOTE(sirushtim): Implemented by spec # convergence-concurrent-workflow @@ -167,7 +171,7 @@ def construct_input_data(rsrc): return input_data -def check_stack_complete(cnxt, stack, current_traversal, sender, graph, +def check_stack_complete(cnxt, stack, current_traversal, sender_id, deps, is_update): ''' Mark the stack complete if the update is complete. @@ -175,21 +179,21 @@ def check_stack_complete(cnxt, stack, current_traversal, sender, graph, Complete is currently in the sense that all desired resources are in service, not that superfluous ones have been cleaned up. ''' - roots = set(key for (key, fwd), node in graph.items() - if not any(f for k, f in node.required_by())) + roots = set(deps.roots()) - if sender not in roots: + if (sender_id, is_update) not in roots: return def mark_complete(stack_id, data): stack.mark_complete(current_traversal) - sync_point.sync(cnxt, stack.id, current_traversal, is_update, - mark_complete, roots, {sender: None}) + sender_key = (sender_id, is_update) + sync_point.sync(cnxt, stack.id, current_traversal, True, + mark_complete, roots, {sender_key: None}) def propagate_check_resource(cnxt, rpc_client, next_res_id, - current_traversal, predecessors, sender, + current_traversal, predecessors, sender_key, sender_data, is_update): ''' Trigger processing of a node if all of its dependencies are satisfied. @@ -200,19 +204,17 @@ def propagate_check_resource(cnxt, rpc_client, next_res_id, sync_point.sync(cnxt, next_res_id, current_traversal, is_update, do_check, predecessors, - {sender: sender_data}) + {sender_key: sender_data}) def check_resource_update(rsrc, template_id, data): ''' Create or update the Resource if appropriate. ''' - input_data = {in_data.name: in_data for in_data in data.values()} - if rsrc.resource_id is None: - rsrc.create_convergence(template_id, input_data) + rsrc.create_convergence(template_id, data) else: - rsrc.update_convergence(template_id, input_data) + rsrc.update_convergence(template_id, data) def check_resource_cleanup(rsrc, template_id, data): diff --git a/heat/tests/test_dependencies.py b/heat/tests/test_dependencies.py index f5b88e909a..02df12bca6 100644 --- a/heat/tests/test_dependencies.py +++ b/heat/tests/test_dependencies.py @@ -234,3 +234,11 @@ class dependenciesTest(common.HeatTestCase): leaves = sorted(list(d.leaves())) self.assertEqual(['first1', 'first2'], leaves) + + def test_roots(self): + d = dependencies.Dependencies([('last1', 'mid'), ('last2', 'mid'), + ('mid', 'first1'), ('mid', 'first2')]) + + leaves = sorted(list(d.roots())) + + self.assertEqual(['last1', 'last2'], leaves) diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 17552872a0..6aa5102ade 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -269,6 +269,25 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): is_update)) self.assertEqual(expected_calls, mock_cr.mock_calls) + def _mock_conv_update_requires(self, stack, conv_deps): + """Updates requires column of resources. + Required for testing the generation of convergence dependency graph + on an update. + """ + requires = dict() + for rsrc_id, is_update in conv_deps: + reqs = conv_deps.requires((rsrc_id, is_update)) + requires[rsrc_id] = list({id for id, is_update in reqs}) + + rsrcs_db = resource_objects.Resource.get_all_by_stack( + stack.context, stack.id) + + for res_name, rsrc in rsrcs_db.items(): + if rsrc.id in requires: + rsrcs_db[res_name].requires = requires[rsrc.id] + + return rsrcs_db + def test_conv_string_five_instance_stack_update(self, mock_cr): stack = tools.get_stack('test_stack', utils.dummy_context(), template=tools.string_template_five, @@ -283,7 +302,13 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): t2 = template_format.parse(string_template_five_update) template2 = templatem.Template( t2, env=environment.Environment({'KeyName2': 'test2'})) - curr_stack.converge_stack(template=template2, action=stack.UPDATE) + + # on our previous create_complete, worker would have updated the + # rsrc.requires. Mock the same behavior here. + with mock.patch.object(resource_objects.Resource, 'get_all_by_stack', + return_value=self._mock_conv_update_requires( + stack, stack.convergence_dependencies)): + curr_stack.converge_stack(template=template2, action=stack.UPDATE) self.assertIsNotNone(curr_stack.ext_rsrcs_db) self.assertEqual('Dependencies([' @@ -337,8 +362,6 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): # check if needed_by are stored properly # For A & B: # needed_by=C, F - # TODO(later): when worker is implemented test for current_template_id - # Also test for requires expected_needed_by = {'A': [3, 8], 'B': [3, 8], 'C': [1, 2], @@ -401,7 +424,12 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): curr_stack_db = stack_object.Stack.get_by_id(stack.context, stack.id) curr_stack = parser.Stack.load(curr_stack_db._context, stack=curr_stack_db) - curr_stack.converge_stack(template=template2, action=stack.DELETE) + # on our previous create_complete, worker would have updated the + # rsrc.requires. Mock the same behavior here. + with mock.patch.object(resource_objects.Resource, 'get_all_by_stack', + return_value=self._mock_conv_update_requires( + stack, stack.convergence_dependencies)): + curr_stack.converge_stack(template=template2, action=stack.DELETE) self.assertIsNotNone(curr_stack.ext_rsrcs_db) self.assertEqual('Dependencies([' @@ -421,8 +449,6 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): [[4, False], [3, False]]]), sorted(stack_db.current_deps['edges'])) - # TODO(later): when worker is implemented test for current_template_id - # Also test for requires expected_needed_by = {'A': [3], 'B': [3], 'C': [1, 2], 'D': [], 'E': []} @@ -437,10 +463,13 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): # check if sync_points are created for cleanup traversal # [A, B, C, D, E, Stack] for entity_id in [5, 4, 3, 2, 1, stack_db.id]: + is_update = False + if entity_id == stack_db.id: + is_update = True sync_point = sync_point_object.SyncPoint.get_by_key( - stack_db._context, entity_id, stack_db.current_traversal, False - ) - self.assertIsNotNone(sync_point) + stack_db._context, entity_id, stack_db.current_traversal, + is_update) + self.assertIsNotNone(sync_point, 'entity %s' % entity_id) self.assertEqual(stack_db.id, sync_point.stack_id) leaves = stack.convergence_dependencies.leaves() diff --git a/heat/tests/test_engine_worker.py b/heat/tests/test_engine_worker.py index 511e65f85c..5cdd90cffa 100644 --- a/heat/tests/test_engine_worker.py +++ b/heat/tests/test_engine_worker.py @@ -255,17 +255,17 @@ class MiscMethodsTest(common.HeatTestCase): def test_check_stack_complete_root(self, mock_sync): worker.check_stack_complete( self.ctx, self.stack, self.stack.current_traversal, - self.stack['E'].id, self.stack.convergence_dependencies.graph(), + self.stack['E'].id, self.stack.convergence_dependencies, True) mock_sync.assert_called_once_with( self.ctx, self.stack.id, self.stack.current_traversal, True, - mock.ANY, mock.ANY, {self.stack['E'].id: None}) + mock.ANY, mock.ANY, {(self.stack['E'].id, True): None}) @mock.patch.object(sync_point, 'sync') def test_check_stack_complete_child(self, mock_sync): worker.check_stack_complete( self.ctx, self.stack, self.stack.current_traversal, - self.resource.id, self.stack.convergence_dependencies.graph(), + self.resource.id, self.stack.convergence_dependencies, True) self.assertFalse(mock_sync.called) diff --git a/heat/tests/test_stack.py b/heat/tests/test_stack.py index 0eb3722d8d..45b1c8b955 100644 --- a/heat/tests/test_stack.py +++ b/heat/tests/test_stack.py @@ -1909,7 +1909,7 @@ class StackTest(common.HeatTestCase): } }) - cache_data = {'foo': {'id': 'physical-resource-id'}} + cache_data = {'foo': {'physical_resource_id': 'physical-resource-id'}} tmpl_stack = stack.Stack(self.ctx, 'test', tmpl) tmpl_stack.store() lightweight_stack = stack.Stack.load(self.ctx, stack_id=tmpl_stack.id, diff --git a/heat/tests/test_sync_point.py b/heat/tests/test_sync_point.py index 54c553a811..171143c299 100644 --- a/heat/tests/test_sync_point.py +++ b/heat/tests/test_sync_point.py @@ -62,3 +62,7 @@ class SyncPointTestCase(common.HeatTestCase): updated_sync_point.input_data) self.assertEqual({sender: None}, input_data) self.assertTrue(mock_callback.called) + + def test_serialize_input_data(self): + res = sync_point.serialize_input_data({(3L, 8): None}) + self.assertEqual({'input_data': [[[3L, 8], None]]}, res)