Consolidate resource locking with state changes

Change-Id: I261b2f0968e16d35b7d5d791a3edb4b265a4f1d1
Closes-Bug: #1662585
This commit is contained in:
Crag Wolfe 2017-02-09 06:21:37 +00:00
parent 132e9a2fea
commit a7376f7494
6 changed files with 247 additions and 186 deletions

View File

@ -97,6 +97,9 @@ class PollDelay(Exception):
class Resource(status.ResourceStatus):
BASE_ATTRIBUTES = (SHOW, ) = (attributes.SHOW_ATTR, )
LOCK_ACTIONS = (LOCK_NONE, LOCK_ACQUIRE, LOCK_RELEASE) = (
0, 1, -1)
# If True, this resource must be created before it can be referenced.
strict_dependency = True
@ -254,6 +257,8 @@ class Resource(status.ResourceStatus):
self.replaced_by = None
self.current_template_id = None
self.root_stack_id = None
self._calling_engine_id = None
self._atomic_key = None
if stack.cache_data is None:
resource = stack.db_resource_get(name)
@ -296,6 +301,7 @@ class Resource(status.ResourceStatus):
self.replaced_by = resource.replaced_by
self.current_template_id = resource.current_template_id
self.root_stack_id = resource.root_stack_id
self._atomic_key = resource.atomic_key
@property
def external_id(self):
@ -448,7 +454,8 @@ class Resource(status.ResourceStatus):
LOG.debug('Setting metadata for %s', six.text_type(self))
if refresh:
metadata = merge_metadata(metadata, db_res.rsrc_metadata)
db_res.update_metadata(metadata)
if db_res.update_metadata(metadata):
self._incr_atomic_key()
self._rsrc_metadata = metadata
def handle_metadata_reset(self):
@ -747,6 +754,12 @@ class Resource(status.ResourceStatus):
def glance(self):
return self.client('glance')
def _incr_atomic_key(self):
if self._atomic_key is None:
self._atomic_key = 1
else:
self._atomic_key = self._atomic_key + 1
@contextlib.contextmanager
def _action_recorder(self, action, expected_exceptions=tuple()):
"""Return a context manager to record the progress of an action.
@ -759,12 +772,22 @@ class Resource(status.ResourceStatus):
Expected exceptions are re-raised, with the Resource moved to the
COMPLETE state.
"""
if self.stack.convergence:
lock_acquire = self.LOCK_ACQUIRE
lock_release = self.LOCK_RELEASE
else:
lock_acquire = lock_release = self.LOCK_NONE
try:
self.state_set(action, self.IN_PROGRESS)
self.state_set(action, self.IN_PROGRESS, lock=lock_acquire)
yield
except exception.UpdateInProgress as ex:
with excutils.save_and_reraise_exception():
LOG.info('Update in progress for %s', self.name)
except expected_exceptions as ex:
with excutils.save_and_reraise_exception():
self.state_set(action, self.COMPLETE, six.text_type(ex))
self.state_set(action, self.COMPLETE, six.text_type(ex),
lock=lock_release)
LOG.debug('%s', six.text_type(ex))
except Exception as ex:
LOG.info('%(action)s: %(info)s',
@ -772,7 +795,8 @@ class Resource(status.ResourceStatus):
"info": six.text_type(self)},
exc_info=True)
failure = exception.ResourceFailure(ex, self, action)
self.state_set(action, self.FAILED, six.text_type(failure))
self.state_set(action, self.FAILED, six.text_type(failure),
lock=lock_release)
raise failure
except BaseException as exc:
with excutils.save_and_reraise_exception():
@ -781,11 +805,12 @@ class Resource(status.ResourceStatus):
msg = '%s aborted' % action
if reason:
msg += ' (%s)' % reason
self.state_set(action, self.FAILED, msg)
self.state_set(action, self.FAILED, msg,
lock=lock_release)
except Exception:
LOG.exception('Error marking resource as failed')
else:
self.state_set(action, self.COMPLETE)
self.state_set(action, self.COMPLETE, lock=lock_release)
def action_handler_task(self, action, args=None, action_prefix=None):
"""A task to call the Resource subclass's handler methods for action.
@ -910,19 +935,19 @@ class Resource(status.ResourceStatus):
def create_convergence(self, template_id, resource_data, engine_id,
timeout, progress_callback=None):
"""Creates the resource by invoking the scheduler TaskRunner."""
with self.lock(engine_id):
self.requires = list(
set(data.primary_key for data in resource_data.values()
if data is not None)
)
self.current_template_id = template_id
if self.stack.adopt_stack_data is None:
runner = scheduler.TaskRunner(self.create)
else:
adopt_data = self.stack._adopt_kwargs(self)
runner = scheduler.TaskRunner(self.adopt, **adopt_data)
self._calling_engine_id = engine_id
self.requires = list(
set(data.primary_key for data in resource_data.values()
if data is not None)
)
self.current_template_id = template_id
if self.stack.adopt_stack_data is None:
runner = scheduler.TaskRunner(self.create)
else:
adopt_data = self.stack._adopt_kwargs(self)
runner = scheduler.TaskRunner(self.adopt, **adopt_data)
runner(timeout=timeout, progress_callback=progress_callback)
runner(timeout=timeout, progress_callback=progress_callback)
def validate_external(self):
if self.external_id is not None:
@ -1172,55 +1197,74 @@ class Resource(status.ResourceStatus):
resource_data and existing resource's requires, then updates the
resource by invoking the scheduler TaskRunner.
"""
def update_tmpl_id_and_requires():
def update_templ_id_and_requires(persist=True):
self.current_template_id = template_id
self.requires = list(
set(data.primary_key for data in resource_data.values()
if data is not None)
)
if not persist:
return
with self.lock(engine_id):
registry = new_stack.env.registry
new_res_def = new_stack.t.resource_definitions(
new_stack)[self.name]
new_res_type = registry.get_class_to_instantiate(
new_res_def.resource_type, resource_name=self.name)
restricted_actions = registry.get_rsrc_restricted_actions(
self.name)
is_substituted = self.check_is_substituted(new_res_type)
if type(self) is not new_res_type and not is_substituted:
self._check_for_convergence_replace(restricted_actions)
rs = {'current_template_id': self.current_template_id,
'updated_at': self.updated_time,
'requires': self.requires}
if not resource_objects.Resource.select_and_update_by_id(
self.context, self.id, rs, expected_engine_id=None,
atomic_key=self._atomic_key):
LOG.info("Resource %s is locked, can't set template",
six.text_type(self))
LOG.debug('Resource id:%(resource_id)s locked. '
'Expected atomic_key:%(atomic_key)s, '
'accessing from engine_id:%(engine_id)s',
{'resource_id': self.id,
'atomic_key': self._atomic_key,
'engine_id': self._calling_engine_id})
raise exception.UpdateInProgress(self.name)
self._incr_atomic_key()
action_rollback = self.stack.action == self.stack.ROLLBACK
status_in_progress = self.stack.status == self.stack.IN_PROGRESS
if action_rollback and status_in_progress and self.replaced_by:
try:
self.restore_prev_rsrc(convergence=True)
except Exception as e:
failure = exception.ResourceFailure(e, self, self.action)
self.state_set(self.UPDATE, self.FAILED,
six.text_type(failure))
raise failure
self._calling_engine_id = engine_id
registry = new_stack.env.registry
new_res_def = new_stack.t.resource_definitions(
new_stack)[self.name]
new_res_type = registry.get_class_to_instantiate(
new_res_def.resource_type, resource_name=self.name)
restricted_actions = registry.get_rsrc_restricted_actions(
self.name)
is_substituted = self.check_is_substituted(new_res_type)
if type(self) is not new_res_type and not is_substituted:
self._check_for_convergence_replace(restricted_actions)
# Use new resource as update method if existing resource
# need to be substituted.
if is_substituted:
substitute = new_res_type(self.name, self.t, self.stack)
self.stack.resources[self.name] = substitute
updater = substitute.update
else:
updater = self.update
runner = scheduler.TaskRunner(updater, new_res_def)
action_rollback = self.stack.action == self.stack.ROLLBACK
status_in_progress = self.stack.status == self.stack.IN_PROGRESS
if action_rollback and status_in_progress and self.replaced_by:
try:
runner(timeout=timeout, progress_callback=progress_callback)
update_tmpl_id_and_requires()
except UpdateReplace:
raise
except BaseException:
with excutils.save_and_reraise_exception():
update_tmpl_id_and_requires()
else:
update_tmpl_id_and_requires()
self.restore_prev_rsrc(convergence=True)
except Exception as e:
failure = exception.ResourceFailure(e, self, self.action)
self.state_set(self.UPDATE, self.FAILED,
six.text_type(failure))
raise failure
# Use new resource as update method if existing resource
# need to be substituted.
if is_substituted:
substitute = new_res_type(self.name, self.t, self.stack)
self.stack.resources[self.name] = substitute
substitute._calling_engine_id = engine_id
updater = substitute.update
else:
updater = self.update
runner = scheduler.TaskRunner(
updater, new_res_def,
update_templ_func=update_templ_id_and_requires)
try:
runner(timeout=timeout, progress_callback=progress_callback)
except UpdateReplace:
raise
except BaseException:
with excutils.save_and_reraise_exception():
update_templ_id_and_requires(persist=True)
def preview_update(self, after, before, after_props, before_props,
prev_resource, check_init_complete=False):
@ -1316,7 +1360,8 @@ class Resource(status.ResourceStatus):
return False
@scheduler.wrappertask
def update(self, after, before=None, prev_resource=None):
def update(self, after, before=None, prev_resource=None,
update_templ_func=None):
"""Return a task to update the resource.
Subclasses should provide a handle_update() method to customise update,
@ -1353,11 +1398,15 @@ class Resource(status.ResourceStatus):
after_props,
before_props,
prev_resource):
if update_templ_func is not None:
update_templ_func(persist=True)
return
else:
if not self._needs_update(after, before,
after_props, before_props,
prev_resource):
if update_templ_func is not None:
update_templ_func(persist=True)
return
if not self.stack.convergence:
@ -1389,6 +1438,9 @@ class Resource(status.ResourceStatus):
self.t = after
self.reparse()
self._update_stored_properties()
if update_templ_func is not None:
# template/requires will be persisted by _action_recorder()
update_templ_func(persist=False)
except exception.ResourceActionRestricted as ae:
# catch all ResourceActionRestricted exceptions
@ -1617,12 +1669,12 @@ class Resource(status.ResourceStatus):
if (db_res.current_template_id == template_id):
# Following update failure is ignorable; another
# update might have locked/updated the resource.
db_res.select_and_update(
{'needed_by': self.needed_by,
'replaces': None},
atomic_key=db_res.atomic_key,
expected_engine_id=None
)
if db_res.select_and_update(
{'needed_by': self.needed_by,
'replaces': None},
atomic_key=db_res.atomic_key,
expected_engine_id=None):
self._incr_atomic_key()
def delete_convergence(self, template_id, input_data, engine_id, timeout,
progress_callback=None):
@ -1636,22 +1688,22 @@ class Resource(status.ResourceStatus):
replaced by more recent resource, then delete this and update
the replacement resource's needed_by and replaces fields.
"""
with self.lock(engine_id):
self.needed_by = list(set(v for v in input_data.values()
if v is not None))
self._calling_engine_id = engine_id
self.needed_by = list(set(v for v in input_data.values()
if v is not None))
if self.current_template_id != template_id:
# just delete the resources in INIT state
if self.action == self.INIT:
try:
resource_objects.Resource.delete(self.context, self.id)
except exception.NotFound:
pass
else:
runner = scheduler.TaskRunner(self.delete)
runner(timeout=timeout,
progress_callback=progress_callback)
self._update_replacement_data(template_id)
if self.current_template_id != template_id:
# just delete the resources in INIT state
if self.action == self.INIT:
try:
resource_objects.Resource.delete(self.context, self.id)
except exception.NotFound:
pass
else:
runner = scheduler.TaskRunner(self.delete)
runner(timeout=timeout,
progress_callback=progress_callback)
self._update_replacement_data(template_id)
def handle_delete(self):
"""Default implementation; should be overridden by resources."""
@ -1769,7 +1821,7 @@ class Resource(status.ResourceStatus):
except Exception as ex:
LOG.warning('db error %s', ex)
def store(self, set_metadata=False):
def store(self, set_metadata=False, lock=LOCK_NONE):
"""Create the resource in the database.
If self.id is set, we update the existing stack.
@ -1800,14 +1852,43 @@ class Resource(status.ResourceStatus):
self._rsrc_metadata = metadata
if self.id is not None:
resource_objects.Resource.update_by_id(
self.context, self.id, rs)
if (lock == self.LOCK_NONE or self._calling_engine_id is None):
resource_objects.Resource.update_by_id(
self.context, self.id, rs)
if (lock != self.LOCK_NONE and
self._calling_engine_id is None):
LOG.warning('no calling_engine_id in store %s',
str(rs))
else:
self._store_with_lock(rs, lock)
else:
new_rs = resource_objects.Resource.create(self.context, rs)
self.id = new_rs.id
self.uuid = new_rs.uuid
self.created_time = new_rs.created_at
def _store_with_lock(self, rs, lock):
if lock == self.LOCK_ACQUIRE:
rs['engine_id'] = self._calling_engine_id
expected_engine_id = None
else: # self.LOCK_RELEASE
expected_engine_id = self._calling_engine_id
rs['engine_id'] = None
if resource_objects.Resource.select_and_update_by_id(
self.context, self.id, rs, expected_engine_id,
self._atomic_key):
self._incr_atomic_key()
else:
LOG.info('Resource %s is locked or does not exist',
six.text_type(self))
LOG.debug('Resource id:%(resource_id)s locked or does not exist. '
'Expected atomic_key:%(atomic_key)s, '
'accessing from engine_id:%(engine_id)s',
{'resource_id': self.id,
'atomic_key': self._atomic_key,
'engine_id': self._calling_engine_id})
raise exception.UpdateInProgress(self.name)
def _add_event(self, action, status, reason):
"""Add a state change event to the database."""
physical_res_id = self.resource_id or self.physical_resource_name()
@ -1820,61 +1901,17 @@ class Resource(status.ResourceStatus):
@contextlib.contextmanager
def lock(self, engine_id):
self._acquire(engine_id)
self._calling_engine_id = engine_id
try:
self._store_with_lock({}, self.LOCK_ACQUIRE)
yield
except: # noqa
with excutils.save_and_reraise_exception():
self._release(engine_id)
else:
self._release(engine_id)
def _acquire(self, engine_id):
updated_ok = False
try:
rs = resource_objects.Resource.get_obj(self.context, self.id)
updated_ok = rs.select_and_update(
{'engine_id': engine_id},
atomic_key=rs.atomic_key,
expected_engine_id=None)
except Exception as ex:
LOG.error('DB error %s', ex)
except exception.UpdateInProgress:
raise
if not updated_ok:
LOG.info('Resource %s is locked for update; deferring',
six.text_type(self))
LOG.debug(('Resource id:%(resource_id)s with '
'atomic_key:%(atomic_key)s, locked '
'by engine_id:%(rs_engine_id)s/%(engine_id)s') % {
'resource_id': rs.id, 'atomic_key': rs.atomic_key,
'rs_engine_id': rs.engine_id,
'engine_id': engine_id})
raise exception.UpdateInProgress(self.name)
def _release(self, engine_id):
rs = None
try:
rs = resource_objects.Resource.get_obj(self.context, self.id)
except (exception.NotFound, exception.EntityNotFound):
# ignore: Resource is deleted holding a lock-on
return
atomic_key = rs.atomic_key
if atomic_key is None:
atomic_key = 0
updated_ok = rs.select_and_update(
{'engine_id': None,
'current_template_id': self.current_template_id,
'updated_at': self.updated_time,
'requires': self.requires,
'needed_by': self.needed_by},
expected_engine_id=engine_id,
atomic_key=atomic_key)
if not updated_ok:
LOG.warning('Failed to unlock resource %s', self.name)
except BaseException:
with excutils.save_and_reraise_exception():
self._store_with_lock({}, self.LOCK_RELEASE)
else:
self._store_with_lock({}, self.LOCK_RELEASE)
def _resolve_all_attributes(self, attr):
"""Method for resolving all attributes.
@ -2010,7 +2047,8 @@ class Resource(status.ResourceStatus):
self.action = self.INIT
self.status = self.COMPLETE
def state_set(self, action, status, reason="state changed"):
def state_set(self, action, status, reason="state changed",
lock=LOCK_NONE):
if action not in self.ACTIONS:
raise ValueError(_("Invalid action %s") % action)
@ -2023,7 +2061,7 @@ class Resource(status.ResourceStatus):
self.action = action
self.status = status
self.status_reason = reason
self.store(set_metadata)
self.store(set_metadata, lock=lock)
if new_state != old_state:
self._add_event(action, status, reason)

