From 22eb2e253822c176a1316a85317ceeec610ed140 Mon Sep 17 00:00:00 2001 From: Zane Bitter Date: Wed, 25 Apr 2018 16:02:24 -0400 Subject: [PATCH] Retry resource check if atomic key incremented When updating a resource that hasn't changed, we didn't previously retry the write when the atomic_key of the resource didn't match what we expect. In addition to locking a resource to update it, the atomic key is also incremented when modifying metadata and storing cached attribute values. Apparently there is some mechanism that can cause this to happen in the time between when the resource is loaded and when we attempt to update the template ID &c. in the DB. When the resource is not locked and its template ID hasn't changed since we loaded it, we can assume that the update failed due to a mismatched atomic key alone. Handle this case by sending another resource-check RPC message, so that the operation check will be retried with fresh data from the DB. Change-Id: I5afd5602096be54af5da256927fe828366dbd63b Closes-Bug: #1763021 --- heat/engine/check_resource.py | 39 +++++++++++++----- heat/tests/engine/test_check_resource.py | 50 +++++++++++++++++------- 2 files changed, 65 insertions(+), 24 deletions(-) diff --git a/heat/engine/check_resource.py b/heat/engine/check_resource.py index 1cf7556b1b..3e2045bb3f 100644 --- a/heat/engine/check_resource.py +++ b/heat/engine/check_resource.py @@ -56,16 +56,40 @@ class CheckResource(object): self.msg_queue = msg_queue self.input_data = input_data - def _try_steal_engine_lock(self, cnxt, resource_id): + def _stale_resource_needs_retry(self, cnxt, rsrc, prev_template_id): + """Determine whether a resource needs retrying after failure to lock. + + Return True if we need to retry the check operation because of a + failure to acquire the lock. This can be either because the engine + holding the lock is no longer working, or because no other engine had + locked the resource and the data was just out of date. + + In the former case, the lock will be stolen and the resource status + changed to FAILED. + """ + fields = {'current_template_id', 'engine_id'} rs_obj = resource_objects.Resource.get_obj(cnxt, - resource_id, - fields=('engine_id', )) + rsrc.id, + refresh=True, + fields=fields) if rs_obj.engine_id not in (None, self.engine_id): if not listener_client.EngineListenerClient( rs_obj.engine_id).is_alive(cnxt): # steal the lock. rs_obj.update_and_save({'engine_id': None}) + + # set the resource state as failed + status_reason = ('Worker went down ' + 'during resource %s' % rsrc.action) + rsrc.state_set(rsrc.action, + rsrc.FAILED, + six.text_type(status_reason)) return True + elif (rs_obj.engine_id is None and + rs_obj.current_template_id == prev_template_id): + LOG.debug('Resource id=%d stale; retrying check') + return True + LOG.debug('Resource id=%d modified by another traversal') return False def _trigger_rollback(self, stack): @@ -135,6 +159,7 @@ class CheckResource(object): def _do_check_resource(self, cnxt, current_traversal, tmpl, resource_data, is_update, rsrc, stack, adopt_stack_data): + prev_template_id = rsrc.current_template_id try: if is_update: try: @@ -155,14 +180,8 @@ class CheckResource(object): return True except exception.UpdateInProgress: - if self._try_steal_engine_lock(cnxt, rsrc.id): + if self._stale_resource_needs_retry(cnxt, rsrc, prev_template_id): rpc_data = sync_point.serialize_input_data(self.input_data) - # set the resource state as failed - status_reason = ('Worker went down ' - 'during resource %s' % rsrc.action) - rsrc.state_set(rsrc.action, - rsrc.FAILED, - six.text_type(status_reason)) self._rpc_client.check_resource(cnxt, rsrc.id, current_traversal, diff --git a/heat/tests/engine/test_check_resource.py b/heat/tests/engine/test_check_resource.py index 7bc867f4a1..cb68f024f8 100644 --- a/heat/tests/engine/test_check_resource.py +++ b/heat/tests/engine/test_check_resource.py @@ -15,6 +15,7 @@ import eventlet import mock +import uuid from oslo_config import cfg @@ -128,11 +129,11 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): self.assertFalse(mock_pcr.called) self.assertFalse(mock_csc.called) - @mock.patch.object(check_resource.CheckResource, '_try_steal_engine_lock') + @mock.patch.object(check_resource.CheckResource, + '_stale_resource_needs_retry') @mock.patch.object(stack.Stack, 'time_remaining') - @mock.patch.object(resource.Resource, 'state_set') def test_is_update_traversal_raise_update_inprogress( - self, mock_ss, tr, mock_tsl, mock_cru, mock_crc, mock_pcr, + self, tr, mock_tsl, mock_cru, mock_crc, mock_pcr, mock_csc): mock_cru.side_effect = exception.UpdateInProgress self.worker.engine_id = 'some-thing-else' @@ -145,43 +146,64 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): self.resource.stack.t.id, {}, self.worker.engine_id, mock.ANY, mock.ANY) - mock_ss.assert_called_once_with(self.resource.action, - resource.Resource.FAILED, - mock.ANY) self.assertFalse(mock_crc.called) self.assertFalse(mock_pcr.called) self.assertFalse(mock_csc.called) + @mock.patch.object(resource.Resource, 'state_set') + def test_stale_resource_retry( + self, mock_ss, mock_cru, mock_crc, mock_pcr, mock_csc): + current_template_id = self.resource.current_template_id + res = self.cr._stale_resource_needs_retry(self.ctx, + self.resource, + current_template_id) + self.assertTrue(res) + mock_ss.assert_not_called() + + @mock.patch.object(resource.Resource, 'state_set') def test_try_steal_lock_alive( - self, mock_cru, mock_crc, mock_pcr, mock_csc): - res = self.cr._try_steal_engine_lock(self.ctx, - self.resource.id) + self, mock_ss, mock_cru, mock_crc, mock_pcr, mock_csc): + res = self.cr._stale_resource_needs_retry(self.ctx, + self.resource, + str(uuid.uuid4())) self.assertFalse(res) + mock_ss.assert_not_called() @mock.patch.object(check_resource.listener_client, 'EngineListenerClient') @mock.patch.object(check_resource.resource_objects.Resource, 'get_obj') + @mock.patch.object(resource.Resource, 'state_set') def test_try_steal_lock_dead( - self, mock_get, mock_elc, mock_cru, mock_crc, mock_pcr, + self, mock_ss, mock_get, mock_elc, mock_cru, mock_crc, mock_pcr, mock_csc): fake_res = mock.Mock() fake_res.engine_id = 'some-thing-else' mock_get.return_value = fake_res mock_elc.return_value.is_alive.return_value = False - res = self.cr._try_steal_engine_lock(self.ctx, - self.resource.id) + current_template_id = self.resource.current_template_id + res = self.cr._stale_resource_needs_retry(self.ctx, + self.resource, + current_template_id) self.assertTrue(res) + mock_ss.assert_called_once_with(self.resource.action, + resource.Resource.FAILED, + mock.ANY) @mock.patch.object(check_resource.listener_client, 'EngineListenerClient') @mock.patch.object(check_resource.resource_objects.Resource, 'get_obj') + @mock.patch.object(resource.Resource, 'state_set') def test_try_steal_lock_not_dead( - self, mock_get, mock_elc, mock_cru, mock_crc, mock_pcr, + self, mock_ss, mock_get, mock_elc, mock_cru, mock_crc, mock_pcr, mock_csc): fake_res = mock.Mock() fake_res.engine_id = self.worker.engine_id mock_get.return_value = fake_res mock_elc.return_value.is_alive.return_value = True - res = self.cr._try_steal_engine_lock(self.ctx, self.resource.id) + current_template_id = self.resource.current_template_id + res = self.cr._stale_resource_needs_retry(self.ctx, + self.resource, + current_template_id) self.assertFalse(res) + mock_ss.assert_not_called() @mock.patch.object(check_resource.CheckResource, '_trigger_rollback') def test_resource_update_failure_sets_stack_state_as_failed(