Re-trigger on update-replace

It is found that the inter-leaving of lock when a update-replace of a
resource is needed is the reason for new traversal not being triggered.

Consider the order of events below:
1. A server is being updated. The worker locks the server resource.
2. A rollback is triggered because some one cancelled the stack.
3. As part of rollback, new update using old template is started.
4. The new update tries to take the lock but it has been already
acquired in (1). The new update now expects that the when the old
resource is done, it will re-trigger the new traversal.
5. The old update decides to create a new resource for replacement. The
replacement resource is initiated for creation, a check_resource RPC
call is made for new resource.
6. A worker, possibly in another engine, receives the call and then it
bails out when it finds that there is a new traversal initiated (from
2). Now, there is no progress from here because it is expected (from 4)
that there will be a re-trigger when the old resource is done.

This change takes care of re-triggering the new traversal from worker
when it finds that there is a new traversal and an update-replace. Note
that this issue will not be seen when there is no update-replace
because the old resource will finish (either fail or complete) and in
the same thread it will find the new traversal and trigger it.

Closes-Bug: #1625073
Change-Id: Icea5ba498ef8ca45cd85a9721937da2f4ac304e0
changes/72/371572/3
Anant Patil 6 years ago
parent 4ec6192e76
commit 99b055b423
  1. 10
      heat/engine/check_resource.py
  2. 37
      heat/engine/worker.py
  3. 20
      heat/tests/engine/test_check_resource.py

@ -94,8 +94,8 @@ class CheckResource(object):
latest_stack = parser.Stack.load(cnxt, stack_id=stack.id,
force_reload=True)
if traversal != latest_stack.current_traversal:
self._retrigger_check_resource(cnxt, is_update, rsrc_id,
latest_stack)
self.retrigger_check_resource(cnxt, is_update, rsrc_id,
latest_stack)
def _handle_stack_timeout(self, cnxt, stack):
failure_reason = u'Timed out'
@ -158,7 +158,7 @@ class CheckResource(object):
return False
def _retrigger_check_resource(self, cnxt, is_update, resource_id, stack):
def retrigger_check_resource(self, cnxt, is_update, resource_id, stack):
current_traversal = stack.current_traversal
graph = stack.convergence_dependencies.graph()
key = (resource_id, is_update)
@ -239,8 +239,8 @@ class CheckResource(object):
current_traversal)
return
self._retrigger_check_resource(cnxt, is_update,
resource_id, stack)
self.retrigger_check_resource(cnxt, is_update,
resource_id, stack)
else:
raise

@ -131,6 +131,23 @@ class WorkerService(service.Service):
return True
def _retrigger_replaced(self, is_update, rsrc, stack, msg_queue):
graph = stack.convergence_dependencies.graph()
key = (rsrc.id, is_update)
if key not in graph and rsrc.replaces is not None:
# This resource replaces old one and is not needed in
# current traversal. You need to mark the resource as
# DELETED so that it gets cleaned up in purge_db.
values = {'action': rsrc.DELETE}
db_api.resource_update_and_save(stack.context, rsrc.id, values)
# The old resource might be in the graph (a rollback case);
# just re-trigger it.
key = (rsrc.replaces, is_update)
cr = check_resource.CheckResource(self.engine_id, self._rpc_client,
self.thread_group_mgr, msg_queue)
cr.retrigger_check_resource(stack.context, is_update, key[0],
stack)
@context.request_context
def check_resource(self, cnxt, resource_id, current_traversal, data,
is_update, adopt_stack_data):
@ -146,18 +163,20 @@ class WorkerService(service.Service):
if rsrc is None:
return
if current_traversal != stack.current_traversal:
LOG.debug('[%s] Traversal cancelled; stopping.', current_traversal)
return
msg_queue = eventlet.queue.LightQueue()
try:
self.thread_group_mgr.add_msg_queue(stack.id, msg_queue)
cr = check_resource.CheckResource(self.engine_id, self._rpc_client,
self.thread_group_mgr, msg_queue)
cr.check(cnxt, resource_id, current_traversal, resource_data,
is_update, adopt_stack_data, rsrc, stack)
if current_traversal != stack.current_traversal:
LOG.debug('[%s] Traversal cancelled; re-trigerring.',
current_traversal)
self._retrigger_replaced(is_update, rsrc, stack, msg_queue)
else:
cr = check_resource.CheckResource(self.engine_id,
self._rpc_client,
self.thread_group_mgr,
msg_queue)
cr.check(cnxt, resource_id, current_traversal, resource_data,
is_update, adopt_stack_data, rsrc, stack)
finally:
self.thread_group_mgr.remove_msg_queue(None,
stack.id, msg_queue)

