Convergence: Fix concurrent update resource delete

In convergence, where concurrent updates are possible, if a resource
is deleted (by a previous traversal) after the dependency graph has been
created for a new traversal, the resource remains in the graph but is
no longer available in the DB for processing.
It is a prerequisite that resources exist in the DB before any action
can be taken on them.

Hence, during the convergence resource delete action, the resource entry
is not removed from the DB, i.e. it is soft deleted, so that the
latest/new update can still find the entry.
All of these soft-deleted resources are deleted once the stack has
completed its operation.

Closes-Bug: #1528560
Change-Id: I0b36ce098022560d7fe01623ce7b66d1d5b38d55
This commit is contained in:
Rakesh H S 2016-02-12 15:41:29 +05:30 committed by Anant Patil
parent 0272577c9e
commit 906a0ed6fd
10 changed files with 123 additions and 24 deletions

View File

@ -134,6 +134,10 @@ def resource_get_all_by_root_stack(context, stack_id, filters=None):
return IMPL.resource_get_all_by_root_stack(context, stack_id, filters) return IMPL.resource_get_all_by_root_stack(context, stack_id, filters)
def resource_purge_deleted(context, stack_id):
    """Purge a stack's deleted resources through the DB backend.

    Delegates to the backend implementation (``IMPL``) and returns its
    result unchanged.
    """
    purged = IMPL.resource_purge_deleted(context, stack_id)
    return purged
def resource_get_by_name_and_stack(context, resource_name, stack_id): def resource_get_by_name_and_stack(context, resource_name, stack_id):
return IMPL.resource_get_by_name_and_stack(context, return IMPL.resource_get_by_name_and_stack(context,
resource_name, stack_id) resource_name, stack_id)

View File

@ -194,6 +194,13 @@ def resource_get_all(context):
return results return results
def resource_purge_deleted(context, stack_id):
    """Delete the DELETE/COMPLETE resource rows that belong to a stack.

    :param context: request context whose ``session`` runs the query
    :param stack_id: ID of the stack whose resources are purged
    """
    # Only rows already marked DELETE/COMPLETE for this stack are matched.
    purge_query = context.session.query(models.Resource.id).filter_by(
        stack_id=stack_id, action='DELETE', status='COMPLETE')
    purge_query.delete()
def resource_update(context, resource_id, values, atomic_key, def resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None): expected_engine_id=None):
session = context.session session = context.session

View File

@ -75,7 +75,8 @@ class CheckResource(object):
# possibility for that update to be waiting for this rsrc to # possibility for that update to be waiting for this rsrc to
# complete, hence retrigger current rsrc for latest traversal. # complete, hence retrigger current rsrc for latest traversal.
traversal = stack.current_traversal traversal = stack.current_traversal
latest_stack = parser.Stack.load(cnxt, stack_id=stack.id) latest_stack = parser.Stack.load(cnxt, stack_id=stack.id,
force_reload=True)
if traversal != latest_stack.current_traversal: if traversal != latest_stack.current_traversal:
self._retrigger_check_resource(cnxt, is_update, rsrc_id, self._retrigger_check_resource(cnxt, is_update, rsrc_id,
latest_stack) latest_stack)
@ -143,11 +144,18 @@ class CheckResource(object):
graph = stack.convergence_dependencies.graph() graph = stack.convergence_dependencies.graph()
key = (resource_id, is_update) key = (resource_id, is_update)
if is_update: if is_update:
# When re-triggering for a rsrc, we need to first check if update # When re-trigger received for update in latest traversal, first
# traversal is present for the rsrc in latest stack traversal, # check if update key is available in graph.
# if No, then latest traversal is waiting for delete. # if No, then latest traversal is waiting for delete.
if (resource_id, is_update) not in graph: if (resource_id, is_update) not in graph:
key = (resource_id, not is_update) key = (resource_id, not is_update)
else:
# When re-trigger received for delete in latest traversal, first
# check if update key is available in graph,
# if yes, then latest traversal is waiting for update.
if (resource_id, True) in graph:
# not is_update evaluates to True below, which means update
key = (resource_id, not is_update)
LOG.info(_LI('Re-trigger resource: (%(key1)s, %(key2)s)'), LOG.info(_LI('Re-trigger resource: (%(key1)s, %(key2)s)'),
{'key1': key[0], 'key2': key[1]}) {'key1': key[0], 'key2': key[1]})
predecessors = set(graph[key]) predecessors = set(graph[key])
@ -205,7 +213,8 @@ class CheckResource(object):
# check the SyncPoint for the current node to determine if # check the SyncPoint for the current node to determine if
# it is ready. If it is, then retrigger the current node # it is ready. If it is, then retrigger the current node
# with the appropriate data for the latest traversal. # with the appropriate data for the latest traversal.
stack = parser.Stack.load(cnxt, stack_id=rsrc.stack.id) stack = parser.Stack.load(cnxt, stack_id=rsrc.stack.id,
force_reload=True)
if current_traversal == stack.current_traversal: if current_traversal == stack.current_traversal:
LOG.debug('[%s] Traversal sync point missing.', LOG.debug('[%s] Traversal sync point missing.',
current_traversal) current_traversal)