View File

@ -261,6 +261,14 @@ class Resource(
atomic_key=atomic_key,
expected_engine_id=expected_engine_id)
@classmethod
def select_and_update_by_id(cls, context, resource_id,
values, expected_engine_id=None,
atomic_key=0):
return db_api.resource_update(context, resource_id, values,
atomic_key=atomic_key,
expected_engine_id=expected_engine_id)
def refresh(self):
resource_db = db_api.resource_get(self._context, self.id, refresh=True)
return self.__class__._from_db_object(
@ -282,3 +290,6 @@ class Resource(
if not rows_updated:
action = _('metadata setting for resource %s') % self.name
raise exception.ConcurrentTransaction(action=action)
return True
else:
return False

View File

@ -676,25 +676,25 @@ class StackResourcesServiceTest(common.HeatTestCase):
@tools.stack_context('service_mark_unhealthy_lock_converge_test_stk',
convergence=True)
def test_mark_unhealthy_stack_lock_convergence(self):
mock_acquire = self.patchobject(res.Resource,
'_acquire',
return_value=None)
mock_store_with_lock = self.patchobject(res.Resource,
'_store_with_lock',
return_value=None)
self.eng.resource_mark_unhealthy(self.ctx, self.stack.identifier(),
'WebServer', True,
resource_status_reason="")
mock_acquire.assert_called_once_with(self.eng.engine_id)
self.assertEqual(2, mock_store_with_lock.call_count)
@tools.stack_context('service_mark_unhealthy_lockexc_converge_test_stk',
convergence=True)
def test_mark_unhealthy_stack_lock_exc_convergence(self):
def _acquire(*args, **kwargs):
def _store_with_lock(*args, **kwargs):
raise exception.UpdateInProgress(self.stack.name)
self.patchobject(
res.Resource,
'_acquire',
'_store_with_lock',
return_value=None,
side_effect=exception.UpdateInProgress(self.stack.name))
ex = self.assertRaises(dispatcher.ExpectedException,

View File

@ -159,6 +159,7 @@ class ResourceWithProps(GenericResource):
properties_schema = {
'Foo': properties.Schema(properties.Schema.STRING),
'FooInt': properties.Schema(properties.Schema.INTEGER)}
atomic_key = None
class ResourceWithPropsRefPropOnDelete(ResourceWithProps):

View File

@ -1996,17 +1996,6 @@ class ResourceTest(common.HeatTestCase):
self.assertEqual(engine_id, rs.engine_id)
self.assertEqual(atomic_key, rs.atomic_key)
@mock.patch.object(resource_objects.Resource, 'get_obj')
@mock.patch.object(resource_objects.Resource, 'select_and_update')
def test_release_ignores_not_found_error(self, mock_sau, mock_get_obj):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.store()
res._acquire('engine-id')
mock_get_obj.side_effect = exception.NotFound()
res._release('engine-id')
self.assertFalse(mock_sau.called)
def test_create_convergence(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
@ -2024,7 +2013,7 @@ class ResourceTest(common.HeatTestCase):
self.assertTrue(mock_create.called)
self.assertItemsEqual([1, 3], res.requires)
self._assert_resource_lock(res.id, None, 2)
self._assert_resource_lock(res.id, None, None)
def test_create_convergence_throws_timeout(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@ -2058,7 +2047,8 @@ class ResourceTest(common.HeatTestCase):
res.create_convergence, self.stack.t.id, res_data,
'engine-007', self.dummy_timeout, self.dummy_event)
self.assertItemsEqual([5, 3], res.requires)
self._assert_resource_lock(res.id, None, 2)
# The locking happens in create which we mocked out
self._assert_resource_lock(res.id, None, None)
@mock.patch.object(resource.Resource, 'adopt')
def test_adopt_convergence_ok(self, mock_adopt):
@ -2079,7 +2069,7 @@ class ResourceTest(common.HeatTestCase):
mock_adopt.assert_called_once_with(
resource_data={'resource_id': 'fluffy'})
self.assertItemsEqual([5, 3], res.requires)
self._assert_resource_lock(res.id, None, 2)
self._assert_resource_lock(res.id, None, None)
def test_adopt_convergence_bad_data(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@ -2097,7 +2087,9 @@ class ResourceTest(common.HeatTestCase):
exc = self.assertRaises(exception.ResourceFailure, tr)
self.assertIn('Resource ID was not provided', six.text_type(exc))
def test_update_convergence(self):
@mock.patch.object(resource.Resource, 'update_template_diff_properties')
@mock.patch.object(resource.Resource, '_needs_update')
def test_update_convergence(self, mock_nu, mock_utd):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
@ -2122,19 +2114,18 @@ class ResourceTest(common.HeatTestCase):
new_temp.store(stack.context)
new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
new_temp, stack_id=self.stack.id)
res.stack.convergence = True
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res_data = node_data.load_resources_data(res_data)
pcb = mock.Mock()
with mock.patch.object(resource.Resource, 'update') as mock_update:
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
res_data, 'engine-007', 120, new_stack,
pcb)
tr()
self.assertTrue(mock_update.called)
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
res_data, 'engine-007', 120, new_stack)
tr()
self.assertItemsEqual([3, 4], res.requires)
self.assertEqual(res.action, resource.Resource.UPDATE)
self.assertEqual(res.status, resource.Resource.COMPLETE)
self._assert_resource_lock(res.id, None, 2)
def test_update_convergence_throws_timeout(self):
@ -2210,7 +2201,9 @@ class ResourceTest(common.HeatTestCase):
self.dummy_event)
self.assertRaises(resource.UpdateReplace, tr)
def test_update_in_progress_convergence(self):
@mock.patch.object(resource.Resource, '_needs_update')
@mock.patch.object(resource.Resource, '_check_for_convergence_replace')
def test_update_in_progress_convergence(self, mock_cfcr, mock_nu):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
res.requires = [1, 2]
@ -2219,21 +2212,34 @@ class ResourceTest(common.HeatTestCase):
rs.update_and_save({'engine_id': 'not-this'})
self._assert_resource_lock(res.id, 'not-this', None)
res.stack.convergence = True
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res_data = node_data.load_resources_data(res_data)
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
'test_res': {'Type': 'ResourceWithPropsType'}
}}, env=self.env)
new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
tmpl, stack_id=self.stack.id)
tr = scheduler.TaskRunner(res.update_convergence, 'template_key',
res_data, 'engine-007', self.dummy_timeout,
mock.ANY, self.dummy_event)
new_stack)
ex = self.assertRaises(exception.UpdateInProgress, tr)
msg = ("The resource %s is already being updated." %
res.name)
self.assertEqual(msg, six.text_type(ex))
# ensure requirements are not updated for failed resource
self.assertEqual([1, 2], res.requires)
rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
self.assertEqual([1, 2], rs.requires)
@mock.patch.object(resource.Resource, 'update')
def test_update_resource_convergence_failed(self, mock_update):
@mock.patch.object(resource.Resource, 'update_template_diff_properties')
@mock.patch.object(resource.Resource, '_needs_update')
def test_update_resource_convergence_failed(self,
mock_needs_update,
mock_update_template_diff):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
@ -2259,26 +2265,25 @@ class ResourceTest(common.HeatTestCase):
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res_data = node_data.load_resources_data(res_data)
exc = Exception(_('Resource update failed'))
new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
new_temp, stack_id=self.stack.id)
dummy_ex = exception.ResourceFailure(exc, res, action=res.UPDATE)
mock_update.side_effect = dummy_ex
res.stack.convergence = True
res._calling_engine_id = 'engine-9'
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
res_data, 'engine-007', 120, new_stack,
self.dummy_event)
self.assertRaises(exception.ResourceFailure, tr)
expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
mock_update.assert_called_once_with(expected_rsrc_def)
# check if current_template_id was updated
self.assertEqual(new_temp.id, res.current_template_id)
# check if requires was updated
self.assertItemsEqual([3, 4], res.requires)
self._assert_resource_lock(res.id, None, 2)
self.assertEqual(res.action, resource.Resource.UPDATE)
self.assertEqual(res.status, resource.Resource.FAILED)
self._assert_resource_lock(res.id, None, 3)
@mock.patch.object(resource.Resource, 'update')
def test_update_resource_convergence_update_replace(self, mock_update):
def test_update_resource_convergence_update_replace(self):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
@ -2301,10 +2306,11 @@ class ResourceTest(common.HeatTestCase):
}}, env=self.env)
new_temp.store(stack.context)
res.stack.convergence = True
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
res_data = node_data.load_resources_data(res_data)
mock_update.side_effect = resource.UpdateReplace
new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
new_temp, stack_id=self.stack.id)
tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
@ -2312,13 +2318,11 @@ class ResourceTest(common.HeatTestCase):
self.dummy_event)
self.assertRaises(resource.UpdateReplace, tr)
expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
mock_update.assert_called_once_with(expected_rsrc_def)
# ensure that current_template_id was not updated
self.assertEqual(stack.t.id, res.current_template_id)
# ensure that requires was not updated
self.assertItemsEqual([2], res.requires)
self._assert_resource_lock(res.id, None, 2)
self._assert_resource_lock(res.id, None, None)
def test_convergence_update_replace_rollback(self):
rsrc_def = rsrc_defn.ResourceDefinition('test_res',
@ -2386,7 +2390,7 @@ class ResourceTest(common.HeatTestCase):
tr()
self.assertTrue(mock_delete.called)
self.assertTrue(res._update_replacement_data.called)
self._assert_resource_lock(res.id, None, 2)
self._assert_resource_lock(res.id, None, None)
def test_delete_convergence_does_not_delete_same_template_resource(self):
tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@ -2410,6 +2414,7 @@ class ResourceTest(common.HeatTestCase):
res_id = res.id
res.handle_delete = mock.Mock(side_effect=ValueError('test'))
self._assert_resource_lock(res.id, None, None)
res.stack.convergence = True
tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, 'engine-007',
self.dummy_timeout, self.dummy_event)
self.assertRaises(exception.ResourceFailure, tr)
@ -2428,12 +2433,13 @@ class ResourceTest(common.HeatTestCase):
res.status = res.COMPLETE
res.action = res.CREATE
res.store()
self.stack.convergence = True
res._calling_engine_id = 'engine-9'
rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
rs.update_and_save({'engine_id': 'not-this'})
self._assert_resource_lock(res.id, 'not-this', None)
tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007',
self.dummy_timeout, self.dummy_event)
tr = scheduler.TaskRunner(res.delete)
ex = self.assertRaises(exception.UpdateInProgress, tr)
msg = ("The resource %s is already being updated." %
res.name)

View File

@ -55,3 +55,8 @@ class AodhAlarmTest(scenario_base.ScenarioTestsBase):
# Note: there is little point waiting more than 60s+time to scale up.
self.assertTrue(test.call_until_true(
120, 2, self.check_instance_count, stack_identifier, 2))
# Temporarily avoids a race condition, addressed in the
# next change https://review.openstack.org/#/c/449351/
import time
time.sleep(3)