@ -77,12 +77,12 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]:
self.assertFalse(mocked.called)
@mock.patch.object(worker.WorkerService, '_retrigger_replaced')
def test_stale_traversal(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
self, mock_rnt, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
self.worker.check_resource(self.ctx, self.resource.id,
'stale-traversal', {}, True, None)
for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]:
self.assertFalse(mocked.called)
self.assertTrue(mock_rnt.called)
def test_is_update_traversal(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
@ -320,7 +320,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self.assertTrue(self.stack.purge_db.called)
@mock.patch.object(check_resource.CheckResource,
'_retrigger_check_resource')
'retrigger_check_resource')
@mock.patch.object(stack.Stack, 'load')
def test_initiate_propagate_rsrc_retriggers_check_rsrc_on_new_stack_update(
self, mock_stack_load, mock_rcr, mock_cru, mock_crc, mock_pcr,
@ -368,8 +368,8 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
# A, B are predecessors to C when is_update is True
expected_predecessors = {(self.stack['A'].id, True),
(self.stack['B'].id, True)}
self.cr._retrigger_check_resource(self.ctx, self.is_update,
resC.id, self.stack)
self.cr.retrigger_check_resource(self.ctx, self.is_update,
resC.id, self.stack)
mock_pcr.assert_called_once_with(self.ctx, mock.ANY, resC.id,
self.stack.current_traversal,
mock.ANY, (resC.id, True), None,
@ -386,7 +386,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
[(1, False), (1, True)], [(2, False), None]])
# simulate rsrc 2 completing its update for old traversal
# and calling rcr
self.cr._retrigger_check_resource(self.ctx, True, 2, self.stack)
self.cr.retrigger_check_resource(self.ctx, True, 2, self.stack)
# Ensure that pcr was called with proper delete traversal
mock_pcr.assert_called_once_with(self.ctx, mock.ANY, 2,
self.stack.current_traversal,
@ -401,7 +401,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
[(1, False), (1, True)], [(2, False), (2, True)]])
# simulate rsrc 2 completing its delete for old traversal
# and calling rcr
self.cr._retrigger_check_resource(self.ctx, False, 2, self.stack)
self.cr.retrigger_check_resource(self.ctx, False, 2, self.stack)
# Ensure that pcr was called with proper delete traversal
mock_pcr.assert_called_once_with(self.ctx, mock.ANY, 2,
self.stack.current_traversal,
@ -426,7 +426,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
@mock.patch.object(stack.Stack, 'purge_db')
@mock.patch.object(stack.Stack, 'state_set')
@mock.patch.object(check_resource.CheckResource,
'_retrigger_check_resource')
'retrigger_check_resource')
@mock.patch.object(check_resource.CheckResource, '_trigger_rollback')
def test_handle_rsrc_failure_when_update_fails(
self, mock_tr, mock_rcr, mock_ss, mock_pdb, mock_cru, mock_crc,
@ -444,7 +444,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
@mock.patch.object(stack.Stack, 'purge_db')
@mock.patch.object(stack.Stack, 'state_set')
@mock.patch.object(check_resource.CheckResource,
'_retrigger_check_resource')
'retrigger_check_resource')
@mock.patch.object(check_resource.CheckResource, '_trigger_rollback')
def test_handle_rsrc_failure_when_update_fails_different_traversal(
self, mock_tr, mock_rcr, mock_ss, mock_pdb, mock_cru, mock_crc,

Loading…
Cancel
Save