Create replacement resource atomically

Use a single transaction to create the replacement resource and set it as
the replaced_by link in the old resource. Also, ensure that no other
traversal has taken a lock on the old resource before we modify it.

If we end up bailing out and not creating a replacement or sending an RPC
message to check it, make sure we retrigger any new traversal.

Change-Id: I23db4f06a4060f3d26a78f7b26700de426f355e3
Closes-Bug: #1727128
changes/88/514888/4
Zane Bitter 5 years ago committed by rabi
parent 86fcc11b80
commit c9792b96d2
  1. 27
      heat/db/sqlalchemy/api.py
  2. 48
      heat/engine/check_resource.py
  3. 45
      heat/engine/resource.py
  4. 15
      heat/objects/resource.py
  5. 130
      heat/tests/db/test_sqlalchemy_api.py
  6. 13
      heat/tests/utils.py

@ -18,6 +18,7 @@ import random
from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exception
from oslo_db import options
from oslo_db.sqlalchemy import enginefacade
from oslo_db.sqlalchemy import utils
@ -443,6 +444,32 @@ def resource_create(context, values):
return resource_ref
def resource_create_replacement(context,
                                existing_res_id, existing_res_values,
                                new_res_values,
                                atomic_key, expected_engine_id=None):
    """Create a replacement resource and link the existing one to it.

    Inside one ``session.begin(subtransactions=True)`` block, a new resource
    is created from ``new_res_values`` and the existing resource is updated
    with ``existing_res_values`` plus a ``replaced_by`` entry set to the new
    resource's ID.

    :param context: request context providing the DB ``session``
    :param existing_res_id: ID of the resource being replaced
    :param existing_res_values: extra values to write to the existing
        resource; a ``replaced_by`` key here overrides the generated one
    :param new_res_values: values used to create the replacement resource
    :param atomic_key: atomic key passed through to ``resource_update``
    :param expected_engine_id: engine ID passed through to
        ``resource_update``
    :returns: the newly created resource, or None if a
        ``DBReferenceError`` was raised while creating/updating
    :raises exception.UpdateInProgress: if ``resource_update`` reports that
        the existing resource was not updated
    """
    db_session = context.session
    try:
        with db_session.begin(subtransactions=True):
            replacement = resource_create(context, new_res_values)

            changes = {'replaced_by': replacement.id}
            changes.update(existing_res_values)

            updated = resource_update(context,
                                      existing_res_id, changes,
                                      atomic_key,
                                      expected_engine_id=expected_engine_id)
            if not updated:
                # Raising inside the ``with`` block makes the transaction
                # context manager see the failure.
                err_kwargs = {}
                if 'name' in new_res_values:
                    err_kwargs['resource_name'] = new_res_values['name']
                raise exception.UpdateInProgress(**err_kwargs)
    except db_exception.DBReferenceError as exc:
        # New template_id no longer exists
        LOG.debug('Not creating replacement resource: %s', exc)
        return None

    return replacement
