From 99b055b42357e2fae6006fe150c3c47c30dab1c0 Mon Sep 17 00:00:00 2001 From: Anant Patil Date: Fri, 16 Sep 2016 14:13:57 +0000 Subject: [PATCH] Re-trigger on update-replace It is found that the inter-leaving of lock when a update-replace of a resource is needed is the reason for new traversal not being triggered. Consider the order of events below: 1. A server is being updated. The worker locks the server resource. 2. A rollback is triggered because some one cancelled the stack. 3. As part of rollback, new update using old template is started. 4. The new update tries to take the lock but it has been already acquired in (1). The new update now expects that the when the old resource is done, it will re-trigger the new traversal. 5. The old update decides to create a new resource for replacement. The replacement resource is initiated for creation, a check_resource RPC call is made for new resource. 6. A worker, possibly in another engine, receives the call and then it bails out when it finds that there is a new traversal initiated (from 2). Now, there is no progress from here because it is expected (from 4) that there will be a re-trigger when the old resource is done. This change takes care of re-triggering the new traversal from worker when it finds that there is a new traversal and an update-replace. Note that this issue will not be seen when there is no update-replace because the old resource will finish (either fail or complete) and in the same thread it will find the new traversal and trigger it. Closes-Bug: #1625073 Change-Id: Icea5ba498ef8ca45cd85a9721937da2f4ac304e0 --- heat/engine/check_resource.py | 10 +++---- heat/engine/worker.py | 37 ++++++++++++++++++------ heat/tests/engine/test_check_resource.py | 20 ++++++------- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/heat/engine/check_resource.py b/heat/engine/check_resource.py index 1cd0bcaa79..4164e73a93 100644 --- a/heat/engine/check_resource.py +++ b/heat/engine/check_resource.py @@ -94,8 +94,8 @@ class CheckResource(object): latest_stack = parser.Stack.load(cnxt, stack_id=stack.id, force_reload=True) if traversal != latest_stack.current_traversal: - self._retrigger_check_resource(cnxt, is_update, rsrc_id, - latest_stack) + self.retrigger_check_resource(cnxt, is_update, rsrc_id, + latest_stack) def _handle_stack_timeout(self, cnxt, stack): failure_reason = u'Timed out' @@ -158,7 +158,7 @@ class CheckResource(object): return False - def _retrigger_check_resource(self, cnxt, is_update, resource_id, stack): + def retrigger_check_resource(self, cnxt, is_update, resource_id, stack): current_traversal = stack.current_traversal graph = stack.convergence_dependencies.graph() key = (resource_id, is_update) @@ -239,8 +239,8 @@ class CheckResource(object): current_traversal) return - self._retrigger_check_resource(cnxt, is_update, - resource_id, stack) + self.retrigger_check_resource(cnxt, is_update, + resource_id, stack) else: raise diff --git a/heat/engine/worker.py b/heat/engine/worker.py index 70fea6bfe0..a5bbfdec26 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -131,6 +131,23 @@ class WorkerService(service.Service): return True + def _retrigger_replaced(self, is_update, rsrc, stack, msg_queue): + graph = stack.convergence_dependencies.graph() + key = (rsrc.id, is_update) + if key not in graph and rsrc.replaces is not None: + # This resource replaces old one and is not needed in + # current traversal. You need to mark the resource as + # DELETED so that it gets cleaned up in purge_db. + values = {'action': rsrc.DELETE} + db_api.resource_update_and_save(stack.context, rsrc.id, values) + # The old resource might be in the graph (a rollback case); + # just re-trigger it. + key = (rsrc.replaces, is_update) + cr = check_resource.CheckResource(self.engine_id, self._rpc_client, + self.thread_group_mgr, msg_queue) + cr.retrigger_check_resource(stack.context, is_update, key[0], + stack) + @context.request_context def check_resource(self, cnxt, resource_id, current_traversal, data, is_update, adopt_stack_data): @@ -146,18 +163,20 @@ class WorkerService(service.Service): if rsrc is None: return - if current_traversal != stack.current_traversal: - LOG.debug('[%s] Traversal cancelled; stopping.', current_traversal) - return - msg_queue = eventlet.queue.LightQueue() try: self.thread_group_mgr.add_msg_queue(stack.id, msg_queue) - cr = check_resource.CheckResource(self.engine_id, self._rpc_client, - self.thread_group_mgr, msg_queue) - - cr.check(cnxt, resource_id, current_traversal, resource_data, - is_update, adopt_stack_data, rsrc, stack) + if current_traversal != stack.current_traversal: + LOG.debug('[%s] Traversal cancelled; re-trigerring.', + current_traversal) + self._retrigger_replaced(is_update, rsrc, stack, msg_queue) + else: + cr = check_resource.CheckResource(self.engine_id, + self._rpc_client, + self.thread_group_mgr, + msg_queue) + cr.check(cnxt, resource_id, current_traversal, resource_data, + is_update, adopt_stack_data, rsrc, stack) finally: self.thread_group_mgr.remove_msg_queue(None, stack.id, msg_queue) diff --git a/heat/tests/engine/test_check_resource.py b/heat/tests/engine/test_check_resource.py index bfe405398e..5d8054df52 100644 --- a/heat/tests/engine/test_check_resource.py +++ b/heat/tests/engine/test_check_resource.py @@ -77,12 +77,12 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]: self.assertFalse(mocked.called) + @mock.patch.object(worker.WorkerService, '_retrigger_replaced') def test_stale_traversal( - self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + self, mock_rnt, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): self.worker.check_resource(self.ctx, self.resource.id, 'stale-traversal', {}, True, None) - for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]: - self.assertFalse(mocked.called) + self.assertTrue(mock_rnt.called) def test_is_update_traversal( self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): @@ -320,7 +320,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): self.assertTrue(self.stack.purge_db.called) @mock.patch.object(check_resource.CheckResource, - '_retrigger_check_resource') + 'retrigger_check_resource') @mock.patch.object(stack.Stack, 'load') def test_initiate_propagate_rsrc_retriggers_check_rsrc_on_new_stack_update( self, mock_stack_load, mock_rcr, mock_cru, mock_crc, mock_pcr, @@ -368,8 +368,8 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): # A, B are predecessors to C when is_update is True expected_predecessors = {(self.stack['A'].id, True), (self.stack['B'].id, True)} - self.cr._retrigger_check_resource(self.ctx, self.is_update, - resC.id, self.stack) + self.cr.retrigger_check_resource(self.ctx, self.is_update, + resC.id, self.stack) mock_pcr.assert_called_once_with(self.ctx, mock.ANY, resC.id, self.stack.current_traversal, mock.ANY, (resC.id, True), None, @@ -386,7 +386,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): [(1, False), (1, True)], [(2, False), None]]) # simulate rsrc 2 completing its update for old traversal # and calling rcr - self.cr._retrigger_check_resource(self.ctx, True, 2, self.stack) + self.cr.retrigger_check_resource(self.ctx, True, 2, self.stack) # Ensure that pcr was called with proper delete traversal mock_pcr.assert_called_once_with(self.ctx, mock.ANY, 2, self.stack.current_traversal, @@ -401,7 +401,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): [(1, False), (1, True)], [(2, False), (2, True)]]) # simulate rsrc 2 completing its delete for old traversal # and calling rcr - self.cr._retrigger_check_resource(self.ctx, False, 2, self.stack) + self.cr.retrigger_check_resource(self.ctx, False, 2, self.stack) # Ensure that pcr was called with proper delete traversal mock_pcr.assert_called_once_with(self.ctx, mock.ANY, 2, self.stack.current_traversal, @@ -426,7 +426,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): @mock.patch.object(stack.Stack, 'purge_db') @mock.patch.object(stack.Stack, 'state_set') @mock.patch.object(check_resource.CheckResource, - '_retrigger_check_resource') + 'retrigger_check_resource') @mock.patch.object(check_resource.CheckResource, '_trigger_rollback') def test_handle_rsrc_failure_when_update_fails( self, mock_tr, mock_rcr, mock_ss, mock_pdb, mock_cru, mock_crc, @@ -444,7 +444,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): @mock.patch.object(stack.Stack, 'purge_db') @mock.patch.object(stack.Stack, 'state_set') @mock.patch.object(check_resource.CheckResource, - '_retrigger_check_resource') + 'retrigger_check_resource') @mock.patch.object(check_resource.CheckResource, '_trigger_rollback') def test_handle_rsrc_failure_when_update_fails_different_traversal( self, mock_tr, mock_rcr, mock_ss, mock_pdb, mock_cru, mock_crc,