diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 04dd71fa22..79ef68d840 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -18,6 +18,7 @@ import random from oslo_config import cfg from oslo_db import api as oslo_db_api +from oslo_db import exception as db_exception from oslo_db import options from oslo_db.sqlalchemy import enginefacade from oslo_db.sqlalchemy import utils @@ -443,6 +444,32 @@ def resource_create(context, values): return resource_ref +def resource_create_replacement(context, + existing_res_id, existing_res_values, + new_res_values, + atomic_key, expected_engine_id=None): + session = context.session + try: + with session.begin(subtransactions=True): + new_res = resource_create(context, new_res_values) + update_data = {'replaced_by': new_res.id} + update_data.update(existing_res_values) + if not resource_update(context, + existing_res_id, update_data, + atomic_key, + expected_engine_id=expected_engine_id): + data = {} + if 'name' in new_res_values: + data['resource_name'] = new_res_values['name'] + raise exception.UpdateInProgress(**data) + except db_exception.DBReferenceError as exc: + # New template_id no longer exists + LOG.debug('Not creating replacement resource: %s', exc) + return None + else: + return new_res + + def resource_get_all_by_stack(context, stack_id, filters=None): query = context.session.query( models.Resource diff --git a/heat/engine/check_resource.py b/heat/engine/check_resource.py index 5c2db661a9..ae0f847bd1 100644 --- a/heat/engine/check_resource.py +++ b/heat/engine/check_resource.py @@ -93,10 +93,15 @@ class CheckResource(object): # Another concurrent update has taken over. But there is a # possibility for that update to be waiting for this rsrc to # complete, hence retrigger current rsrc for latest traversal. - traversal = stack.current_traversal - latest_stack = parser.Stack.load(cnxt, stack_id=stack.id, + self._retrigger_new_traversal(cnxt, stack.current_traversal, + is_update, + stack.id, rsrc_id) + + def _retrigger_new_traversal(self, cnxt, current_traversal, is_update, + stack_id, rsrc_id): + latest_stack = parser.Stack.load(cnxt, stack_id=stack_id, force_reload=True) - if traversal != latest_stack.current_traversal: + if current_traversal != latest_stack.current_traversal: self.retrigger_check_resource(cnxt, is_update, rsrc_id, latest_stack) @@ -104,6 +109,30 @@ class CheckResource(object): failure_reason = u'Timed out' self._handle_failure(cnxt, stack, failure_reason) + def _handle_resource_replacement(self, cnxt, + current_traversal, new_tmpl_id, + rsrc, stack, adopt_stack_data): + """Create a replacement resource and trigger a check on it.""" + try: + new_res_id = rsrc.make_replacement(new_tmpl_id) + except exception.UpdateInProgress: + LOG.info("No replacement created - " + "resource already locked by new traversal") + return + if new_res_id is None: + LOG.info("No replacement created - " + "new traversal already in progress") + self._retrigger_new_traversal(cnxt, current_traversal, True, + stack.id, rsrc.id) + return + LOG.info("Replacing resource with new id %s", new_res_id) + rpc_data = sync_point.serialize_input_data(self.input_data) + self._rpc_client.check_resource(cnxt, + new_res_id, + current_traversal, + rpc_data, True, + adopt_stack_data) + def _do_check_resource(self, cnxt, current_traversal, tmpl, resource_data, is_update, rsrc, stack, adopt_stack_data): try: @@ -113,15 +142,10 @@ class CheckResource(object): self.engine_id, stack, self.msg_queue) except resource.UpdateReplace: - new_res_id = rsrc.make_replacement(tmpl.id) - LOG.info("Replacing resource with new id %s", - new_res_id) - rpc_data = sync_point.serialize_input_data(self.input_data) - self._rpc_client.check_resource(cnxt, - new_res_id, - current_traversal, - rpc_data, is_update, - adopt_stack_data) + self._handle_resource_replacement(cnxt, current_traversal, + tmpl.id, + rsrc, stack, + adopt_stack_data) return False else: diff --git a/heat/engine/resource.py b/heat/engine/resource.py index 3b15a30d53..ef91a71c54 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -361,11 +361,18 @@ class Resource(status.ResourceStatus): return resource, initial_stk_defn, curr_stack def make_replacement(self, new_tmpl_id): + """Create a replacement resource in the database. + + Returns the DB ID of the new resource, or None if the new resource + cannot be created (generally because the template ID does not exist). + Raises UpdateInProgress if another traversal has already locked the + current resource. + """ # 1. create the replacement with "replaces" = self.id # Don't set physical_resource_id so that a create is triggered. rs = {'stack_id': self.stack.id, 'name': self.name, - 'rsrc_prop_data_id': self._create_or_replace_rsrc_prop_data(), + 'rsrc_prop_data_id': None, 'needed_by': self.needed_by, 'requires': self.requires, 'replaces': self.id, @@ -374,13 +381,39 @@ class Resource(status.ResourceStatus): 'current_template_id': new_tmpl_id, 'stack_name': self.stack.name, 'root_stack_id': self.root_stack_id} - new_rs = resource_objects.Resource.create(self.context, rs) + update_data = {'status': self.COMPLETE} - # 2. update the current resource to be replaced_by the one above. + # Retry in case a signal has updated the atomic_key + attempts = max(cfg.CONF.client_retry_limit, 0) + 1 + + def prepare_attempt(fn, attempt): + if attempt > 1: + res_obj = resource_objects.Resource.get_obj( + self.context, self.id) + if (res_obj.engine_id is not None or + res_obj.updated_at != self.updated_time): + raise exception.UpdateInProgress(resource_name=self.name) + self._atomic_key = res_obj.atomic_key + + @tenacity.retry( + stop=tenacity.stop_after_attempt(attempts), + retry=tenacity.retry_if_exception_type( + exception.UpdateInProgress), + before=prepare_attempt, + wait=tenacity.wait_random(max=2), + reraise=True) + def create_replacement(): + return resource_objects.Resource.replacement(self.context, + self.id, + update_data, + rs, + self._atomic_key) + + new_rs = create_replacement() + if new_rs is None: + return None + self._incr_atomic_key(self._atomic_key) self.replaced_by = new_rs.id - resource_objects.Resource.update_by_id( - self.context, self.id, - {'status': self.COMPLETE, 'replaced_by': self.replaced_by}) return new_rs.id def reparse(self, client_resolve=True): diff --git a/heat/objects/resource.py b/heat/objects/resource.py index 1f2e2bbdf4..67cc24a0c9 100644 --- a/heat/objects/resource.py +++ b/heat/objects/resource.py @@ -199,6 +199,21 @@ class Resource( return cls._from_db_object(cls(context), context, db_api.resource_create(context, values)) + @classmethod + def replacement(cls, context, + existing_res_id, existing_res_values, + new_res_values, + atomic_key=0, expected_engine_id=None): + replacement = db_api.resource_create_replacement(context, + existing_res_id, + existing_res_values, + new_res_values, + atomic_key, + expected_engine_id) + if replacement is None: + return None + return cls._from_db_object(cls(context), context, replacement) + @classmethod def delete(cls, context, resource_id): db_api.resource_delete(context, resource_id) diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py index b480e5d1b5..c4adcef0c2 100644 --- a/heat/tests/db/test_sqlalchemy_api.py +++ b/heat/tests/db/test_sqlalchemy_api.py @@ -1387,6 +1387,7 @@ def create_resource(ctx, stack, legacy_prop_data=False, **kwargs): 'status_reason': 'create_complete', 'rsrc_metadata': json.loads('{"foo": "123"}'), 'stack_id': stack.id, + 'atomic_key': 1, } if not legacy_prop_data: values['rsrc_prop_data'] = rpd @@ -2543,6 +2544,135 @@ class DBAPIResourceTest(common.HeatTestCase): self.assertEqual({'engine-001', 'engine-002'}, engines) +class DBAPIResourceReplacementTest(common.HeatTestCase): + def setUp(self): + self.useFixture(utils.ForeignKeyConstraintFixture()) + super(DBAPIResourceReplacementTest, self).setUp() + self.ctx = utils.dummy_context() + self.template = create_raw_template(self.ctx) + self.user_creds = create_user_creds(self.ctx) + self.stack = create_stack(self.ctx, self.template, self.user_creds) + + def test_resource_create_replacement(self): + orig = create_resource(self.ctx, self.stack) + + tmpl_id = create_raw_template(self.ctx).id + + repl = db_api.resource_create_replacement( + self.ctx, + orig.id, + {'status_reason': 'test replacement'}, + {'name': orig.name, 'replaces': orig.id, + 'stack_id': orig.stack_id, 'current_template_id': tmpl_id}, + 1, None) + + self.assertIsNotNone(repl) + self.assertEqual(orig.name, repl.name) + self.assertNotEqual(orig.id, repl.id) + self.assertEqual(orig.id, repl.replaces) + + def test_resource_create_replacement_template_gone(self): + orig = create_resource(self.ctx, self.stack) + + other_ctx = utils.dummy_context() + tmpl_id = create_raw_template(self.ctx).id + db_api.raw_template_delete(other_ctx, tmpl_id) + + repl = db_api.resource_create_replacement( + self.ctx, + orig.id, + {'status_reason': 'test replacement'}, + {'name': orig.name, 'replaces': orig.id, + 'stack_id': orig.stack_id, 'current_template_id': tmpl_id}, + 1, None) + + self.assertIsNone(repl) + + def test_resource_create_replacement_updated(self): + orig = create_resource(self.ctx, self.stack) + + other_ctx = utils.dummy_context() + tmpl_id = create_raw_template(self.ctx).id + db_api.resource_update_and_save(other_ctx, orig.id, {'atomic_key': 2}) + + self.assertRaises(exception.UpdateInProgress, + db_api.resource_create_replacement, + self.ctx, + orig.id, + {'status_reason': 'test replacement'}, + {'name': orig.name, 'replaces': orig.id, + 'stack_id': orig.stack_id, + 'current_template_id': tmpl_id}, + 1, None) + + def test_resource_create_replacement_updated_concurrent(self): + orig = create_resource(self.ctx, self.stack) + + other_ctx = utils.dummy_context() + tmpl_id = create_raw_template(self.ctx).id + + def update_atomic_key(*args, **kwargs): + db_api.resource_update_and_save(other_ctx, orig.id, + {'atomic_key': 2}) + + self.patchobject(db_api, 'resource_update', + new=mock.Mock(wraps=db_api.resource_update, + side_effect=update_atomic_key)) + + self.assertRaises(exception.UpdateInProgress, + db_api.resource_create_replacement, + self.ctx, + orig.id, + {'status_reason': 'test replacement'}, + {'name': orig.name, 'replaces': orig.id, + 'stack_id': orig.stack_id, + 'current_template_id': tmpl_id}, + 1, None) + + def test_resource_create_replacement_locked(self): + orig = create_resource(self.ctx, self.stack) + + other_ctx = utils.dummy_context() + tmpl_id = create_raw_template(self.ctx).id + db_api.resource_update_and_save(other_ctx, orig.id, {'engine_id': 'a', + 'atomic_key': 2}) + + self.assertRaises(exception.UpdateInProgress, + db_api.resource_create_replacement, + self.ctx, + orig.id, + {'status_reason': 'test replacement'}, + {'name': orig.name, 'replaces': orig.id, + 'stack_id': orig.stack_id, + 'current_template_id': tmpl_id}, + 1, None) + + def test_resource_create_replacement_locked_concurrent(self): + orig = create_resource(self.ctx, self.stack) + + other_ctx = utils.dummy_context() + tmpl_id = create_raw_template(self.ctx).id + + def lock_resource(*args, **kwargs): + db_api.resource_update_and_save(other_ctx, orig.id, + {'engine_id': 'a', + 'atomic_key': 2}) + + self.patchobject(db_api, 'resource_update', + new=mock.Mock(wraps=db_api.resource_update, + side_effect=lock_resource)) + + self.assertRaises(exception.UpdateInProgress, + db_api.resource_create_replacement, + self.ctx, + orig.id, + {'status_reason': 'test replacement'}, + {'name': orig.name, 'replaces': orig.id, + 'stack_id': orig.stack_id, + 'current_template_id': tmpl_id}, + 1, None) + + class DBAPIStackLockTest(common.HeatTestCase): def setUp(self): super(DBAPIStackLockTest, self).setUp() diff --git a/heat/tests/utils.py b/heat/tests/utils.py index 8970b21bef..208c852f1d 100644 --- a/heat/tests/utils.py +++ b/heat/tests/utils.py @@ -15,6 +15,7 @@ import random import string import uuid +import fixtures import mox from oslo_config import cfg from oslo_db import options @@ -187,3 +188,15 @@ class JsonEquals(mox.Comparator): def __repr__(self): return "" % self.other_json + + +class ForeignKeyConstraintFixture(fixtures.Fixture): + def __init__(self, sqlite_fk=True): + self.enable_fkc = sqlite_fk + + def _setUp(self): + new_context = db_api.db_context.make_new_manager() + new_context.configure(sqlite_fk=self.enable_fkc) + + self.useFixture(fixtures.MockPatchObject(db_api, '_facade', None)) + self.addCleanup(db_api.db_context.patch_factory(new_context._factory))