Convergence: Fix for resource needed_by
When resources are replaced, the needed_by needes to be updated. When the resources are visited in clean-up phase - after all the updates are done - new needed_by data is sent to the resources. Change-Id: Ib3bff461ab4cdd43391c7fcdfff6d8eb17fe2555 Co-Authored-By: Rakesh HS <rh-s@hp.com>
This commit is contained in:
parent
b2f9ec3287
commit
5a299b1626
|
@ -359,10 +359,12 @@ class Resource(object):
|
|||
self._rsrc_metadata = metadata
|
||||
|
||||
@classmethod
|
||||
def set_needed_by(cls, db_rsrc, needed_by):
|
||||
def set_needed_by(cls, db_rsrc, needed_by, expected_engine_id=None):
|
||||
if db_rsrc:
|
||||
db_rsrc.update_and_save(
|
||||
{'needed_by': needed_by}
|
||||
db_rsrc.select_and_update(
|
||||
{'needed_by': needed_by},
|
||||
atomic_key=db_rsrc.atomic_key,
|
||||
expected_engine_id=expected_engine_id
|
||||
)
|
||||
|
||||
@classmethod
|
||||
|
@ -1077,14 +1079,52 @@ class Resource(object):
|
|||
msg = _('"%s" deletion policy not supported') % policy
|
||||
raise exception.StackValidationFailed(message=msg)
|
||||
|
||||
def delete_convergence(self, engine_id):
|
||||
'''
|
||||
Destroys the resource. The destroy task is run in a scheduler
|
||||
TaskRunner after acquiring the lock on resource.
|
||||
def _update_replacement_data(self, template_id):
|
||||
# Udate the replacement resource's needed_by and replaces
|
||||
# fields. Make sure that the replacement belongs to the given
|
||||
# template and there is no engine is working on it.
|
||||
if self.replaced_by is None:
|
||||
return
|
||||
|
||||
try:
|
||||
db_res = resource_objects.Resource.get_obj(
|
||||
self.context, self.replaced_by)
|
||||
except exception.NotFound:
|
||||
LOG.info(_LI("Could not find replacement of resource %{name} "
|
||||
"with id %{id} while updating needed_by."),
|
||||
{'name': self.name, 'id': self.replaced_by})
|
||||
return
|
||||
|
||||
if (db_res.current_template_id == template_id):
|
||||
# Following update failure is ignorable; another
|
||||
# update might have locked/updated the resource.
|
||||
db_res.select_and_update(
|
||||
{'needed_by': self.needed_by,
|
||||
'replaces': None},
|
||||
atomic_key=db_res.atomic_key,
|
||||
expected_engine_id=None
|
||||
)
|
||||
|
||||
def delete_convergence(self, template_id, input_data, engine_id):
|
||||
'''Destroys the resource if it doesn't belong to given
|
||||
template. The given template is suppose to be the current
|
||||
template being provisioned.
|
||||
|
||||
Also, since this resource is visited as part of clean-up phase,
|
||||
the needed_by should be updated. If this resource was
|
||||
replaced by more recent resource, then delete this and update
|
||||
the replacement resource's needed_by and replaces fields.
|
||||
'''
|
||||
with self.lock(engine_id):
|
||||
runner = scheduler.TaskRunner(self.destroy)
|
||||
runner()
|
||||
self.needed_by = list(set(v for v in input_data.values()
|
||||
if v is not None))
|
||||
|
||||
if self.current_template_id != template_id:
|
||||
runner = scheduler.TaskRunner(self.destroy)
|
||||
runner()
|
||||
|
||||
# update needed_by and replaces of replacement resource
|
||||
self._update_replacement_data(template_id)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def delete(self):
|
||||
|
@ -1262,7 +1302,8 @@ class Resource(object):
|
|||
{'engine_id': None,
|
||||
'current_template_id': self.current_template_id,
|
||||
'updated_at': self.updated_time,
|
||||
'requires': self.requires},
|
||||
'requires': self.requires,
|
||||
'needed_by': self.needed_by},
|
||||
expected_engine_id=engine_id,
|
||||
atomic_key=atomic_key + 1)
|
||||
|
||||
|
|
|
@ -119,9 +119,15 @@ class WorkerService(service.Service):
|
|||
def _load_resource(self, cnxt, resource_id, data, is_update):
|
||||
adopt_data = data.get('adopt_stack_data')
|
||||
data = dict(sync_point.deserialize_input_data(data))
|
||||
cache_data = {in_data.get(
|
||||
'name'): in_data for in_data in data.values()
|
||||
if in_data is not None}
|
||||
|
||||
if is_update:
|
||||
cache_data = {in_data.get(
|
||||
'name'): in_data for in_data in data.values()
|
||||
if in_data is not None}
|
||||
else:
|
||||
# no data to resolve in cleanup phase
|
||||
cache_data = {}
|
||||
|
||||
cache_data['adopt_stack_data'] = adopt_data
|
||||
rsrc, stack = None, None
|
||||
try:
|
||||
|
@ -190,9 +196,6 @@ class WorkerService(service.Service):
|
|||
def _initiate_propagate_resource(self, cnxt, resource_id,
|
||||
current_traversal, is_update, rsrc,
|
||||
stack):
|
||||
input_data = None
|
||||
if is_update:
|
||||
input_data = construct_input_data(rsrc)
|
||||
deps = self._compute_dependencies(stack)
|
||||
graph = deps.graph()
|
||||
graph_key = (resource_id, is_update)
|
||||
|
@ -206,12 +209,25 @@ class WorkerService(service.Service):
|
|||
# for the next traversal.
|
||||
graph_key = (rsrc.replaces, is_update)
|
||||
|
||||
def _get_input_data(req, fwd):
|
||||
if fwd:
|
||||
return construct_input_data(rsrc)
|
||||
else:
|
||||
# Don't send data if initiating clean-up for self i.e.
|
||||
# initiating delete of a replaced resource
|
||||
if req not in graph_key:
|
||||
# send replaced resource as needed_by if it exists
|
||||
return (rsrc.replaced_by
|
||||
if rsrc.replaced_by is not None
|
||||
else resource_id)
|
||||
return None
|
||||
|
||||
try:
|
||||
for req, fwd in deps.required_by(graph_key):
|
||||
input_data = _get_input_data(req, fwd)
|
||||
propagate_check_resource(
|
||||
cnxt, self._rpc_client, req, current_traversal,
|
||||
set(graph[(req, fwd)]), graph_key,
|
||||
input_data if fwd else None, fwd)
|
||||
set(graph[(req, fwd)]), graph_key, input_data, fwd)
|
||||
|
||||
check_stack_complete(cnxt, stack, current_traversal,
|
||||
resource_id, deps, is_update)
|
||||
|
@ -338,6 +354,4 @@ def check_resource_cleanup(rsrc, template_id, data, engine_id):
|
|||
'''
|
||||
Delete the Resource if appropriate.
|
||||
'''
|
||||
|
||||
if rsrc.current_template_id != template_id:
|
||||
rsrc.delete_convergence(engine_id)
|
||||
rsrc.delete_convergence(template_id, data, engine_id)
|
||||
|
|
|
@ -542,9 +542,3 @@ class MiscMethodsTest(common.HeatTestCase):
|
|||
worker.check_resource_cleanup(self.resource, self.resource.stack.t.id,
|
||||
{}, 'engine-id')
|
||||
self.assertTrue(mock_delete.called)
|
||||
|
||||
@mock.patch.object(resource.Resource, 'delete_convergence')
|
||||
def test_check_resource_cleanup_nodelete(self, mock_delete):
|
||||
worker.check_resource_cleanup(self.resource, self.resource.stack.t.id,
|
||||
{}, 'engine-id')
|
||||
self.assertFalse(mock_delete.called)
|
||||
|
|
|
@ -1556,27 +1556,81 @@ class ResourceTest(common.HeatTestCase):
|
|||
def test_delete_convergence(self):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
|
||||
res.requires = [1, 2]
|
||||
res.current_template_id = 1
|
||||
res._store()
|
||||
res.destroy = mock.Mock()
|
||||
res._update_replacement_data = mock.Mock()
|
||||
self._assert_resource_lock(res.id, None, None)
|
||||
res.delete_convergence('engine-007')
|
||||
res.delete_convergence(2, {}, 'engine-007')
|
||||
self.assertTrue(res.destroy.called)
|
||||
self.assertTrue(res._update_replacement_data.called)
|
||||
|
||||
def test_delete_convergence_does_not_delete_same_template_resource(self):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
|
||||
res.current_template_id = 'same-template'
|
||||
res._store()
|
||||
res.destroy = mock.Mock()
|
||||
res.delete_convergence('same-template', {}, 'engine-007')
|
||||
self.assertFalse(res.destroy.called)
|
||||
|
||||
def test_delete_in_progress_convergence(self):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
|
||||
res.current_template_id = 1
|
||||
res._store()
|
||||
rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
|
||||
rs.update_and_save({'engine_id': 'not-this'})
|
||||
self._assert_resource_lock(res.id, 'not-this', None)
|
||||
ex = self.assertRaises(resource.UpdateInProgress,
|
||||
res.delete_convergence,
|
||||
'engine-007')
|
||||
1, {}, 'engine-007')
|
||||
msg = ("The resource %s is already being updated." %
|
||||
res.name)
|
||||
self.assertEqual(msg, six.text_type(ex))
|
||||
|
||||
def test_delete_convergence_updates_needed_by(self):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
|
||||
res.current_template_id = 1
|
||||
res._store()
|
||||
res.destroy = mock.Mock()
|
||||
input_data = {(1, False): 4, (2, False): 5} # needed_by resource ids
|
||||
self._assert_resource_lock(res.id, None, None)
|
||||
res.delete_convergence(1, input_data, 'engine-007')
|
||||
self.assertItemsEqual([4, 5], res.needed_by)
|
||||
|
||||
@mock.patch.object(resource_objects.Resource, 'get_obj')
|
||||
def test_update_replacement_data(self, mock_get_obj):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
r = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
|
||||
r.replaced_by = 4
|
||||
r.needed_by = [4, 5]
|
||||
r._store()
|
||||
db_res = mock.MagicMock()
|
||||
db_res.current_template_id = 'same_tmpl'
|
||||
mock_get_obj.return_value = db_res
|
||||
r._update_replacement_data('same_tmpl')
|
||||
self.assertTrue(mock_get_obj.called)
|
||||
self.assertTrue(db_res.select_and_update.called)
|
||||
args, kwargs = db_res.select_and_update.call_args
|
||||
self.assertEqual({'replaces': None, 'needed_by': [4, 5]}, args[0])
|
||||
self.assertEqual(None, kwargs['expected_engine_id'])
|
||||
|
||||
@mock.patch.object(resource_objects.Resource, 'get_obj')
|
||||
def test_update_replacement_data_ignores_rsrc_from_different_tmpl(
|
||||
self, mock_get_obj):
|
||||
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
|
||||
r = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
|
||||
r.replaced_by = 4
|
||||
db_res = mock.MagicMock()
|
||||
db_res.current_template_id = 'tmpl'
|
||||
mock_get_obj.return_value = db_res
|
||||
# db_res as tmpl id as 2, and 1 is passed
|
||||
r._update_replacement_data('diff_tmpl')
|
||||
self.assertTrue(mock_get_obj.called)
|
||||
self.assertFalse(db_res.select_and_update.called)
|
||||
|
||||
|
||||
class ResourceAdoptTest(common.HeatTestCase):
|
||||
|
||||
|
|
Loading…
Reference in New Issue