Browse Source

Retry resource check if atomic key incremented

When updating a resource that hasn't changed, we didn't previously retry
the write when the atomic_key of the resource didn't match what we expect.
In addition to locking a resource to update it, the atomic key is also
incremented when modifying metadata and storing cached attribute values.
Apparently there is some mechanism that can cause this to happen in the
time between when the resource is loaded and when we attempt to update the
template ID &c. in the DB.

When the resource is not locked and its template ID hasn't changed since we
loaded it, we can assume that the update failed due to a mismatched atomic
key alone. Handle this case by sending another resource-check RPC message,
so that the operation check will be retried with fresh data from the DB.

Change-Id: I5afd5602096be54af5da256927fe828366dbd63b
Closes-Bug: #1763021
changes/48/564348/1
Zane Bitter 4 years ago
parent
commit
22eb2e2538
  1. 39
      heat/engine/check_resource.py
  2. 50
      heat/tests/engine/test_check_resource.py

39
heat/engine/check_resource.py

@ -56,16 +56,40 @@ class CheckResource(object):
self.msg_queue = msg_queue
self.input_data = input_data
def _try_steal_engine_lock(self, cnxt, resource_id):
def _stale_resource_needs_retry(self, cnxt, rsrc, prev_template_id):
"""Determine whether a resource needs retrying after failure to lock.
Return True if we need to retry the check operation because of a
failure to acquire the lock. This can be either because the engine
holding the lock is no longer working, or because no other engine had
locked the resource and the data was just out of date.
In the former case, the lock will be stolen and the resource status
changed to FAILED.
"""
fields = {'current_template_id', 'engine_id'}
rs_obj = resource_objects.Resource.get_obj(cnxt,
resource_id,
fields=('engine_id', ))
rsrc.id,
refresh=True,
fields=fields)
if rs_obj.engine_id not in (None, self.engine_id):
if not listener_client.EngineListenerClient(
rs_obj.engine_id).is_alive(cnxt):
# steal the lock.
rs_obj.update_and_save({'engine_id': None})
# set the resource state as failed
status_reason = ('Worker went down '
'during resource %s' % rsrc.action)
rsrc.state_set(rsrc.action,
rsrc.FAILED,
six.text_type(status_reason))
return True
elif (rs_obj.engine_id is None and
rs_obj.current_template_id == prev_template_id):
LOG.debug('Resource id=%d stale; retrying check')
return True
LOG.debug('Resource id=%d modified by another traversal')
return False
def _trigger_rollback(self, stack):
@ -135,6 +159,7 @@ class CheckResource(object):
def _do_check_resource(self, cnxt, current_traversal, tmpl, resource_data,
is_update, rsrc, stack, adopt_stack_data):
prev_template_id = rsrc.current_template_id
try:
if is_update:
try:
@ -155,14 +180,8 @@ class CheckResource(object):
return True
except exception.UpdateInProgress:
if self._try_steal_engine_lock(cnxt, rsrc.id):
if self._stale_resource_needs_retry(cnxt, rsrc, prev_template_id):
rpc_data = sync_point.serialize_input_data(self.input_data)
# set the resource state as failed
status_reason = ('Worker went down '
'during resource %s' % rsrc.action)
rsrc.state_set(rsrc.action,
rsrc.FAILED,
six.text_type(status_reason))
self._rpc_client.check_resource(cnxt,
rsrc.id,
current_traversal,

50
heat/tests/engine/test_check_resource.py

@ -15,6 +15,7 @@
import eventlet
import mock
import uuid
from oslo_config import cfg
@ -128,11 +129,11 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self.assertFalse(mock_pcr.called)
self.assertFalse(mock_csc.called)
@mock.patch.object(check_resource.CheckResource, '_try_steal_engine_lock')
@mock.patch.object(check_resource.CheckResource,
'_stale_resource_needs_retry')
@mock.patch.object(stack.Stack, 'time_remaining')
@mock.patch.object(resource.Resource, 'state_set')
def test_is_update_traversal_raise_update_inprogress(
self, mock_ss, tr, mock_tsl, mock_cru, mock_crc, mock_pcr,
self, tr, mock_tsl, mock_cru, mock_crc, mock_pcr,
mock_csc):
mock_cru.side_effect = exception.UpdateInProgress
self.worker.engine_id = 'some-thing-else'
@ -145,43 +146,64 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self.resource.stack.t.id,
{}, self.worker.engine_id,
mock.ANY, mock.ANY)
mock_ss.assert_called_once_with(self.resource.action,
resource.Resource.FAILED,
mock.ANY)
self.assertFalse(mock_crc.called)
self.assertFalse(mock_pcr.called)
self.assertFalse(mock_csc.called)
@mock.patch.object(resource.Resource, 'state_set')
def test_stale_resource_retry(
self, mock_ss, mock_cru, mock_crc, mock_pcr, mock_csc):
current_template_id = self.resource.current_template_id
res = self.cr._stale_resource_needs_retry(self.ctx,
self.resource,
current_template_id)
self.assertTrue(res)
mock_ss.assert_not_called()
@mock.patch.object(resource.Resource, 'state_set')
def test_try_steal_lock_alive(
self, mock_cru, mock_crc, mock_pcr, mock_csc):
res = self.cr._try_steal_engine_lock(self.ctx,
self.resource.id)
self, mock_ss, mock_cru, mock_crc, mock_pcr, mock_csc):
res = self.cr._stale_resource_needs_retry(self.ctx,
self.resource,
str(uuid.uuid4()))
self.assertFalse(res)
mock_ss.assert_not_called()
@mock.patch.object(check_resource.listener_client, 'EngineListenerClient')
@mock.patch.object(check_resource.resource_objects.Resource, 'get_obj')
@mock.patch.object(resource.Resource, 'state_set')
def test_try_steal_lock_dead(
self, mock_get, mock_elc, mock_cru, mock_crc, mock_pcr,
self, mock_ss, mock_get, mock_elc, mock_cru, mock_crc, mock_pcr,
mock_csc):
fake_res = mock.Mock()
fake_res.engine_id = 'some-thing-else'
mock_get.return_value = fake_res
mock_elc.return_value.is_alive.return_value = False
res = self.cr._try_steal_engine_lock(self.ctx,
self.resource.id)
current_template_id = self.resource.current_template_id
res = self.cr._stale_resource_needs_retry(self.ctx,
self.resource,
current_template_id)
self.assertTrue(res)
mock_ss.assert_called_once_with(self.resource.action,
resource.Resource.FAILED,
mock.ANY)
@mock.patch.object(check_resource.listener_client, 'EngineListenerClient')
@mock.patch.object(check_resource.resource_objects.Resource, 'get_obj')
@mock.patch.object(resource.Resource, 'state_set')
def test_try_steal_lock_not_dead(
self, mock_get, mock_elc, mock_cru, mock_crc, mock_pcr,
self, mock_ss, mock_get, mock_elc, mock_cru, mock_crc, mock_pcr,
mock_csc):
fake_res = mock.Mock()
fake_res.engine_id = self.worker.engine_id
mock_get.return_value = fake_res
mock_elc.return_value.is_alive.return_value = True
res = self.cr._try_steal_engine_lock(self.ctx, self.resource.id)
current_template_id = self.resource.current_template_id
res = self.cr._stale_resource_needs_retry(self.ctx,
self.resource,
current_template_id)
self.assertFalse(res)
mock_ss.assert_not_called()
@mock.patch.object(check_resource.CheckResource, '_trigger_rollback')
def test_resource_update_failure_sets_stack_state_as_failed(

Loading…
Cancel
Save