Merge "Convergence: Concurrency subtle issues"

This commit is contained in:
Jenkins 2015-11-27 08:26:42 +00:00 committed by Gerrit Code Review
commit 20663fbfbe
7 changed files with 218 additions and 84 deletions

View File

@ -503,7 +503,7 @@ class Stack(collections.Mapping):
return stack
@profiler.trace('Stack.store', hide_args=False)
def store(self, backup=False):
def store(self, backup=False, exp_trvsl=None):
"""Store the stack in the database and return its ID.
If self.id is set, we update the existing stack.
@ -519,7 +519,19 @@ class Stack(collections.Mapping):
s['raw_template_id'] = self.t.id
if self.id:
stack_object.Stack.update_by_id(self.context, self.id, s)
if exp_trvsl is None:
exp_trvsl = self.current_traversal
if self.convergence:
# do things differently for convergence
updated = stack_object.Stack.select_and_update(
self.context, self.id, s, exp_trvsl=exp_trvsl)
if not updated:
return None
else:
stack_object.Stack.update_by_id(self.context, self.id, s)
else:
if not self.user_creds_id:
# Create a context containing a trust_id and trustor_user_id
@ -533,6 +545,11 @@ class Stack(collections.Mapping):
s['user_creds_id'] = new_creds.id
self.user_creds_id = new_creds.id
if self.convergence:
# create a traversal ID
self.current_traversal = uuidutils.generate_uuid()
s['current_traversal'] = self.current_traversal
new_s = stack_object.Stack.create(self.context, s)
self.id = new_s.id
self.created_time = new_s.created_at
@ -748,8 +765,16 @@ class Stack(collections.Mapping):
self.CREATE):
# if convergence and stack operation is create/update/delete,
# stack lock is not used, hence persist state
self._persist_state()
return
updated = self._persist_state()
if not updated:
# Possibly failed concurrent update
LOG.warn(_LW("Failed to set state of stack %(name)s with"
" traversal ID %(trvsl_id)s, to"
" %(action)s_%(status)s"),
{'name': self.name,
'trvsl_id': self.current_traversal,
'action': action, 'status': status})
return updated
# Persist state to db only if status == IN_PROGRESS
# or action == self.DELETE/self.ROLLBACK. Else, it would
@ -768,7 +793,14 @@ class Stack(collections.Mapping):
'status': self.status,
'status_reason': self.status_reason}
self._send_notification_and_add_event()
stack.update_and_save(values)
if self.convergence:
# do things differently for convergence
updated = stack_object.Stack.select_and_update(
self.context, self.id, values,
exp_trvsl=self.current_traversal)
return updated
else:
stack.update_and_save(values)
def _send_notification_and_add_event(self):
notification.send(self)
@ -1023,9 +1055,6 @@ class Stack(collections.Mapping):
self.reset_dependencies()
self._resources = None
previous_traversal = self.current_traversal
self.current_traversal = uuidutils.generate_uuid()
if action is not self.CREATE:
self.updated_time = oslo_timeutils.utcnow()
@ -1041,15 +1070,27 @@ class Stack(collections.Mapping):
else:
stack_tag_object.StackTagList.delete(self.context, self.id)
self.store()
self.action = action
self.status = self.IN_PROGRESS
self.status_reason = 'Stack %s started' % self.action
# generate new traversal and store
previous_traversal = self.current_traversal
self.current_traversal = uuidutils.generate_uuid()
# we expect to update the stack having previous traversal ID
stack_id = self.store(exp_trvsl=previous_traversal)
if stack_id is None:
LOG.warn(_LW("Failed to store stack %(name)s with traversal ID"
" %(trvsl_id)s, aborting stack %(action)s"),
{'name': self.name, 'trvsl_id': previous_traversal,
'action': self.action})
return
# delete the prev traversal sync_points
if previous_traversal:
sync_point.delete_all(self.context, self.id, previous_traversal)
# TODO(later): lifecycle_plugin_utils.do_pre_ops
self.state_set(action, self.IN_PROGRESS,
'Stack %s started' % action)
self._converge_create_or_update()
@ -1061,7 +1102,14 @@ class Stack(collections.Mapping):
self.current_deps = {
'edges': [[rqr, rqd] for rqr, rqd in
self.convergence_dependencies.graph().edges()]}
self.store()
stack_id = self.store()
if stack_id is None:
# Failed concurrent update
LOG.warn(_LW("Failed to store stack %(name)s with traversal ID"
" %(trvsl_id)s, aborting stack %(action)s"),
{'name': self.name, 'trvsl_id': self.current_traversal,
'action': self.action})
return
LOG.info(_LI('convergence_dependencies: %s'),
self.convergence_dependencies)
@ -1077,7 +1125,7 @@ class Stack(collections.Mapping):
leaves = set(self.convergence_dependencies.leaves())
if not any(leaves):
self.mark_complete(self.current_traversal)
self.mark_complete()
else:
for rsrc_id, is_update in self.convergence_dependencies.leaves():
if is_update:
@ -1099,7 +1147,14 @@ class Stack(collections.Mapping):
else:
rollback_tmpl = tmpl.Template.load(self.context, old_tmpl_id)
self.prev_raw_template_id = None
self.store()
stack_id = self.store()
if stack_id is None:
# Failed concurrent update
LOG.warn(_LW("Failed to store stack %(name)s with traversal ID"
" %(trvsl_id)s, not trigerring rollback."),
{'name': self.name,
'trvsl_id': self.current_traversal})
return
self.converge_stack(rollback_tmpl, action=self.ROLLBACK)
@ -1758,21 +1813,23 @@ class Stack(collections.Mapping):
attrs = self.cache_data.get(resource_name, {}).get('attributes', {})
return attrs
def mark_complete(self, traversal_id):
def mark_complete(self):
"""Mark the update as complete.
This currently occurs when all resources have been updated; there may
still be resources being cleaned up, but the Stack should now be in
service.
"""
if traversal_id != self.current_traversal:
return
LOG.info(_LI('[%(name)s(%(id)s)] update traversal %(tid)s complete'),
{'name': self.name, 'id': self.id, 'tid': traversal_id})
{'name': self.name, 'id': self.id,
'tid': self.current_traversal})
reason = 'Stack %s completed successfully' % self.action
self.state_set(self.action, self.COMPLETE, reason)
updated = self.state_set(self.action, self.COMPLETE, reason)
if not updated:
return
self.purge_db()
def purge_db(self):
@ -1787,7 +1844,14 @@ class Stack(collections.Mapping):
self.status != self.FAILED):
prev_tmpl_id = self.prev_raw_template_id
self.prev_raw_template_id = None
self.store()
stack_id = self.store()
if stack_id is None:
# Failed concurrent update
LOG.warn(_LW("Failed to store stack %(name)s with traversal ID"
" %(trvsl_id)s, aborting stack purge"),
{'name': self.name,
'trvsl_id': self.current_traversal})
return
raw_template_object.RawTemplate.delete(self.context, prev_tmpl_id)
sync_point.delete_all(self.context, self.id, self.current_traversal)

View File

@ -110,7 +110,9 @@ class WorkerService(service.Service):
stack.rollback()
def _handle_failure(self, cnxt, stack, failure_reason):
stack.state_set(stack.action, stack.FAILED, failure_reason)
updated = stack.state_set(stack.action, stack.FAILED, failure_reason)
if not updated:
return
if (not stack.disable_rollback and
stack.action in (stack.CREATE, stack.ADOPT, stack.UPDATE)):
@ -118,13 +120,8 @@ class WorkerService(service.Service):
else:
stack.purge_db()
def _handle_resource_failure(self, cnxt, stack_id, traversal_id,
failure_reason):
def _handle_resource_failure(self, cnxt, stack_id, failure_reason):
stack = parser.Stack.load(cnxt, stack_id=stack_id)
# make sure no new stack operation was triggered
if stack.current_traversal != traversal_id:
return
self._handle_failure(cnxt, stack, failure_reason)
def _handle_stack_timeout(self, cnxt, stack):
@ -183,8 +180,7 @@ class WorkerService(service.Service):
except exception.ResourceFailure as ex:
reason = 'Resource %s failed: %s' % (rsrc.action,
six.text_type(ex))
self._handle_resource_failure(
cnxt, stack.id, current_traversal, reason)
self._handle_resource_failure(cnxt, stack.id, reason)
except scheduler.Timeout:
# reload the stack to verify current traversal
stack = parser.Stack.load(cnxt, stack_id=stack.id)
@ -345,7 +341,7 @@ def check_stack_complete(cnxt, stack, current_traversal, sender_id, deps,
return
def mark_complete(stack_id, data):
stack.mark_complete(current_traversal)
stack.mark_complete()
sender_key = (sender_id, is_update)
sync_point.sync(cnxt, stack.id, current_traversal, True,

View File

@ -151,6 +151,9 @@ class Stack(
def select_and_update(cls, context, stack_id, values, exp_trvsl=None):
"""Update the stack by selecting on traversal ID.
Uses UPDATE ... WHERE (compare and swap) to catch any concurrent
update problem.
If the stack is found with given traversal, it is updated.
If there occurs a race while updating, only one will succeed and