def resource_get_all_by_stack(context, stack_id, filters=None):
query = context.session.query(
models.Resource

@ -93,10 +93,15 @@ class CheckResource(object):
# Another concurrent update has taken over. But there is a
# possibility for that update to be waiting for this rsrc to
# complete, hence retrigger current rsrc for latest traversal.
traversal = stack.current_traversal
latest_stack = parser.Stack.load(cnxt, stack_id=stack.id,
self._retrigger_new_traversal(cnxt, stack.current_traversal,
is_update,
stack.id, rsrc_id)
def _retrigger_new_traversal(self, cnxt, current_traversal, is_update,
stack_id, rsrc_id):
# Reload the stack (force_reload=True) and, if its current_traversal is no
# longer the one we were given, call retrigger_check_resource() for rsrc_id
# against the freshly loaded stack.
latest_stack = parser.Stack.load(cnxt, stack_id=stack_id,
force_reload=True)
# NOTE(review): the next two `if` lines look like the old and new sides of
# the same diff line; `traversal` is not defined in this function, so only
# the `current_traversal` comparison appears to belong here - confirm.
if traversal != latest_stack.current_traversal:
if current_traversal != latest_stack.current_traversal:
self.retrigger_check_resource(cnxt, is_update, rsrc_id,
latest_stack)
@ -104,6 +109,30 @@ class CheckResource(object):
failure_reason = u'Timed out'
self._handle_failure(cnxt, stack, failure_reason)
def _handle_resource_replacement(self, cnxt,
                                 current_traversal, new_tmpl_id,
                                 rsrc, stack, adopt_stack_data):
    """Make a replacement for ``rsrc`` and request a check of it.

    Asks ``rsrc`` to create its replacement for template ``new_tmpl_id``.
    If the resource reports UpdateInProgress nothing further is done. If no
    replacement was created (``make_replacement`` returned None) the
    resource is retriggered for any newer traversal. Otherwise a
    ``check_resource`` RPC is sent for the new resource ID.
    """
    try:
        replacement_id = rsrc.make_replacement(new_tmpl_id)
    except exception.UpdateInProgress:
        LOG.info("No replacement created - "
                 "resource already locked by new traversal")
        return

    if replacement_id is not None:
        LOG.info("Replacing resource with new id %s", replacement_id)
        serialized_input = sync_point.serialize_input_data(self.input_data)
        self._rpc_client.check_resource(cnxt,
                                        replacement_id,
                                        current_traversal,
                                        serialized_input, True,
                                        adopt_stack_data)
        return

    # No replacement and no RPC sent: hand over to a newer traversal, if any.
    LOG.info("No replacement created - "
             "new traversal already in progress")
    self._retrigger_new_traversal(cnxt, current_traversal, True,
                                  stack.id, rsrc.id)
    return
def _do_check_resource(self, cnxt, current_traversal, tmpl, resource_data,
is_update, rsrc, stack, adopt_stack_data):
try:
@ -113,15 +142,10 @@ class CheckResource(object):
self.engine_id,
stack, self.msg_queue)
except resource.UpdateReplace:
new_res_id = rsrc.make_replacement(tmpl.id)
LOG.info("Replacing resource with new id %s",
new_res_id)
rpc_data = sync_point.serialize_input_data(self.input_data)
self._rpc_client.check_resource(cnxt,
new_res_id,
current_traversal,
rpc_data, is_update,
adopt_stack_data)
self._handle_resource_replacement(cnxt, current_traversal,
tmpl.id,
rsrc, stack,
adopt_stack_data)
return False
else:

@ -361,11 +361,18 @@ class Resource(status.ResourceStatus):
return resource, initial_stk_defn, curr_stack
def make_replacement(self, new_tmpl_id):
"""Create a replacement resource in the database.
Returns the DB ID of the new resource, or None if the new resource
cannot be created (generally because the template ID does not exist).
Raises UpdateInProgress if another traversal has already locked the
current resource.
"""
# 1. create the replacement with "replaces" = self.id
# Don't set physical_resource_id so that a create is triggered.
rs = {'stack_id': self.stack.id,
'name': self.name,
# NOTE(review): 'rsrc_prop_data_id' appears twice below - presumably the
# old and new sides of the diff. In a dict literal the later entry (None)
# is the one that takes effect.
'rsrc_prop_data_id': self._create_or_replace_rsrc_prop_data(),
'rsrc_prop_data_id': None,
'needed_by': self.needed_by,
'requires': self.requires,
'replaces': self.id,
@ -374,13 +381,39 @@ class Resource(status.ResourceStatus):
'current_template_id': new_tmpl_id,
'stack_name': self.stack.name,
'root_stack_id': self.root_stack_id}
# NOTE(review): this direct create, and the update_by_id() call near the
# end of the method, look like the pre-change code shown next to its
# replacement (Resource.replacement) - confirm against the diff.
new_rs = resource_objects.Resource.create(self.context, rs)
# Values written to the existing (replaced) resource.
update_data = {'status': self.COMPLETE}
# Retry in case a signal has updated the atomic_key
# One initial attempt plus client_retry_limit retries; a negative limit is
# treated as 0.
attempts = max(cfg.CONF.client_retry_limit, 0) + 1
# Hook passed to tenacity as `before`. From the second attempt on it
# re-reads the resource from the DB: it raises UpdateInProgress if the row
# has an engine_id set or its updated_at differs from self.updated_time,
# otherwise it refreshes self._atomic_key from the DB for the next attempt.
def prepare_attempt(fn, attempt):
if attempt > 1:
res_obj = resource_objects.Resource.get_obj(
self.context, self.id)
if (res_obj.engine_id is not None or
res_obj.updated_at != self.updated_time):
raise exception.UpdateInProgress(resource_name=self.name)
self._atomic_key = res_obj.atomic_key
# 2. update the current resource to be replaced_by the one above.
# Retries only on UpdateInProgress, waits a random interval (wait_random
# with max=2) between attempts, and re-raises the last exception once the
# attempts are exhausted.
@tenacity.retry(
stop=tenacity.stop_after_attempt(attempts),
retry=tenacity.retry_if_exception_type(
exception.UpdateInProgress),
before=prepare_attempt,
wait=tenacity.wait_random(max=2),
reraise=True)
def create_replacement():
return resource_objects.Resource.replacement(self.context,
self.id,
update_data,
rs,
self._atomic_key)
new_rs = create_replacement()
# None means no replacement was created; pass that on to the caller.
if new_rs is None:
return None
# NOTE(review): presumably keeps the in-memory atomic key in step with the
# increment done by the DB update - confirm.
self._incr_atomic_key(self._atomic_key)
self.replaced_by = new_rs.id
resource_objects.Resource.update_by_id(
self.context, self.id,
{'status': self.COMPLETE, 'replaced_by': self.replaced_by})
return new_rs.id
def reparse(self, client_resolve=True):

@ -199,6 +199,21 @@ class Resource(
return cls._from_db_object(cls(context), context,
db_api.resource_create(context, values))
@classmethod
def replacement(cls, context,
                existing_res_id, existing_res_values,
                new_res_values,
                atomic_key=0, expected_engine_id=None):
    """Create a replacement resource and return it as a versioned object.

    Delegates to ``db_api.resource_create_replacement`` with the given
    arguments. Returns None when the DB layer returns None; otherwise the
    DB row is converted with ``_from_db_object``.
    """
    db_resource = db_api.resource_create_replacement(context,
                                                     existing_res_id,
                                                     existing_res_values,
                                                     new_res_values,
                                                     atomic_key,
                                                     expected_engine_id)
    if db_resource is not None:
        return cls._from_db_object(cls(context), context, db_resource)
    return None
@classmethod
def delete(cls, context, resource_id):
db_api.resource_delete(context, resource_id)

@ -1387,6 +1387,7 @@ def create_resource(ctx, stack, legacy_prop_data=False, **kwargs):
'status_reason': 'create_complete',
'rsrc_metadata': json.loads('{"foo": "123"}'),
'stack_id': stack.id,
'atomic_key': 1,
}
if not legacy_prop_data:
values['rsrc_prop_data'] = rpd
@ -2543,6 +2544,135 @@ class DBAPIResourceTest(common.HeatTestCase):
self.assertEqual({'engine-001', 'engine-002'}, engines)
class DBAPIResourceReplacementTest(common.HeatTestCase):
    """Tests for db_api.resource_create_replacement()."""

    def setUp(self):
        # NOTE(review): the foreign-key fixture is installed before the base
        # class setUp; presumably so the constraint setting is in place
        # before the test database is prepared - confirm before reordering.
        self.useFixture(utils.ForeignKeyConstraintFixture())
        super(DBAPIResourceReplacementTest, self).setUp()
        self.ctx = utils.dummy_context()
        self.template = create_raw_template(self.ctx)
        self.user_creds = create_user_creds(self.ctx)
        self.stack = create_stack(self.ctx, self.template, self.user_creds)

    def _create_replacement(self, orig, tmpl_id):
        """Call resource_create_replacement() with the common test values.

        The replacement reuses the original's name and stack, points at
        ``tmpl_id`` and is attempted with atomic_key 1 and no expected
        engine ID.
        """
        return db_api.resource_create_replacement(
            self.ctx,
            orig.id,
            {'status_reason': 'test replacement'},
            {'name': orig.name, 'replaces': orig.id,
             'stack_id': orig.stack_id, 'current_template_id': tmpl_id},
            1, None)

    def _interfere_with_resource_update(self, other_ctx, orig, values):
        """Patch db_api.resource_update to simulate a concurrent writer.

        Each call first writes ``values`` to ``orig`` through ``other_ctx``
        and then falls through to the real resource_update().
        """
        def interfere(*args, **kwargs):
            db_api.resource_update_and_save(other_ctx, orig.id, values)
            # Returning DEFAULT makes the Mock go on to call the wrapped
            # function. Returning None (the implicit result) would become
            # the mock's return value and skip the real resource_update().
            return mock.DEFAULT

        self.patchobject(db_api, 'resource_update',
                         new=mock.Mock(wraps=db_api.resource_update,
                                       side_effect=interfere))

    def test_resource_create_replacement(self):
        orig = create_resource(self.ctx, self.stack)

        tmpl_id = create_raw_template(self.ctx).id

        repl = self._create_replacement(orig, tmpl_id)

        self.assertIsNotNone(repl)
        self.assertEqual(orig.name, repl.name)
        self.assertNotEqual(orig.id, repl.id)
        self.assertEqual(orig.id, repl.replaces)

    def test_resource_create_replacement_template_gone(self):
        orig = create_resource(self.ctx, self.stack)

        other_ctx = utils.dummy_context()
        tmpl_id = create_raw_template(self.ctx).id
        # Delete the template so the new resource references a missing row.
        db_api.raw_template_delete(other_ctx, tmpl_id)

        repl = self._create_replacement(orig, tmpl_id)

        self.assertIsNone(repl)

    def test_resource_create_replacement_updated(self):
        orig = create_resource(self.ctx, self.stack)

        other_ctx = utils.dummy_context()
        tmpl_id = create_raw_template(self.ctx).id
        # The atomic key changes before the replacement is attempted.
        db_api.resource_update_and_save(other_ctx, orig.id, {'atomic_key': 2})

        self.assertRaises(exception.UpdateInProgress,
                          self._create_replacement, orig, tmpl_id)

    def test_resource_create_replacement_updated_concurrent(self):
        orig = create_resource(self.ctx, self.stack)

        other_ctx = utils.dummy_context()
        tmpl_id = create_raw_template(self.ctx).id
        # The atomic key changes just before the real update is executed.
        self._interfere_with_resource_update(other_ctx, orig,
                                             {'atomic_key': 2})

        self.assertRaises(exception.UpdateInProgress,
                          self._create_replacement, orig, tmpl_id)

    def test_resource_create_replacement_locked(self):
        orig = create_resource(self.ctx, self.stack)

        other_ctx = utils.dummy_context()
        tmpl_id = create_raw_template(self.ctx).id
        # An engine takes the resource before the replacement is attempted.
        db_api.resource_update_and_save(other_ctx, orig.id, {'engine_id': 'a',
                                                             'atomic_key': 2})

        self.assertRaises(exception.UpdateInProgress,
                          self._create_replacement, orig, tmpl_id)

    def test_resource_create_replacement_locked_concurrent(self):
        orig = create_resource(self.ctx, self.stack)

        other_ctx = utils.dummy_context()
        tmpl_id = create_raw_template(self.ctx).id
        # An engine takes the resource just before the real update runs.
        self._interfere_with_resource_update(other_ctx, orig,
                                             {'engine_id': 'a',
                                              'atomic_key': 2})

        self.assertRaises(exception.UpdateInProgress,
                          self._create_replacement, orig, tmpl_id)
class DBAPIStackLockTest(common.HeatTestCase):
def setUp(self):
super(DBAPIStackLockTest, self).setUp()

@ -15,6 +15,7 @@ import random
import string
import uuid
import fixtures
import mox
from oslo_config import cfg
from oslo_db import options
@ -187,3 +188,15 @@ class JsonEquals(mox.Comparator):
def __repr__(self):
# Readable form of the comparator that embeds self.other_json.
return "<equals to json '%s'>" % self.other_json
class ForeignKeyConstraintFixture(fixtures.Fixture):
    """Fixture that swaps in a DB context manager with a chosen sqlite_fk.

    While active, ``db_api._facade`` is patched to None and the factory of
    ``db_api.db_context`` is patched with one configured with
    ``sqlite_fk=<sqlite_fk>``; both are undone on cleanup.
    """

    def __init__(self, sqlite_fk=True):
        # Value handed to configure(sqlite_fk=...) in _setUp().
        self.enable_fkc = sqlite_fk

    def _setUp(self):
        fk_manager = db_api.db_context.make_new_manager()
        fk_manager.configure(sqlite_fk=self.enable_fkc)

        facade_patch = fixtures.MockPatchObject(db_api, '_facade', None)
        self.useFixture(facade_patch)

        # patch_factory() returns the callable that restores the old factory.
        restore_factory = db_api.db_context.patch_factory(fk_manager._factory)
        self.addCleanup(restore_factory)

Loading…
Cancel
Save