View File

@ -995,7 +995,8 @@ class Resource(object):
def _needs_update(self, after, before, after_props, before_props, def _needs_update(self, after, before, after_props, before_props,
prev_resource, check_init_complete=True): prev_resource, check_init_complete=True):
if self.status == self.FAILED: if self.status == self.FAILED or (self.stack.convergence and (
self.action, self.status) == (self.DELETE, self.COMPLETE)):
raise exception.UpdateReplace(self) raise exception.UpdateReplace(self)
if check_init_complete and (self.action == self.INIT if check_init_complete and (self.action == self.INIT
@ -1455,22 +1456,23 @@ class Resource(object):
replaced by more recent resource, then delete this and update replaced by more recent resource, then delete this and update
the replacement resource's needed_by and replaces fields. the replacement resource's needed_by and replaces fields.
""" """
self._acquire(engine_id) with self.lock(engine_id):
try:
self.needed_by = list(set(v for v in input_data.values() self.needed_by = list(set(v for v in input_data.values()
if v is not None)) if v is not None))
if self.current_template_id != template_id: if self.current_template_id != template_id:
runner = scheduler.TaskRunner(self.destroy) # just delete the resources in INIT state
runner(timeout=timeout) if self.action == self.INIT:
try:
resource_objects.Resource.delete(self.context, self.id)
except exception.NotFound:
pass
else:
runner = scheduler.TaskRunner(self.delete)
runner(timeout=timeout)
# update needed_by and replaces of replacement resource # update needed_by and replaces of replacement resource
self._update_replacement_data(template_id) self._update_replacement_data(template_id)
else:
self._release(engine_id)
except: # noqa
with excutils.save_and_reraise_exception():
self._release(engine_id)
def handle_delete(self): def handle_delete(self):
"""Default implementation; should be overridden by resources.""" """Default implementation; should be overridden by resources."""

View File

@ -1972,13 +1972,16 @@ class Stack(collections.Mapping):
def purge_db(self): def purge_db(self):
"""Cleanup database after stack has completed/failed. """Cleanup database after stack has completed/failed.
1. If the stack failed, update the current_traversal to empty string 1. Delete the resources from DB.
2. If the stack failed, update the current_traversal to empty string
so that the resource workers bail out. so that the resource workers bail out.
2. Delete previous raw template if stack completes successfully. 3. Delete previous raw template if stack completes successfully.
3. Deletes all sync points. They are no longer needed after stack 4. Deletes all sync points. They are no longer needed after stack
has completed/failed. has completed/failed.
4. Delete the stack if the action is DELETE. 5. Delete the stack if the action is DELETE.
""" """
resource_objects.Resource.purge_deleted(self.context, self.id)
exp_trvsl = self.current_traversal exp_trvsl = self.current_traversal
if self.status == self.FAILED: if self.status == self.FAILED:
self.current_traversal = '' self.current_traversal = ''

View File

@ -172,6 +172,10 @@ class Resource(
filters) filters)
return cls._resources_to_dict(context, resources_db) return cls._resources_to_dict(context, resources_db)
@classmethod
def purge_deleted(cls, context, stack_id):
    """Purge the given stack's deleted resources through the DB API.

    Returns the result of ``db_api.resource_purge_deleted`` unchanged.
    """
    purge_result = db_api.resource_purge_deleted(context, stack_id)
    return purge_result
@classmethod @classmethod
def get_by_name_and_stack(cls, context, resource_name, stack_id): def get_by_name_and_stack(cls, context, resource_name, stack_id):
resource_db = db_api.resource_get_by_name_and_stack( resource_db = db_api.resource_get_by_name_and_stack(

View File

@ -2345,6 +2345,15 @@ class DBAPIResourceTest(common.HeatTestCase):
self.assertEqual({}, db_api.resource_get_all_by_root_stack( self.assertEqual({}, db_api.resource_get_all_by_root_stack(
self.ctx, self.stack2.id)) self.ctx, self.stack2.id))
def test_resource_purge_deleted_by_stack(self):
    """A DELETE/COMPLETE resource is gone from the DB after a purge."""
    deleted_rsrc = create_resource(
        self.ctx, self.stack,
        name='res1',
        action=rsrc.Resource.DELETE,
        status=rsrc.Resource.COMPLETE)

    db_api.resource_purge_deleted(self.ctx, self.stack.id)

    # The purged row must no longer be retrievable by its ID.
    self.assertRaises(exception.NotFound, db_api.resource_get,
                      self.ctx, deleted_rsrc.id)
class DBAPIStackLockTest(common.HeatTestCase): class DBAPIStackLockTest(common.HeatTestCase):
def setUp(self): def setUp(self):

View File

@ -353,7 +353,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
actual_predecessors = call_args[4] actual_predecessors = call_args[4]
self.assertItemsEqual(expected_predecessors, actual_predecessors) self.assertItemsEqual(expected_predecessors, actual_predecessors)
def test_retrigger_check_resource_new_traversal_deletes_rsrc( def test_update_retrigger_check_resource_new_traversal_deletes_rsrc(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
# mock dependencies to indicate a rsrc with id 2 is not present # mock dependencies to indicate a rsrc with id 2 is not present
# in latest traversal # in latest traversal
@ -368,6 +368,21 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
mock.ANY, (2, False), None, mock.ANY, (2, False), None,
False, None) False, None)
def test_delete_retrigger_check_resource_new_traversal_updates_rsrc(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
# A delete re-trigger (is_update=False) must be redirected to the update
# key when the latest traversal's graph contains an update for the rsrc.
# mock dependencies to indicate a rsrc with id 2 has an update
# in latest traversal
self.stack._convg_deps = dependencies.Dependencies([
[(1, False), (1, True)], [(2, False), (2, True)]])
# simulate rsrc 2 completing its delete for old traversal
# and calling _retrigger_check_resource
self.cr._retrigger_check_resource(self.ctx, False, 2, self.stack)
# Ensure that pcr was called with the update key (2, True) and
# is_update=True, even though the re-trigger came from a delete
mock_pcr.assert_called_once_with(self.ctx, mock.ANY, 2,
self.stack.current_traversal,
mock.ANY, (2, True), None,
True, None)
@mock.patch.object(stack.Stack, 'purge_db') @mock.patch.object(stack.Stack, 'purge_db')
def test_handle_failure(self, mock_purgedb, mock_cru, mock_crc, mock_pcr, def test_handle_failure(self, mock_purgedb, mock_cru, mock_crc, mock_pcr,
mock_csc, mock_cid): mock_csc, mock_cid):

View File

@ -451,6 +451,16 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
stack.purge_db() stack.purge_db()
self.assertTrue(mock_stack_delete.called) self.assertTrue(mock_stack_delete.called)
@mock.patch.object(resource_objects.Resource, 'purge_deleted')
def test_purge_db_calls_rsrc_purge_deleted(self, mock_rsrc_purge_delete,
                                           mock_cr):
    """purge_db() must invoke Resource.purge_deleted."""
    ctx = utils.dummy_context()
    convg_stack = tools.get_stack('test_stack', ctx,
                                  template=tools.string_template_five,
                                  convergence=True)
    convg_stack.store()

    convg_stack.purge_db()

    self.assertTrue(mock_rsrc_purge_delete.called)
def test_get_best_existing_db_resource(self, mock_cr): def test_get_best_existing_db_resource(self, mock_cr):
stack = tools.get_stack('test_stack', utils.dummy_context(), stack = tools.get_stack('test_stack', utils.dummy_context(),
template=tools.string_template_five, template=tools.string_template_five,

View File

@ -1006,6 +1006,31 @@ class ResourceTest(common.HeatTestCase):
self.assertRaises(exception.UpdateReplace, self.assertRaises(exception.UpdateReplace,
res._needs_update, tmpl, tmpl, prop, prop, res) res._needs_update, tmpl, tmpl, prop, prop, res)
def test_need_update_in_create_failed_state_for_resource(self):
    """A CREATE/FAILED resource must be replaced rather than updated."""
    defn = rsrc_defn.ResourceDefinition('test_resource',
                                        'GenericResourceType',
                                        {'Foo': 'abc'})
    failed_res = generic_rsrc.ResourceWithProps('test_resource', defn,
                                                self.stack)
    failed_res.update_allowed_properties = ('Foo',)
    failed_res.state_set(failed_res.CREATE, failed_res.FAILED)

    # Identical before/after definitions and properties: the replacement
    # is expected to be driven by the resource state set above.
    props = {'Foo': 'abc'}
    self.assertRaises(exception.UpdateReplace,
                      failed_res._needs_update,
                      defn, defn, props, props, failed_res)
def test_convg_need_update_in_delete_complete_state_for_resource(self):
    """With convergence on, a DELETE/COMPLETE resource must be replaced."""
    defn = rsrc_defn.ResourceDefinition('test_resource',
                                        'GenericResourceType',
                                        {'Foo': 'abc'})
    deleted_res = generic_rsrc.ResourceWithProps('test_resource', defn,
                                                 self.stack)
    deleted_res.update_allowed_properties = ('Foo',)
    deleted_res.stack.convergence = True
    deleted_res.state_set(deleted_res.DELETE, deleted_res.COMPLETE)

    # Identical before/after definitions and properties are passed in.
    props = {'Foo': 'abc'}
    self.assertRaises(exception.UpdateReplace,
                      deleted_res._needs_update,
                      defn, defn, props, props, deleted_res)
def test_update_fail_missing_req_prop(self): def test_update_fail_missing_req_prop(self):
tmpl = rsrc_defn.ResourceDefinition('test_resource', tmpl = rsrc_defn.ResourceDefinition('test_resource',
'GenericResourceType', 'GenericResourceType',
@ -2042,7 +2067,7 @@ class ResourceTest(common.HeatTestCase):
self._assert_resource_lock(res.id, None, None) self._assert_resource_lock(res.id, None, None)
res.delete_convergence(2, {}, 'engine-007', 20) res.delete_convergence(2, {}, 'engine-007', 20)
mock_init.assert_called_once_with(res.destroy) mock_init.assert_called_once_with(res.delete)
mock_call.assert_called_once_with(timeout=20) mock_call.assert_called_once_with(timeout=20)
self.assertTrue(res._update_replacement_data.called) self.assertTrue(res._update_replacement_data.called)
@ -2051,10 +2076,10 @@ class ResourceTest(common.HeatTestCase):
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.current_template_id = 'same-template' res.current_template_id = 'same-template'
res._store() res._store()
res.destroy = mock.Mock() res.delete = mock.Mock()
res.delete_convergence('same-template', {}, 'engine-007', res.delete_convergence('same-template', {}, 'engine-007',
self.dummy_timeout) self.dummy_timeout)
self.assertFalse(res.destroy.called) self.assertFalse(res.delete.called)
def test_delete_convergence_fail(self): def test_delete_convergence_fail(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@ -2223,9 +2248,20 @@ class ResourceTest(common.HeatTestCase):
test_obj.get.side_effect = AttributeError test_obj.get.side_effect = AttributeError
self.assertIsNone(res._show_resource()) self.assertIsNone(res._show_resource())
def test_delete_convergence_deletes_resource_in_init_state(self):
    """delete_convergence on an INIT resource deletes its DB entry."""
    defn = rsrc_defn.ResourceDefinition('test_res', 'Foo')
    init_res = generic_rsrc.GenericResource('test_res', defn, self.stack)
    # No state is set here, so the action is still the default INIT.
    init_res._store()

    patcher = mock.patch.object(resource_objects.Resource, 'delete')
    with patcher as mock_rsrc_delete:
        init_res.delete_convergence(1, {}, 'engine-007', 1)
        mock_rsrc_delete.assert_called_once_with(init_res.context,
                                                 init_res.id)
def test_delete_convergence_throws_timeout(self): def test_delete_convergence_throws_timeout(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.action = res.CREATE
res._store() res._store()
timeout = -1 # to emulate timeout timeout = -1 # to emulate timeout
self.assertRaises(scheduler.Timeout, res.delete_convergence, self.assertRaises(scheduler.Timeout, res.delete_convergence,