View File

@ -78,7 +78,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
mock_env.assert_called_once_with(params)
mock_stack.assert_called_once_with(
self.ctx, stk.name, stk.t,
convergence=False, current_traversal=None,
convergence=False,
current_traversal=old_stack.current_traversal,
prev_raw_template_id=None,
current_deps=None,
disable_rollback=True,
@ -306,7 +307,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
mock_load.assert_called_once_with(self.ctx, stack=s)
mock_stack.assert_called_once_with(
self.ctx, stk.name, stk.t,
convergence=False, current_traversal=None,
convergence=False,
current_traversal=old_stack.current_traversal,
prev_raw_template_id=None, current_deps=None,
disable_rollback=False, nested_depth=0,
owner_id=None, parent_resource=None,
@ -418,7 +420,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
mock_env.assert_called_once_with(params)
mock_stack.assert_called_once_with(
self.ctx, stk.name, stk.t,
convergence=False, current_traversal=None,
convergence=False,
current_traversal=old_stack.current_traversal,
prev_raw_template_id=None, current_deps=None,
disable_rollback=True, nested_depth=0,
owner_id=None, parent_resource=None,
@ -533,7 +536,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
mock_env.assert_called_once_with(params)
mock_stack.assert_called_once_with(
self.ctx, stk.name, stk.t,
convergence=False, current_traversal=None,
convergence=False,
current_traversal=old_stack.current_traversal,
prev_raw_template_id=None, current_deps=None,
disable_rollback=True, nested_depth=0,
owner_id=None, parent_resource=None,
@ -595,7 +599,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
mock_env.assert_called_once_with(params)
mock_stack.assert_called_once_with(
self.ctx, stk.name, stk.t,
convergence=False, current_traversal=None,
convergence=False, current_traversal=stk.current_traversal,
prev_raw_template_id=None, current_deps=None,
disable_rollback=True, nested_depth=0,
owner_id=None, parent_resource=None,
@ -731,11 +735,11 @@ resources:
# assertions
mock_stack.assert_called_once_with(
self.ctx, stk.name, stk.t, convergence=False,
current_traversal=None, prev_raw_template_id=None,
current_deps=None, disable_rollback=True,
nested_depth=0, owner_id=None, parent_resource=None,
stack_user_project_id='1234', strict_validate=True,
tenant_id='test_tenant_id', timeout_mins=60,
current_traversal=old_stack.current_traversal,
prev_raw_template_id=None, current_deps=None,
disable_rollback=True, nested_depth=0, owner_id=None,
parent_resource=None, stack_user_project_id='1234',
strict_validate=True, tenant_id='test_tenant_id', timeout_mins=60,
user_creds_id=u'1', username='test_username')
mock_load.assert_called_once_with(self.ctx, stack=s)
mock_tmpl.assert_called_once_with(new_template, files=None,

View File

@ -278,6 +278,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
def test_resource_update_failure_triggers_rollback_if_enabled(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
cfg.CONF.set_default('convergence_engine', True)
self.stack.disable_rollback = False
self.stack.store()
self.worker._trigger_rollback = mock.Mock()
@ -296,6 +297,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
def test_resource_cleanup_failure_triggers_rollback_if_enabled(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
cfg.CONF.set_default('convergence_engine', True)
self.is_update = False # invokes check_resource_cleanup
self.stack.disable_rollback = False
self.stack.store()
@ -345,6 +347,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
def test_resource_update_failure_purges_db_for_stack_failure(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
cfg.CONF.set_default('convergence_engine', True)
self.stack.disable_rollback = True
self.stack.store()
self.stack.purge_db = mock.Mock()
@ -359,6 +362,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
def test_resource_cleanup_failure_purges_db_for_stack_failure(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
cfg.CONF.set_default('convergence_engine', True)
self.is_update = False
self.stack.disable_rollback = True
self.stack.store()
@ -415,6 +419,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
@mock.patch.object(stack.Stack, 'purge_db')
def test_handle_failure(self, mock_purgedb, mock_cru, mock_crc, mock_pcr,
mock_csc, mock_cid):
cfg.CONF.set_default('convergence_engine', True)
self.worker._handle_failure(self.ctx, self.stack, 'dummy-reason')
mock_purgedb.assert_called_once_with()
self.assertEqual('dummy-reason', self.stack.status_reason)
@ -426,6 +431,19 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self.worker._handle_failure(self.ctx, self.stack, 'dummy-reason')
self.worker._trigger_rollback.assert_called_once_with(self.stack)
@mock.patch.object(stack.Stack, 'purge_db')
@mock.patch.object(stack.Stack, 'state_set')
def test_handle_failure_when_update_fails(self, mock_ss, mock_pdb,
mock_cru, mock_crc, mock_pcr,
mock_csc, mock_cid):
self.worker._trigger_rollback = mock.Mock()
# Emulate failure
mock_ss.return_value = False
self.worker._handle_failure(self.ctx, self.stack, 'dummy-reason')
self.assertTrue(mock_ss.called)
self.assertFalse(mock_pdb.called)
self.assertFalse(self.worker._trigger_rollback.called)
def test_handle_stack_timeout(self, mock_cru, mock_crc, mock_pcr,
mock_csc, mock_cid):
self.worker._handle_failure = mock.Mock()

View File

@ -44,7 +44,7 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
stack.store()
stack.converge_stack(template=stack.t, action=stack.CREATE)
self.assertFalse(mock_cr.called)
mock_mc.assert_called_once_with(stack.current_traversal)
mock_mc.assert_called_once_with()
def test_conv_wordpress_single_instance_stack_create(self, mock_cr):
stack = tools.get_stack('test_stack', utils.dummy_context(),
@ -366,7 +366,7 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
convergence=True)
stack.store()
stack.purge_db = mock.Mock()
stack.mark_complete(stack.current_traversal)
stack.mark_complete()
self.assertTrue(stack.purge_db.called)
@mock.patch.object(raw_template_object.RawTemplate, 'delete')

View File

@ -341,7 +341,7 @@ class StackTest(common.HeatTestCase):
use_stored_context=False,
username=mox.IgnoreArg(),
convergence=False,
current_traversal=None,
current_traversal=self.stack.current_traversal,
tags=mox.IgnoreArg(),
prev_raw_template_id=None,
current_deps=None, cache_data=None)
@ -2272,23 +2272,20 @@ class StackTest(common.HeatTestCase):
}
})
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl)
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl, convergence=True)
tmpl_stack.store()
tmpl_stack.action = tmpl_stack.CREATE
tmpl_stack.status = tmpl_stack.IN_PROGRESS
tmpl_stack.current_traversal = 'some-traversal'
tmpl_stack.mark_complete('some-traversal')
tmpl_stack.mark_complete()
self.assertEqual(tmpl_stack.prev_raw_template_id,
None)
self.assertFalse(mock_tmpl_delete.called)
self.assertFalse(mock_stack_delete.called)
self.assertEqual(tmpl_stack.status, tmpl_stack.COMPLETE)
@mock.patch.object(stack_object.Stack, 'delete')
@mock.patch.object(raw_template_object.RawTemplate, 'delete')
@mock.patch.object(stack.Stack, 'store')
def test_mark_complete_update(self, mock_store, mock_tmpl_delete,
mock_stack_delete):
@mock.patch.object(stack.Stack, 'purge_db')
def test_mark_complete_update(self, mock_purge_db):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
@ -2296,49 +2293,35 @@ class StackTest(common.HeatTestCase):
}
})
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl)
tmpl_stack.id = 2
tmpl_stack.t.id = 2
cfg.CONF.set_default('convergence_engine', True)
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl, convergence=True)
tmpl_stack.prev_raw_template_id = 1
tmpl_stack.action = tmpl_stack.UPDATE
tmpl_stack.status = tmpl_stack.IN_PROGRESS
tmpl_stack.current_traversal = 'some-traversal'
tmpl_stack.mark_complete('some-traversal')
self.assertEqual(tmpl_stack.prev_raw_template_id,
None)
self.assertFalse(mock_stack_delete.called)
mock_tmpl_delete.assert_called_once_with(self.ctx, 1)
self.assertEqual(tmpl_stack.status, tmpl_stack.COMPLETE)
tmpl_stack.store()
tmpl_stack.mark_complete()
self.assertTrue(mock_purge_db.called)
@mock.patch.object(stack_object.Stack, 'delete')
@mock.patch.object(raw_template_object.RawTemplate, 'delete')
@mock.patch.object(stack.Stack, 'store')
def test_mark_complete_update_delete(self, mock_store, mock_tmpl_delete,
mock_stack_delete):
@mock.patch.object(stack.Stack, 'purge_db')
def test_mark_complete_update_delete(self, mock_purge_db):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Description': 'Empty Template'
})
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl)
tmpl_stack.id = 2
tmpl_stack.t.id = 2
cfg.CONF.set_default('convergence_engine', True)
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl, convergence=True)
tmpl_stack.prev_raw_template_id = 1
tmpl_stack.action = tmpl_stack.DELETE
tmpl_stack.status = tmpl_stack.IN_PROGRESS
tmpl_stack.current_traversal = 'some-traversal'
tmpl_stack.mark_complete('some-traversal')
self.assertEqual(tmpl_stack.prev_raw_template_id,
None)
mock_tmpl_delete.assert_called_once_with(self.ctx, 1)
mock_stack_delete.assert_called_once_with(self.ctx, 2)
self.assertEqual(tmpl_stack.status, tmpl_stack.COMPLETE)
tmpl_stack.store()
tmpl_stack.mark_complete()
self.assertTrue(mock_purge_db.called)
@mock.patch.object(stack_object.Stack, 'delete')
@mock.patch.object(raw_template_object.RawTemplate, 'delete')
@mock.patch.object(stack.Stack, 'store')
def test_mark_complete_stale_traversal(self, mock_store, mock_tmpl_delete,
mock_stack_delete):
@mock.patch.object(stack.Stack, 'purge_db')
def test_mark_complete_stale_traversal(self, mock_purge_db):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
@ -2347,12 +2330,11 @@ class StackTest(common.HeatTestCase):
})
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl)
tmpl_stack.current_traversal = 'new-traversal'
tmpl_stack.mark_complete('old-traversal')
self.assertFalse(mock_tmpl_delete.called)
self.assertFalse(mock_stack_delete.called)
self.assertIsNone(tmpl_stack.prev_raw_template_id)
self.assertFalse(mock_store.called)
tmpl_stack.store()
# emulate stale traversal
tmpl_stack.current_traversal = 'old-traversal'
tmpl_stack.mark_complete()
self.assertFalse(mock_purge_db.called)
@mock.patch.object(function, 'validate')
def test_validate_assertion_exception_rethrow(self, func_val):
@ -2435,6 +2417,73 @@ class StackTest(common.HeatTestCase):
exc = stack.ForcedCancel(with_rollback=False)
self.update_exception_handler(exc, disable_rollback=True)
def test_store_generates_new_traversal_id_for_new_stack(self):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
'foo': {'Type': 'GenericResourceType'}
}
})
self.stack = stack.Stack(utils.dummy_context(),
'test_stack', tmpl, convergence=True)
self.assertIsNone(self.stack.current_traversal)
self.stack.store()
self.assertIsNotNone(self.stack.current_traversal)
@mock.patch.object(stack_object.Stack, 'select_and_update')
def test_store_uses_traversal_id_for_updating_db(self, mock_sau):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
'foo': {'Type': 'GenericResourceType'}
}
})
self.stack = stack.Stack(utils.dummy_context(),
'test_stack', tmpl, convergence=True)
mock_sau.return_value = True
self.stack.id = 1
self.stack.current_traversal = 1
stack_id = self.stack.store()
mock_sau.assert_called_once_with(mock.ANY, 1, mock.ANY, exp_trvsl=1)
self.assertEqual(1, stack_id)
# ensure store uses given expected traversal ID
stack_id = self.stack.store(exp_trvsl=2)
self.assertEqual(1, stack_id)
mock_sau.assert_called_with(mock.ANY, 1, mock.ANY, exp_trvsl=2)
@mock.patch.object(stack_object.Stack, 'select_and_update')
def test_store_db_update_failure(self, mock_sau):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
'foo': {'Type': 'GenericResourceType'}
}
})
self.stack = stack.Stack(utils.dummy_context(),
'test_stack', tmpl, convergence=True)
mock_sau.return_value = False
self.stack.id = 1
stack_id = self.stack.store()
self.assertIsNone(stack_id)
@mock.patch.object(stack_object.Stack, 'select_and_update')
def test_state_set_uses_curr_traversal_for_updating_db(self, mock_sau):
tmpl = template.Template({
'HeatTemplateFormatVersion': '2012-12-12',
'Resources': {
'foo': {'Type': 'GenericResourceType'}
}
})
self.stack = stack.Stack(utils.dummy_context(),
'test_stack', tmpl, convergence=True)
self.stack.id = 1
self.stack.current_traversal = 'curr-traversal'
self.stack.store()
self.stack.state_set(self.stack.UPDATE, self.stack.IN_PROGRESS, '')
mock_sau.assert_called_once_with(mock.ANY, 1, mock.ANY,
exp_trvsl='curr-traversal')
class StackKwargsForCloningTest(common.HeatTestCase):
scenarios = [