Convergence: Compute resource dependencies

1. Resource requires need to be computed from the input data supplied by
sync point. Since the replacement impersonates the replaced, we need to
get the requires from input data rather than from the graph key. The
graph key will still have old (replaced) resource and input data will
have correct requires.

2. Resources are always INITed with correct template, that they belong
to, so there is no need to pass template ID again to create or delete.

Co-Authored-By: Rakesh HS  <rh-s@hp.com>
Co-Authored-By: Kanagaraj Manickam  <kanagaraj.manickam@hp.com>

Change-Id: I706cf02b438f19d3e1a59b3e8ca5ebe6b3e60328
This commit is contained in:
Anant Patil 2015-07-07 09:07:51 +05:30
parent 5f9d1ebbdb
commit df0aadd888
3 changed files with 48 additions and 30 deletions

View File

@ -640,22 +640,18 @@ class Resource(object):
'''
return self
def create_convergence(self, template_id, resource_data, engine_id):
def create_convergence(self, resource_data, engine_id):
'''
Creates the resource by invoking the scheduler TaskRunner
and it persists the resource's current_template_id to template_id and
resource's requires to list of the required resource id from the
given resource_data.
Creates the resource by invoking the scheduler TaskRunner.
'''
with self.lock(engine_id):
self.requires = list(
set(data[u'id'] for data in resource_data.values()
if data is not None)
)
runner = scheduler.TaskRunner(self.create)
runner()
# update the resource db record (stored in unlock())
self.current_template_id = template_id
self.requires = list(
{graph_key[0] for graph_key, data in resource_data.items()})
@scheduler.wrappertask
def create(self):
'''
@ -812,9 +808,10 @@ class Resource(object):
# update the resource db record (stored in unlock)
self.current_template_id = template_id
current_requires = set(
graph_key[0] for graph_key, data in resource_data.items())
self.requires = list(set(self.requires) | current_requires)
self.requires = list(
set(data[u'id'] for data in resource_data.values()
if data is not None)
)
@scheduler.wrappertask
def update(self, after, before=None, prev_resource=None):
@ -1006,7 +1003,7 @@ class Resource(object):
msg = _('"%s" deletion policy not supported') % policy
raise exception.StackValidationFailed(message=msg)
def delete_convergence(self, template_id, resource_data, engine_id):
def delete_convergence(self, engine_id):
'''
Destroys the resource. The destroy task is run in a scheduler
TaskRunner after acquiring the lock on resource.

View File

@ -311,7 +311,7 @@ def check_resource_update(rsrc, template_id, data, engine_id):
resource.Resource.COMPLETE,
resource.Resource.FAILED
])):
rsrc.create_convergence(template_id, data, engine_id)
rsrc.create_convergence(data, engine_id)
else:
rsrc.update_convergence(template_id, data, engine_id)
@ -322,4 +322,4 @@ def check_resource_cleanup(rsrc, template_id, data, engine_id):
'''
if rsrc.current_template_id != template_id:
rsrc.delete_convergence(template_id, data, engine_id)
rsrc.delete_convergence(engine_id)

View File

@ -1427,13 +1427,31 @@ class ResourceTest(common.HeatTestCase):
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res._store()
self._assert_resource_lock(res.id, None, None)
res.create_convergence('template_key', {(1, True): {},
(1, True): {}},
'engine-007')
res_data = {(1, True): {u'id': 1, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res.create_convergence(res_data, 'engine-007')
mock_create.assert_called_once_with()
self.assertEqual('template_key', res.current_template_id)
self.assertEqual([1], res.requires)
self.assertItemsEqual([1, 3], res.requires)
self._assert_resource_lock(res.id, None, 2)
def test_create_convergence_sets_requires_for_failure(self):
'''
Ensure that requires are computed correctly even if resource
create fails,
'''
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res._store()
dummy_ex = exception.ResourceNotAvailable(resource_name=res.name)
res.create = mock.Mock(side_effect=dummy_ex)
self._assert_resource_lock(res.id, None, None)
res_data = {(1, True): {u'id': 5, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
self.assertRaises(exception.ResourceNotAvailable,
res.create_convergence, res_data,
'engine-007')
self.assertItemsEqual([5, 3], res.requires)
self._assert_resource_lock(res.id, None, 2)
@mock.patch.object(resource.Resource, 'update')
@ -1453,30 +1471,36 @@ class ResourceTest(common.HeatTestCase):
}}, env=self.env)
new_temp.store()
res.update_convergence(new_temp.id, {(1, True): {},
(1, True): {}}, 'engine-007')
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res.update_convergence(new_temp.id, res_data, 'engine-007')
mock_update.assert_called_once_with(
new_temp.resource_definitions(self.stack)[res.name])
self.assertEqual(new_temp.id, res.current_template_id)
self.assertEqual([1, 2], res.requires)
self.assertItemsEqual([3, 4], res.requires)
self._assert_resource_lock(res.id, None, 2)
def test_update_in_progress_convergence(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.requires = [1, 2]
res._store()
rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
rs.update_and_save({'engine_id': 'not-this'})
self._assert_resource_lock(res.id, 'not-this', None)
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
ex = self.assertRaises(resource.UpdateInProgress,
res.update_convergence,
'template_key',
{}, 'engine-007')
res_data, 'engine-007')
msg = ("The resource %s is already being updated." %
res.name)
self.assertEqual(msg, six.text_type(ex))
# ensure requirements are not updated for failed resource
self.assertEqual([1, 2], res.requires)
def test_delete_convergence(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@ -1485,9 +1509,7 @@ class ResourceTest(common.HeatTestCase):
res._store()
res.destroy = mock.Mock()
self._assert_resource_lock(res.id, None, None)
res.delete_convergence('template_key', {(1, True): {},
(1, True): {}},
'engine-007')
res.delete_convergence('engine-007')
self.assertTrue(res.destroy.called)
def test_delete_in_progress_convergence(self):
@ -1499,8 +1521,7 @@ class ResourceTest(common.HeatTestCase):
self._assert_resource_lock(res.id, 'not-this', None)
ex = self.assertRaises(resource.UpdateInProgress,
res.delete_convergence,
'template_key',
{}, 'engine-007')
'engine-007')
msg = ("The resource %s is already being updated." %
res.name)
self.assertEqual(msg, six.text_type(ex))