From 906a0ed6fd94dcb25bdc345ef09b5e5a9d1e1816 Mon Sep 17 00:00:00 2001 From: Rakesh H S Date: Fri, 12 Feb 2016 15:41:29 +0530 Subject: [PATCH] Convergence: Fix concurrent update resource delete In convergence, wherein concurrent updates are possible, if a resource is deleted (by previous traversal) after dependency graph is created for new traversal, the resource remains in graph but wouldn't be available in DB for processing. It is prerequisite to have resources in DB before any action can be taken on them. Hence during convergence resource delete action, the resource entry from DB is not deleted i.e soft deleted, so that the latest/new update can find the entry. All of these soft deleted resources will be deleted when the stack has completed its operation. Closes-Bug: #1528560 Change-Id: I0b36ce098022560d7fe01623ce7b66d1d5b38d55 --- heat/db/api.py | 4 +++ heat/db/sqlalchemy/api.py | 7 ++++ heat/engine/check_resource.py | 17 +++++++--- heat/engine/resource.py | 26 ++++++++------- heat/engine/stack.py | 11 ++++--- heat/objects/resource.py | 4 +++ heat/tests/db/test_sqlalchemy_api.py | 9 +++++ heat/tests/engine/test_check_resource.py | 17 +++++++++- heat/tests/test_convg_stack.py | 10 ++++++ heat/tests/test_resource.py | 42 ++++++++++++++++++++++-- 10 files changed, 123 insertions(+), 24 deletions(-) diff --git a/heat/db/api.py b/heat/db/api.py index 73b4ecaade..5162202c4c 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -134,6 +134,10 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None): return IMPL.resource_get_all_by_root_stack(context, stack_id, filters) +def resource_purge_deleted(context, stack_id): + return IMPL.resource_purge_deleted(context, stack_id) + + def resource_get_by_name_and_stack(context, resource_name, stack_id): return IMPL.resource_get_by_name_and_stack(context, resource_name, stack_id) diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 66787b79c6..c2a5ebd86a 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -194,6 +194,13 @@ def resource_get_all(context): return results +def resource_purge_deleted(context, stack_id): + filters = {'stack_id': stack_id, 'action': 'DELETE', 'status': 'COMPLETE'} + query = context.session.query(models.Resource.id) + result = query.filter_by(**filters) + result.delete() + + def resource_update(context, resource_id, values, atomic_key, expected_engine_id=None): session = context.session diff --git a/heat/engine/check_resource.py b/heat/engine/check_resource.py index b14d5ab188..137e521610 100644 --- a/heat/engine/check_resource.py +++ b/heat/engine/check_resource.py @@ -75,7 +75,8 @@ class CheckResource(object): # possibility for that update to be waiting for this rsrc to # complete, hence retrigger current rsrc for latest traversal. traversal = stack.current_traversal - latest_stack = parser.Stack.load(cnxt, stack_id=stack.id) + latest_stack = parser.Stack.load(cnxt, stack_id=stack.id, + force_reload=True) if traversal != latest_stack.current_traversal: self._retrigger_check_resource(cnxt, is_update, rsrc_id, latest_stack) @@ -143,11 +144,18 @@ class CheckResource(object): graph = stack.convergence_dependencies.graph() key = (resource_id, is_update) if is_update: - # When re-triggering for a rsrc, we need to first check if update - # traversal is present for the rsrc in latest stack traversal, + # When re-trigger received for update in latest traversal, first + # check if update key is available in graph. # if No, then latest traversal is waiting for delete. if (resource_id, is_update) not in graph: key = (resource_id, not is_update) + else: + # When re-trigger received for delete in latest traversal, first + # check if update key is available in graph, + # if yes, then latest traversal is waiting for update. + if (resource_id, True) in graph: + # not is_update evaluates to True below, which means update + key = (resource_id, not is_update) LOG.info(_LI('Re-trigger resource: (%(key1)s, %(key2)s)'), {'key1': key[0], 'key2': key[1]}) predecessors = set(graph[key]) @@ -205,7 +213,8 @@ class CheckResource(object): # check the SyncPoint for the current node to determine if # it is ready. If it is, then retrigger the current node # with the appropriate data for the latest traversal. - stack = parser.Stack.load(cnxt, stack_id=rsrc.stack.id) + stack = parser.Stack.load(cnxt, stack_id=rsrc.stack.id, + force_reload=True) if current_traversal == stack.current_traversal: LOG.debug('[%s] Traversal sync point missing.', current_traversal) diff --git a/heat/engine/resource.py b/heat/engine/resource.py index b239a3d8ea..dfeefa2788 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -995,7 +995,8 @@ class Resource(object): def _needs_update(self, after, before, after_props, before_props, prev_resource, check_init_complete=True): - if self.status == self.FAILED: + if self.status == self.FAILED or (self.stack.convergence and ( + self.action, self.status) == (self.DELETE, self.COMPLETE)): raise exception.UpdateReplace(self) if check_init_complete and (self.action == self.INIT @@ -1455,22 +1456,23 @@ class Resource(object): replaced by more recent resource, then delete this and update the replacement resource's needed_by and replaces fields. """ - self._acquire(engine_id) - try: + with self.lock(engine_id): self.needed_by = list(set(v for v in input_data.values() if v is not None)) if self.current_template_id != template_id: - runner = scheduler.TaskRunner(self.destroy) - runner(timeout=timeout) + # just delete the resources in INIT state + if self.action == self.INIT: + try: + resource_objects.Resource.delete(self.context, self.id) + except exception.NotFound: + pass + else: + runner = scheduler.TaskRunner(self.delete) + runner(timeout=timeout) - # update needed_by and replaces of replacement resource - self._update_replacement_data(template_id) - else: - self._release(engine_id) - except: # noqa - with excutils.save_and_reraise_exception(): - self._release(engine_id) + # update needed_by and replaces of replacement resource + self._update_replacement_data(template_id) def handle_delete(self): """Default implementation; should be overridden by resources.""" diff --git a/heat/engine/stack.py b/heat/engine/stack.py index f84a629c1f..324f935911 100644 --- a/heat/engine/stack.py +++ b/heat/engine/stack.py @@ -1972,13 +1972,16 @@ class Stack(collections.Mapping): def purge_db(self): """Cleanup database after stack has completed/failed. - 1. If the stack failed, update the current_traversal to empty string + 1. Delete the resources from DB. + 2. If the stack failed, update the current_traversal to empty string so that the resource workers bail out. - 2. Delete previous raw template if stack completes successfully. - 3. Deletes all sync points. They are no longer needed after stack + 3. Delete previous raw template if stack completes successfully. + 4. Deletes all sync points. They are no longer needed after stack has completed/failed. - 4. Delete the stack if the action is DELETE. + 5. Delete the stack if the action is DELETE. """ + resource_objects.Resource.purge_deleted(self.context, self.id) + exp_trvsl = self.current_traversal if self.status == self.FAILED: self.current_traversal = '' diff --git a/heat/objects/resource.py b/heat/objects/resource.py index 8ccb01d9b1..c0a9ca2437 100644 --- a/heat/objects/resource.py +++ b/heat/objects/resource.py @@ -172,6 +172,10 @@ class Resource( filters) return cls._resources_to_dict(context, resources_db) + @classmethod + def purge_deleted(cls, context, stack_id): + return db_api.resource_purge_deleted(context, stack_id) + @classmethod def get_by_name_and_stack(cls, context, resource_name, stack_id): resource_db = db_api.resource_get_by_name_and_stack( diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py index 4288382f4d..ea01a94b4c 100644 --- a/heat/tests/db/test_sqlalchemy_api.py +++ b/heat/tests/db/test_sqlalchemy_api.py @@ -2345,6 +2345,15 @@ class DBAPIResourceTest(common.HeatTestCase): self.assertEqual({}, db_api.resource_get_all_by_root_stack( self.ctx, self.stack2.id)) + def test_resource_purge_deleted_by_stack(self): + val = {'name': 'res1', 'action': rsrc.Resource.DELETE, + 'status': rsrc.Resource.COMPLETE} + resource = create_resource(self.ctx, self.stack, **val) + + db_api.resource_purge_deleted(self.ctx, self.stack.id) + self.assertRaises(exception.NotFound, db_api.resource_get, + self.ctx, resource.id) + class DBAPIStackLockTest(common.HeatTestCase): def setUp(self): diff --git a/heat/tests/engine/test_check_resource.py b/heat/tests/engine/test_check_resource.py index 0051f29f3a..9a6e4838b8 100644 --- a/heat/tests/engine/test_check_resource.py +++ b/heat/tests/engine/test_check_resource.py @@ -353,7 +353,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): actual_predecessors = call_args[4] self.assertItemsEqual(expected_predecessors, actual_predecessors) - def test_retrigger_check_resource_new_traversal_deletes_rsrc( + def test_update_retrigger_check_resource_new_traversal_deletes_rsrc( self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): # mock dependencies to indicate a rsrc with id 2 is not present # in latest traversal @@ -368,6 +368,21 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): mock.ANY, (2, False), None, False, None) + def test_delete_retrigger_check_resource_new_traversal_updates_rsrc( + self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + # mock dependencies to indicate a rsrc with id 2 has an update + # in latest traversal + self.stack._convg_deps = dependencies.Dependencies([ + [(1, False), (1, True)], [(2, False), (2, True)]]) + # simulate rsrc 2 completing its delete for old traversal + # and calling rcr + self.cr._retrigger_check_resource(self.ctx, False, 2, self.stack) + # Ensure that pcr was called with proper delete traversal + mock_pcr.assert_called_once_with(self.ctx, mock.ANY, 2, + self.stack.current_traversal, + mock.ANY, (2, True), None, + True, None) + @mock.patch.object(stack.Stack, 'purge_db') def test_handle_failure(self, mock_purgedb, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): diff --git a/heat/tests/test_convg_stack.py b/heat/tests/test_convg_stack.py index 06ed17938f..893694c2e9 100644 --- a/heat/tests/test_convg_stack.py +++ b/heat/tests/test_convg_stack.py @@ -451,6 +451,16 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): stack.purge_db() self.assertTrue(mock_stack_delete.called) + @mock.patch.object(resource_objects.Resource, 'purge_deleted') + def test_purge_db_calls_rsrc_purge_deleted(self, mock_rsrc_purge_delete, + mock_cr): + stack = tools.get_stack('test_stack', utils.dummy_context(), + template=tools.string_template_five, + convergence=True) + stack.store() + stack.purge_db() + self.assertTrue(mock_rsrc_purge_delete.called) + def test_get_best_existing_db_resource(self, mock_cr): stack = tools.get_stack('test_stack', utils.dummy_context(), template=tools.string_template_five, diff --git a/heat/tests/test_resource.py b/heat/tests/test_resource.py index 392b32c0f5..31f56a1bc3 100644 --- a/heat/tests/test_resource.py +++ b/heat/tests/test_resource.py @@ -1006,6 +1006,31 @@ class ResourceTest(common.HeatTestCase): self.assertRaises(exception.UpdateReplace, res._needs_update, tmpl, tmpl, prop, prop, res) + def test_need_update_in_create_failed_state_for_resource(self): + tmpl = rsrc_defn.ResourceDefinition('test_resource', + 'GenericResourceType', + {'Foo': 'abc'}) + res = generic_rsrc.ResourceWithProps('test_resource', tmpl, + self.stack) + res.update_allowed_properties = ('Foo',) + res.state_set(res.CREATE, res.FAILED) + prop = {'Foo': 'abc'} + self.assertRaises(exception.UpdateReplace, + res._needs_update, tmpl, tmpl, prop, prop, res) + + def test_convg_need_update_in_delete_complete_state_for_resource(self): + tmpl = rsrc_defn.ResourceDefinition('test_resource', + 'GenericResourceType', + {'Foo': 'abc'}) + res = generic_rsrc.ResourceWithProps('test_resource', tmpl, + self.stack) + res.update_allowed_properties = ('Foo',) + res.stack.convergence = True + res.state_set(res.DELETE, res.COMPLETE) + prop = {'Foo': 'abc'} + self.assertRaises(exception.UpdateReplace, + res._needs_update, tmpl, tmpl, prop, prop, res) + def test_update_fail_missing_req_prop(self): tmpl = rsrc_defn.ResourceDefinition('test_resource', 'GenericResourceType', @@ -2042,7 +2067,7 @@ class ResourceTest(common.HeatTestCase): self._assert_resource_lock(res.id, None, None) res.delete_convergence(2, {}, 'engine-007', 20) - mock_init.assert_called_once_with(res.destroy) + mock_init.assert_called_once_with(res.delete) mock_call.assert_called_once_with(timeout=20) self.assertTrue(res._update_replacement_data.called) @@ -2051,10 +2076,10 @@ class ResourceTest(common.HeatTestCase): res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res.current_template_id = 'same-template' res._store() - res.destroy = mock.Mock() + res.delete = mock.Mock() res.delete_convergence('same-template', {}, 'engine-007', self.dummy_timeout) - self.assertFalse(res.destroy.called) + self.assertFalse(res.delete.called) def test_delete_convergence_fail(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') @@ -2223,9 +2248,20 @@ class ResourceTest(common.HeatTestCase): test_obj.get.side_effect = AttributeError self.assertIsNone(res._show_resource()) + def test_delete_convergence_deletes_resource_in_init_state(self): + tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') + res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) + # action is INIT by default + res._store() + with mock.patch.object(resource_objects.Resource, + 'delete') as resource_del: + res.delete_convergence(1, {}, 'engine-007', 1) + resource_del.assert_called_once_with(res.context, res.id) + def test_delete_convergence_throws_timeout(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) + res.action = res.CREATE res._store() timeout = -1 # to emulate timeout self.assertRaises(scheduler.Timeout, res.delete_convergence,