Merge "Convergence: Don't try to unlock a resource after the DB row is deleted"

This commit is contained in:
Jenkins 2015-08-03 06:04:11 +00:00 committed by Gerrit Code Review
commit 735af00cc0
2 changed files with 58 additions and 16 deletions

View File

@ -1113,7 +1113,8 @@ class Resource(object):
replaced by more recent resource, then delete this and update
the replacement resource's needed_by and replaces fields.
'''
with self.lock(engine_id):
self._aquire(engine_id)
try:
self.needed_by = list(set(v for v in input_data.values()
if v is not None))
@ -1123,6 +1124,11 @@ class Resource(object):
# update needed_by and replaces of replacement resource
self._update_replacement_data(template_id)
else:
self._release(engine_id)
except: # noqa
with excutils.save_and_reraise_exception():
self._release(engine_id)
@scheduler.wrappertask
def delete(self):
@ -1267,6 +1273,16 @@ class Resource(object):
@contextlib.contextmanager
def lock(self, engine_id):
self._aquire(engine_id)
try:
yield
except: # noqa
with excutils.save_and_reraise_exception():
self._release(engine_id)
else:
self._release(engine_id)
def _aquire(self, engine_id):
updated_ok = False
try:
rs = resource_objects.Resource.get_obj(self.context, self.id)
@ -1284,29 +1300,23 @@ class Resource(object):
rs.atomic_key, rs.engine_id, engine_id))
raise ex
try:
yield
except: # noqa
with excutils.save_and_reraise_exception():
self.unlock(rs, engine_id, rs.atomic_key)
else:
self.unlock(rs, engine_id, rs.atomic_key)
def unlock(self, rsrc, engine_id, atomic_key):
def _release(self, engine_id):
rs = resource_objects.Resource.get_obj(self.context, self.id)
atomic_key = rs.atomic_key
if atomic_key is None:
atomic_key = 0
updated_ok = rsrc.select_and_update(
updated_ok = rs.select_and_update(
{'engine_id': None,
'current_template_id': self.current_template_id,
'updated_at': self.updated_time,
'requires': self.requires,
'needed_by': self.needed_by},
expected_engine_id=engine_id,
atomic_key=atomic_key + 1)
atomic_key=atomic_key)
if not updated_ok:
LOG.warn(_LW('Failed to unlock resource %s'), rsrc.name)
LOG.warn(_LW('Failed to unlock resource %s'), self.name)
def _resolve_attribute(self, name):
"""

View File

@ -1567,16 +1567,24 @@ class ResourceTest(common.HeatTestCase):
# ensure requirements are not updated for failed resource
self.assertEqual([1, 2], res.requires)
def test_delete_convergence(self):
def test_delete_convergence_ok(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.current_template_id = 1
res.status = res.COMPLETE
res.action = res.CREATE
res._store()
res.destroy = mock.Mock()
res_id = res.id
res.handle_delete = mock.Mock(return_value=None)
res._update_replacement_data = mock.Mock()
self._assert_resource_lock(res.id, None, None)
res.delete_convergence(2, {}, 'engine-007')
self.assertTrue(res.destroy.called)
self.assertTrue(res.handle_delete.called)
self.assertRaises(exception.NotFound,
resource_objects.Resource.get_obj,
self.stack.context, res_id)
self.assertTrue(res._update_replacement_data.called)
def test_delete_convergence_does_not_delete_same_template_resource(self):
@ -1588,10 +1596,32 @@ class ResourceTest(common.HeatTestCase):
res.delete_convergence('same-template', {}, 'engine-007')
self.assertFalse(res.destroy.called)
def test_delete_convergence_fail(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.current_template_id = 1
res.status = res.COMPLETE
res.action = res.CREATE
res._store()
res_id = res.id
res.handle_delete = mock.Mock(side_effect=ValueError('test'))
self._assert_resource_lock(res.id, None, None)
self.assertRaises(exception.ResourceFailure,
res.delete_convergence, 2, {}, 'engine-007')
self.assertTrue(res.handle_delete.called)
# confirm that the DB object still exists, and it's lock is released.
rs = resource_objects.Resource.get_obj(self.stack.context, res_id)
self.assertEqual(rs.id, res_id)
self.assertEqual(res.FAILED, rs.status)
self._assert_resource_lock(res.id, None, 2)
def test_delete_in_progress_convergence(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.current_template_id = 1
res.status = res.COMPLETE
res.action = res.CREATE
res._store()
rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
rs.update_and_save({'engine_id': 'not-this'})
@ -1607,6 +1637,8 @@ class ResourceTest(common.HeatTestCase):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.current_template_id = 1
res.status = res.COMPLETE
res.action = res.CREATE
res._store()
res.destroy = mock.Mock()
input_data = {(1, False): 4, (2, False): 5} # needed_by resource ids