From a7376f7494b310e9367ebe5dcb43b432a4053023 Mon Sep 17 00:00:00 2001 From: Crag Wolfe Date: Thu, 9 Feb 2017 06:21:37 +0000 Subject: [PATCH] Consolidate resource locking with state changes Change-Id: I261b2f0968e16d35b7d5d791a3edb4b265a4f1d1 Closes-Bug: #1662585 --- heat/engine/resource.py | 312 ++++++++++-------- heat/objects/resource.py | 11 + .../engine/service/test_stack_resources.py | 12 +- heat/tests/generic_resource.py | 1 + heat/tests/test_resource.py | 92 +++--- .../scenario/test_aodh_alarm.py | 5 + 6 files changed, 247 insertions(+), 186 deletions(-) diff --git a/heat/engine/resource.py b/heat/engine/resource.py index 45144c3204..c1a84de6b2 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -97,6 +97,9 @@ class PollDelay(Exception): class Resource(status.ResourceStatus): BASE_ATTRIBUTES = (SHOW, ) = (attributes.SHOW_ATTR, ) + LOCK_ACTIONS = (LOCK_NONE, LOCK_ACQUIRE, LOCK_RELEASE) = ( + 0, 1, -1) + # If True, this resource must be created before it can be referenced. strict_dependency = True @@ -254,6 +257,8 @@ class Resource(status.ResourceStatus): self.replaced_by = None self.current_template_id = None self.root_stack_id = None + self._calling_engine_id = None + self._atomic_key = None if stack.cache_data is None: resource = stack.db_resource_get(name) @@ -296,6 +301,7 @@ class Resource(status.ResourceStatus): self.replaced_by = resource.replaced_by self.current_template_id = resource.current_template_id self.root_stack_id = resource.root_stack_id + self._atomic_key = resource.atomic_key @property def external_id(self): @@ -448,7 +454,8 @@ class Resource(status.ResourceStatus): LOG.debug('Setting metadata for %s', six.text_type(self)) if refresh: metadata = merge_metadata(metadata, db_res.rsrc_metadata) - db_res.update_metadata(metadata) + if db_res.update_metadata(metadata): + self._incr_atomic_key() self._rsrc_metadata = metadata def handle_metadata_reset(self): @@ -747,6 +754,12 @@ class Resource(status.ResourceStatus): def glance(self): return self.client('glance') + def _incr_atomic_key(self): + if self._atomic_key is None: + self._atomic_key = 1 + else: + self._atomic_key = self._atomic_key + 1 + @contextlib.contextmanager def _action_recorder(self, action, expected_exceptions=tuple()): """Return a context manager to record the progress of an action. @@ -759,12 +772,22 @@ class Resource(status.ResourceStatus): Expected exceptions are re-raised, with the Resource moved to the COMPLETE state. """ + if self.stack.convergence: + lock_acquire = self.LOCK_ACQUIRE + lock_release = self.LOCK_RELEASE + else: + lock_acquire = lock_release = self.LOCK_NONE + try: - self.state_set(action, self.IN_PROGRESS) + self.state_set(action, self.IN_PROGRESS, lock=lock_acquire) yield + except exception.UpdateInProgress as ex: + with excutils.save_and_reraise_exception(): + LOG.info('Update in progress for %s', self.name) except expected_exceptions as ex: with excutils.save_and_reraise_exception(): - self.state_set(action, self.COMPLETE, six.text_type(ex)) + self.state_set(action, self.COMPLETE, six.text_type(ex), + lock=lock_release) LOG.debug('%s', six.text_type(ex)) except Exception as ex: LOG.info('%(action)s: %(info)s', @@ -772,7 +795,8 @@ class Resource(status.ResourceStatus): "info": six.text_type(self)}, exc_info=True) failure = exception.ResourceFailure(ex, self, action) - self.state_set(action, self.FAILED, six.text_type(failure)) + self.state_set(action, self.FAILED, six.text_type(failure), + lock=lock_release) raise failure except BaseException as exc: with excutils.save_and_reraise_exception(): @@ -781,11 +805,12 @@ class Resource(status.ResourceStatus): msg = '%s aborted' % action if reason: msg += ' (%s)' % reason - self.state_set(action, self.FAILED, msg) + self.state_set(action, self.FAILED, msg, + lock=lock_release) except Exception: LOG.exception('Error marking resource as failed') else: - self.state_set(action, self.COMPLETE) + self.state_set(action, self.COMPLETE, lock=lock_release) def action_handler_task(self, action, args=None, action_prefix=None): """A task to call the Resource subclass's handler methods for action. @@ -910,19 +935,19 @@ class Resource(status.ResourceStatus): def create_convergence(self, template_id, resource_data, engine_id, timeout, progress_callback=None): """Creates the resource by invoking the scheduler TaskRunner.""" - with self.lock(engine_id): - self.requires = list( - set(data.primary_key for data in resource_data.values() - if data is not None) - ) - self.current_template_id = template_id - if self.stack.adopt_stack_data is None: - runner = scheduler.TaskRunner(self.create) - else: - adopt_data = self.stack._adopt_kwargs(self) - runner = scheduler.TaskRunner(self.adopt, **adopt_data) + self._calling_engine_id = engine_id + self.requires = list( + set(data.primary_key for data in resource_data.values() + if data is not None) + ) + self.current_template_id = template_id + if self.stack.adopt_stack_data is None: + runner = scheduler.TaskRunner(self.create) + else: + adopt_data = self.stack._adopt_kwargs(self) + runner = scheduler.TaskRunner(self.adopt, **adopt_data) - runner(timeout=timeout, progress_callback=progress_callback) + runner(timeout=timeout, progress_callback=progress_callback) def validate_external(self): if self.external_id is not None: @@ -1172,55 +1197,74 @@ class Resource(status.ResourceStatus): resource_data and existing resource's requires, then updates the resource by invoking the scheduler TaskRunner. """ - def update_tmpl_id_and_requires(): + def update_templ_id_and_requires(persist=True): self.current_template_id = template_id self.requires = list( set(data.primary_key for data in resource_data.values() if data is not None) ) + if not persist: + return - with self.lock(engine_id): - registry = new_stack.env.registry - new_res_def = new_stack.t.resource_definitions( - new_stack)[self.name] - new_res_type = registry.get_class_to_instantiate( - new_res_def.resource_type, resource_name=self.name) - restricted_actions = registry.get_rsrc_restricted_actions( - self.name) - is_substituted = self.check_is_substituted(new_res_type) - if type(self) is not new_res_type and not is_substituted: - self._check_for_convergence_replace(restricted_actions) + rs = {'current_template_id': self.current_template_id, + 'updated_at': self.updated_time, + 'requires': self.requires} + if not resource_objects.Resource.select_and_update_by_id( + self.context, self.id, rs, expected_engine_id=None, + atomic_key=self._atomic_key): + LOG.info("Resource %s is locked, can't set template", + six.text_type(self)) + LOG.debug('Resource id:%(resource_id)s locked. ' + 'Expected atomic_key:%(atomic_key)s, ' + 'accessing from engine_id:%(engine_id)s', + {'resource_id': self.id, + 'atomic_key': self._atomic_key, + 'engine_id': self._calling_engine_id}) + raise exception.UpdateInProgress(self.name) + self._incr_atomic_key() - action_rollback = self.stack.action == self.stack.ROLLBACK - status_in_progress = self.stack.status == self.stack.IN_PROGRESS - if action_rollback and status_in_progress and self.replaced_by: - try: - self.restore_prev_rsrc(convergence=True) - except Exception as e: - failure = exception.ResourceFailure(e, self, self.action) - self.state_set(self.UPDATE, self.FAILED, - six.text_type(failure)) - raise failure + self._calling_engine_id = engine_id + registry = new_stack.env.registry + new_res_def = new_stack.t.resource_definitions( + new_stack)[self.name] + new_res_type = registry.get_class_to_instantiate( + new_res_def.resource_type, resource_name=self.name) + restricted_actions = registry.get_rsrc_restricted_actions( + self.name) + is_substituted = self.check_is_substituted(new_res_type) + if type(self) is not new_res_type and not is_substituted: + self._check_for_convergence_replace(restricted_actions) - # Use new resource as update method if existing resource - # need to be substituted. - if is_substituted: - substitute = new_res_type(self.name, self.t, self.stack) - self.stack.resources[self.name] = substitute - updater = substitute.update - else: - updater = self.update - runner = scheduler.TaskRunner(updater, new_res_def) + action_rollback = self.stack.action == self.stack.ROLLBACK + status_in_progress = self.stack.status == self.stack.IN_PROGRESS + if action_rollback and status_in_progress and self.replaced_by: try: - runner(timeout=timeout, progress_callback=progress_callback) - update_tmpl_id_and_requires() - except UpdateReplace: - raise - except BaseException: - with excutils.save_and_reraise_exception(): - update_tmpl_id_and_requires() - else: - update_tmpl_id_and_requires() + self.restore_prev_rsrc(convergence=True) + except Exception as e: + failure = exception.ResourceFailure(e, self, self.action) + self.state_set(self.UPDATE, self.FAILED, + six.text_type(failure)) + raise failure + + # Use new resource as update method if existing resource + # need to be substituted. + if is_substituted: + substitute = new_res_type(self.name, self.t, self.stack) + self.stack.resources[self.name] = substitute + substitute._calling_engine_id = engine_id + updater = substitute.update + else: + updater = self.update + runner = scheduler.TaskRunner( + updater, new_res_def, + update_templ_func=update_templ_id_and_requires) + try: + runner(timeout=timeout, progress_callback=progress_callback) + except UpdateReplace: + raise + except BaseException: + with excutils.save_and_reraise_exception(): + update_templ_id_and_requires(persist=True) def preview_update(self, after, before, after_props, before_props, prev_resource, check_init_complete=False): @@ -1316,7 +1360,8 @@ class Resource(status.ResourceStatus): return False @scheduler.wrappertask - def update(self, after, before=None, prev_resource=None): + def update(self, after, before=None, prev_resource=None, + update_templ_func=None): """Return a task to update the resource. Subclasses should provide a handle_update() method to customise update, @@ -1353,11 +1398,15 @@ class Resource(status.ResourceStatus): after_props, before_props, prev_resource): + if update_templ_func is not None: + update_templ_func(persist=True) return else: if not self._needs_update(after, before, after_props, before_props, prev_resource): + if update_templ_func is not None: + update_templ_func(persist=True) return if not self.stack.convergence: @@ -1389,6 +1438,9 @@ class Resource(status.ResourceStatus): self.t = after self.reparse() self._update_stored_properties() + if update_templ_func is not None: + # template/requires will be persisted by _action_recorder() + update_templ_func(persist=False) except exception.ResourceActionRestricted as ae: # catch all ResourceActionRestricted exceptions @@ -1617,12 +1669,12 @@ class Resource(status.ResourceStatus): if (db_res.current_template_id == template_id): # Following update failure is ignorable; another # update might have locked/updated the resource. - db_res.select_and_update( - {'needed_by': self.needed_by, - 'replaces': None}, - atomic_key=db_res.atomic_key, - expected_engine_id=None - ) + if db_res.select_and_update( + {'needed_by': self.needed_by, + 'replaces': None}, + atomic_key=db_res.atomic_key, + expected_engine_id=None): + self._incr_atomic_key() def delete_convergence(self, template_id, input_data, engine_id, timeout, progress_callback=None): @@ -1636,22 +1688,22 @@ class Resource(status.ResourceStatus): replaced by more recent resource, then delete this and update the replacement resource's needed_by and replaces fields. """ - with self.lock(engine_id): - self.needed_by = list(set(v for v in input_data.values() - if v is not None)) + self._calling_engine_id = engine_id + self.needed_by = list(set(v for v in input_data.values() + if v is not None)) - if self.current_template_id != template_id: - # just delete the resources in INIT state - if self.action == self.INIT: - try: - resource_objects.Resource.delete(self.context, self.id) - except exception.NotFound: - pass - else: - runner = scheduler.TaskRunner(self.delete) - runner(timeout=timeout, - progress_callback=progress_callback) - self._update_replacement_data(template_id) + if self.current_template_id != template_id: + # just delete the resources in INIT state + if self.action == self.INIT: + try: + resource_objects.Resource.delete(self.context, self.id) + except exception.NotFound: + pass + else: + runner = scheduler.TaskRunner(self.delete) + runner(timeout=timeout, + progress_callback=progress_callback) + self._update_replacement_data(template_id) def handle_delete(self): """Default implementation; should be overridden by resources.""" @@ -1769,7 +1821,7 @@ class Resource(status.ResourceStatus): except Exception as ex: LOG.warning('db error %s', ex) - def store(self, set_metadata=False): + def store(self, set_metadata=False, lock=LOCK_NONE): """Create the resource in the database. If self.id is set, we update the existing stack. @@ -1800,14 +1852,43 @@ class Resource(status.ResourceStatus): self._rsrc_metadata = metadata if self.id is not None: - resource_objects.Resource.update_by_id( - self.context, self.id, rs) + if (lock == self.LOCK_NONE or self._calling_engine_id is None): + resource_objects.Resource.update_by_id( + self.context, self.id, rs) + if (lock != self.LOCK_NONE and + self._calling_engine_id is None): + LOG.warning('no calling_engine_id in store %s', + str(rs)) + else: + self._store_with_lock(rs, lock) else: new_rs = resource_objects.Resource.create(self.context, rs) self.id = new_rs.id self.uuid = new_rs.uuid self.created_time = new_rs.created_at + def _store_with_lock(self, rs, lock): + if lock == self.LOCK_ACQUIRE: + rs['engine_id'] = self._calling_engine_id + expected_engine_id = None + else: # self.LOCK_RELEASE + expected_engine_id = self._calling_engine_id + rs['engine_id'] = None + if resource_objects.Resource.select_and_update_by_id( + self.context, self.id, rs, expected_engine_id, + self._atomic_key): + self._incr_atomic_key() + else: + LOG.info('Resource %s is locked or does not exist', + six.text_type(self)) + LOG.debug('Resource id:%(resource_id)s locked or does not exist. ' + 'Expected atomic_key:%(atomic_key)s, ' + 'accessing from engine_id:%(engine_id)s', + {'resource_id': self.id, + 'atomic_key': self._atomic_key, + 'engine_id': self._calling_engine_id}) + raise exception.UpdateInProgress(self.name) + def _add_event(self, action, status, reason): """Add a state change event to the database.""" physical_res_id = self.resource_id or self.physical_resource_name() @@ -1820,61 +1901,17 @@ class Resource(status.ResourceStatus): @contextlib.contextmanager def lock(self, engine_id): - self._acquire(engine_id) + self._calling_engine_id = engine_id try: + self._store_with_lock({}, self.LOCK_ACQUIRE) yield - except: # noqa - with excutils.save_and_reraise_exception(): - self._release(engine_id) - else: - self._release(engine_id) - - def _acquire(self, engine_id): - updated_ok = False - try: - rs = resource_objects.Resource.get_obj(self.context, self.id) - updated_ok = rs.select_and_update( - {'engine_id': engine_id}, - atomic_key=rs.atomic_key, - expected_engine_id=None) - except Exception as ex: - LOG.error('DB error %s', ex) + except exception.UpdateInProgress: raise - - if not updated_ok: - LOG.info('Resource %s is locked for update; deferring', - six.text_type(self)) - LOG.debug(('Resource id:%(resource_id)s with ' - 'atomic_key:%(atomic_key)s, locked ' - 'by engine_id:%(rs_engine_id)s/%(engine_id)s') % { - 'resource_id': rs.id, 'atomic_key': rs.atomic_key, - 'rs_engine_id': rs.engine_id, - 'engine_id': engine_id}) - raise exception.UpdateInProgress(self.name) - - def _release(self, engine_id): - rs = None - try: - rs = resource_objects.Resource.get_obj(self.context, self.id) - except (exception.NotFound, exception.EntityNotFound): - # ignore: Resource is deleted holding a lock-on - return - - atomic_key = rs.atomic_key - if atomic_key is None: - atomic_key = 0 - - updated_ok = rs.select_and_update( - {'engine_id': None, - 'current_template_id': self.current_template_id, - 'updated_at': self.updated_time, - 'requires': self.requires, - 'needed_by': self.needed_by}, - expected_engine_id=engine_id, - atomic_key=atomic_key) - - if not updated_ok: - LOG.warning('Failed to unlock resource %s', self.name) + except BaseException: + with excutils.save_and_reraise_exception(): + self._store_with_lock({}, self.LOCK_RELEASE) + else: + self._store_with_lock({}, self.LOCK_RELEASE) def _resolve_all_attributes(self, attr): """Method for resolving all attributes. @@ -2010,7 +2047,8 @@ class Resource(status.ResourceStatus): self.action = self.INIT self.status = self.COMPLETE - def state_set(self, action, status, reason="state changed"): + def state_set(self, action, status, reason="state changed", + lock=LOCK_NONE): if action not in self.ACTIONS: raise ValueError(_("Invalid action %s") % action) @@ -2023,7 +2061,7 @@ class Resource(status.ResourceStatus): self.action = action self.status = status self.status_reason = reason - self.store(set_metadata) + self.store(set_metadata, lock=lock) if new_state != old_state: self._add_event(action, status, reason) diff --git a/heat/objects/resource.py b/heat/objects/resource.py index 0fea6db1b9..38b8dc000a 100644 --- a/heat/objects/resource.py +++ b/heat/objects/resource.py @@ -261,6 +261,14 @@ class Resource( atomic_key=atomic_key, expected_engine_id=expected_engine_id) + @classmethod + def select_and_update_by_id(cls, context, resource_id, + values, expected_engine_id=None, + atomic_key=0): + return db_api.resource_update(context, resource_id, values, + atomic_key=atomic_key, + expected_engine_id=expected_engine_id) + def refresh(self): resource_db = db_api.resource_get(self._context, self.id, refresh=True) return self.__class__._from_db_object( @@ -282,3 +290,6 @@ class Resource( if not rows_updated: action = _('metadata setting for resource %s') % self.name raise exception.ConcurrentTransaction(action=action) + return True + else: + return False diff --git a/heat/tests/engine/service/test_stack_resources.py b/heat/tests/engine/service/test_stack_resources.py index 812cbddd34..f226ea7cb1 100644 --- a/heat/tests/engine/service/test_stack_resources.py +++ b/heat/tests/engine/service/test_stack_resources.py @@ -676,25 +676,25 @@ class StackResourcesServiceTest(common.HeatTestCase): @tools.stack_context('service_mark_unhealthy_lock_converge_test_stk', convergence=True) def test_mark_unhealthy_stack_lock_convergence(self): - mock_acquire = self.patchobject(res.Resource, - '_acquire', - return_value=None) + mock_store_with_lock = self.patchobject(res.Resource, + '_store_with_lock', + return_value=None) self.eng.resource_mark_unhealthy(self.ctx, self.stack.identifier(), 'WebServer', True, resource_status_reason="") - mock_acquire.assert_called_once_with(self.eng.engine_id) + self.assertEqual(2, mock_store_with_lock.call_count) @tools.stack_context('service_mark_unhealthy_lockexc_converge_test_stk', convergence=True) def test_mark_unhealthy_stack_lock_exc_convergence(self): - def _acquire(*args, **kwargs): + def _store_with_lock(*args, **kwargs): raise exception.UpdateInProgress(self.stack.name) self.patchobject( res.Resource, - '_acquire', + '_store_with_lock', return_value=None, side_effect=exception.UpdateInProgress(self.stack.name)) ex = self.assertRaises(dispatcher.ExpectedException, diff --git a/heat/tests/generic_resource.py b/heat/tests/generic_resource.py index 72276e4f00..165c435f6d 100644 --- a/heat/tests/generic_resource.py +++ b/heat/tests/generic_resource.py @@ -159,6 +159,7 @@ class ResourceWithProps(GenericResource): properties_schema = { 'Foo': properties.Schema(properties.Schema.STRING), 'FooInt': properties.Schema(properties.Schema.INTEGER)} + atomic_key = None class ResourceWithPropsRefPropOnDelete(ResourceWithProps): diff --git a/heat/tests/test_resource.py b/heat/tests/test_resource.py index 30a39c3ace..38b5c2828e 100644 --- a/heat/tests/test_resource.py +++ b/heat/tests/test_resource.py @@ -1996,17 +1996,6 @@ class ResourceTest(common.HeatTestCase): self.assertEqual(engine_id, rs.engine_id) self.assertEqual(atomic_key, rs.atomic_key) - @mock.patch.object(resource_objects.Resource, 'get_obj') - @mock.patch.object(resource_objects.Resource, 'select_and_update') - def test_release_ignores_not_found_error(self, mock_sau, mock_get_obj): - tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') - res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) - res.store() - res._acquire('engine-id') - mock_get_obj.side_effect = exception.NotFound() - res._release('engine-id') - self.assertFalse(mock_sau.called) - def test_create_convergence(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) @@ -2024,7 +2013,7 @@ class ResourceTest(common.HeatTestCase): self.assertTrue(mock_create.called) self.assertItemsEqual([1, 3], res.requires) - self._assert_resource_lock(res.id, None, 2) + self._assert_resource_lock(res.id, None, None) def test_create_convergence_throws_timeout(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') @@ -2058,7 +2047,8 @@ class ResourceTest(common.HeatTestCase): res.create_convergence, self.stack.t.id, res_data, 'engine-007', self.dummy_timeout, self.dummy_event) self.assertItemsEqual([5, 3], res.requires) - self._assert_resource_lock(res.id, None, 2) + # The locking happens in create which we mocked out + self._assert_resource_lock(res.id, None, None) @mock.patch.object(resource.Resource, 'adopt') def test_adopt_convergence_ok(self, mock_adopt): @@ -2079,7 +2069,7 @@ class ResourceTest(common.HeatTestCase): mock_adopt.assert_called_once_with( resource_data={'resource_id': 'fluffy'}) self.assertItemsEqual([5, 3], res.requires) - self._assert_resource_lock(res.id, None, 2) + self._assert_resource_lock(res.id, None, None) def test_adopt_convergence_bad_data(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') @@ -2097,7 +2087,9 @@ class ResourceTest(common.HeatTestCase): exc = self.assertRaises(exception.ResourceFailure, tr) self.assertIn('Resource ID was not provided', six.text_type(exc)) - def test_update_convergence(self): + @mock.patch.object(resource.Resource, 'update_template_diff_properties') + @mock.patch.object(resource.Resource, '_needs_update') + def test_update_convergence(self, mock_nu, mock_utd): tmpl = template.Template({ 'HeatTemplateFormatVersion': '2012-12-12', 'Resources': { @@ -2122,19 +2114,18 @@ class ResourceTest(common.HeatTestCase): new_temp.store(stack.context) new_stack = parser.Stack(utils.dummy_context(), 'test_stack', new_temp, stack_id=self.stack.id) + res.stack.convergence = True res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} res_data = node_data.load_resources_data(res_data) - pcb = mock.Mock() - with mock.patch.object(resource.Resource, 'update') as mock_update: - tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, - res_data, 'engine-007', 120, new_stack, - pcb) - tr() - self.assertTrue(mock_update.called) + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, + res_data, 'engine-007', 120, new_stack) + tr() self.assertItemsEqual([3, 4], res.requires) + self.assertEqual(res.action, resource.Resource.UPDATE) + self.assertEqual(res.status, resource.Resource.COMPLETE) self._assert_resource_lock(res.id, None, 2) def test_update_convergence_throws_timeout(self): @@ -2210,7 +2201,9 @@ class ResourceTest(common.HeatTestCase): self.dummy_event) self.assertRaises(resource.UpdateReplace, tr) - def test_update_in_progress_convergence(self): + @mock.patch.object(resource.Resource, '_needs_update') + @mock.patch.object(resource.Resource, '_check_for_convergence_replace') + def test_update_in_progress_convergence(self, mock_cfcr, mock_nu): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') res = generic_rsrc.GenericResource('test_res', tmpl, self.stack) res.requires = [1, 2] @@ -2219,21 +2212,34 @@ class ResourceTest(common.HeatTestCase): rs.update_and_save({'engine_id': 'not-this'}) self._assert_resource_lock(res.id, 'not-this', None) + res.stack.convergence = True + res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} res_data = node_data.load_resources_data(res_data) + tmpl = template.Template({ + 'HeatTemplateFormatVersion': '2012-12-12', + 'Resources': { + 'test_res': {'Type': 'ResourceWithPropsType'} + }}, env=self.env) + new_stack = parser.Stack(utils.dummy_context(), 'test_stack', + tmpl, stack_id=self.stack.id) tr = scheduler.TaskRunner(res.update_convergence, 'template_key', res_data, 'engine-007', self.dummy_timeout, - mock.ANY, self.dummy_event) + new_stack) ex = self.assertRaises(exception.UpdateInProgress, tr) msg = ("The resource %s is already being updated." % res.name) self.assertEqual(msg, six.text_type(ex)) # ensure requirements are not updated for failed resource - self.assertEqual([1, 2], res.requires) + rs = resource_objects.Resource.get_obj(self.stack.context, res.id) + self.assertEqual([1, 2], rs.requires) - @mock.patch.object(resource.Resource, 'update') - def test_update_resource_convergence_failed(self, mock_update): + @mock.patch.object(resource.Resource, 'update_template_diff_properties') + @mock.patch.object(resource.Resource, '_needs_update') + def test_update_resource_convergence_failed(self, + mock_needs_update, + mock_update_template_diff): tmpl = template.Template({ 'HeatTemplateFormatVersion': '2012-12-12', 'Resources': { @@ -2259,26 +2265,25 @@ class ResourceTest(common.HeatTestCase): res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} res_data = node_data.load_resources_data(res_data) - exc = Exception(_('Resource update failed')) new_stack = parser.Stack(utils.dummy_context(), 'test_stack', new_temp, stack_id=self.stack.id) - dummy_ex = exception.ResourceFailure(exc, res, action=res.UPDATE) - mock_update.side_effect = dummy_ex + + res.stack.convergence = True + res._calling_engine_id = 'engine-9' + tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, res_data, 'engine-007', 120, new_stack, self.dummy_event) self.assertRaises(exception.ResourceFailure, tr) - expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] - mock_update.assert_called_once_with(expected_rsrc_def) - # check if current_template_id was updated self.assertEqual(new_temp.id, res.current_template_id) # check if requires was updated self.assertItemsEqual([3, 4], res.requires) - self._assert_resource_lock(res.id, None, 2) + self.assertEqual(res.action, resource.Resource.UPDATE) + self.assertEqual(res.status, resource.Resource.FAILED) + self._assert_resource_lock(res.id, None, 3) - @mock.patch.object(resource.Resource, 'update') - def test_update_resource_convergence_update_replace(self, mock_update): + def test_update_resource_convergence_update_replace(self): tmpl = template.Template({ 'HeatTemplateFormatVersion': '2012-12-12', 'Resources': { @@ -2301,10 +2306,11 @@ class ResourceTest(common.HeatTestCase): }}, env=self.env) new_temp.store(stack.context) + res.stack.convergence = True + res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} res_data = node_data.load_resources_data(res_data) - mock_update.side_effect = resource.UpdateReplace new_stack = parser.Stack(utils.dummy_context(), 'test_stack', new_temp, stack_id=self.stack.id) tr = scheduler.TaskRunner(res.update_convergence, new_temp.id, @@ -2312,13 +2318,11 @@ class ResourceTest(common.HeatTestCase): self.dummy_event) self.assertRaises(resource.UpdateReplace, tr) - expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] - mock_update.assert_called_once_with(expected_rsrc_def) # ensure that current_template_id was not updated self.assertEqual(stack.t.id, res.current_template_id) # ensure that requires was not updated self.assertItemsEqual([2], res.requires) - self._assert_resource_lock(res.id, None, 2) + self._assert_resource_lock(res.id, None, None) def test_convergence_update_replace_rollback(self): rsrc_def = rsrc_defn.ResourceDefinition('test_res', @@ -2386,7 +2390,7 @@ class ResourceTest(common.HeatTestCase): tr() self.assertTrue(mock_delete.called) self.assertTrue(res._update_replacement_data.called) - self._assert_resource_lock(res.id, None, 2) + self._assert_resource_lock(res.id, None, None) def test_delete_convergence_does_not_delete_same_template_resource(self): tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo') @@ -2410,6 +2414,7 @@ class ResourceTest(common.HeatTestCase): res_id = res.id res.handle_delete = mock.Mock(side_effect=ValueError('test')) self._assert_resource_lock(res.id, None, None) + res.stack.convergence = True tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, 'engine-007', self.dummy_timeout, self.dummy_event) self.assertRaises(exception.ResourceFailure, tr) @@ -2428,12 +2433,13 @@ class ResourceTest(common.HeatTestCase): res.status = res.COMPLETE res.action = res.CREATE res.store() + self.stack.convergence = True + res._calling_engine_id = 'engine-9' rs = resource_objects.Resource.get_obj(self.stack.context, res.id) rs.update_and_save({'engine_id': 'not-this'}) self._assert_resource_lock(res.id, 'not-this', None) - tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007', - self.dummy_timeout, self.dummy_event) + tr = scheduler.TaskRunner(res.delete) ex = self.assertRaises(exception.UpdateInProgress, tr) msg = ("The resource %s is already being updated." % res.name) diff --git a/heat_integrationtests/scenario/test_aodh_alarm.py b/heat_integrationtests/scenario/test_aodh_alarm.py index 90288a27ab..49bcad9834 100644 --- a/heat_integrationtests/scenario/test_aodh_alarm.py +++ b/heat_integrationtests/scenario/test_aodh_alarm.py @@ -55,3 +55,8 @@ class AodhAlarmTest(scenario_base.ScenarioTestsBase): # Note: there is little point waiting more than 60s+time to scale up. self.assertTrue(test.call_until_true( 120, 2, self.check_instance_count, stack_identifier, 2)) + + # Temporarily avoids a race condition, addressed in the + # next change https://review.openstack.org/#/c/449351/ + import time + time.sleep(3)