Merge "Refactor deferral of stack state persistence"
This commit is contained in:
commit
8115054173
|
@ -180,10 +180,7 @@ class ThreadGroupManager(object):
|
||||||
Persist the stack state to COMPLETE and FAILED close to
|
Persist the stack state to COMPLETE and FAILED close to
|
||||||
releasing the lock to avoid race conditions.
|
releasing the lock to avoid race conditions.
|
||||||
"""
|
"""
|
||||||
if (stack is not None and stack.status != stack.IN_PROGRESS
|
if stack is not None and stack.defer_state_persist():
|
||||||
and stack.action not in (stack.DELETE,
|
|
||||||
stack.ROLLBACK,
|
|
||||||
stack.UPDATE)):
|
|
||||||
stack.persist_state_and_release_lock(lock.engine_id)
|
stack.persist_state_and_release_lock(lock.engine_id)
|
||||||
|
|
||||||
notify = kwargs.get('notify')
|
notify = kwargs.get('notify')
|
||||||
|
|
|
@ -957,6 +957,29 @@ class Stack(collections.Mapping):
|
||||||
self.env.get_event_sinks(),
|
self.env.get_event_sinks(),
|
||||||
ev.as_dict())
|
ev.as_dict())
|
||||||
|
|
||||||
|
def defer_state_persist(self):
|
||||||
|
"""Return whether to defer persisting the state.
|
||||||
|
|
||||||
|
If persistence is deferred, the new state will not be written to the
|
||||||
|
database until the stack lock is released (by calling
|
||||||
|
persist_state_and_release_lock()). This prevents races in the legacy
|
||||||
|
path where an observer sees the stack COMPLETE but an engine still
|
||||||
|
holds the lock.
|
||||||
|
"""
|
||||||
|
if self.status == self.IN_PROGRESS:
|
||||||
|
# Always persist IN_PROGRESS immediately
|
||||||
|
return False
|
||||||
|
|
||||||
|
if (self.convergence and
|
||||||
|
self.action in {self.UPDATE, self.DELETE, self.CREATE,
|
||||||
|
self.ADOPT, self.ROLLBACK, self.RESTORE}):
|
||||||
|
# These operations do not use the stack lock in convergence, so
|
||||||
|
# never defer.
|
||||||
|
return False
|
||||||
|
|
||||||
|
return self.action not in {self.UPDATE, self.DELETE, self.ROLLBACK,
|
||||||
|
self.RESTORE}
|
||||||
|
|
||||||
@profiler.trace('Stack.state_set', hide_args=False)
|
@profiler.trace('Stack.state_set', hide_args=False)
|
||||||
def state_set(self, action, status, reason):
|
def state_set(self, action, status, reason):
|
||||||
"""Update the stack state."""
|
"""Update the stack state."""
|
||||||
|
@ -971,13 +994,9 @@ class Stack(collections.Mapping):
|
||||||
self.status_reason = reason
|
self.status_reason = reason
|
||||||
self._log_status()
|
self._log_status()
|
||||||
|
|
||||||
if self.convergence and action in (
|
if not self.defer_state_persist():
|
||||||
self.UPDATE, self.DELETE, self.CREATE,
|
|
||||||
self.ADOPT, self.ROLLBACK, self.RESTORE):
|
|
||||||
# if convergence and stack operation is create/update/rollback/
|
|
||||||
# delete, stack lock is not used, hence persist state
|
|
||||||
updated = self._persist_state()
|
updated = self._persist_state()
|
||||||
if not updated:
|
if self.convergence and not updated:
|
||||||
LOG.info("Stack %(name)s traversal %(trvsl_id)s no longer "
|
LOG.info("Stack %(name)s traversal %(trvsl_id)s no longer "
|
||||||
"active; not setting state to %(action)s_%(status)s",
|
"active; not setting state to %(action)s_%(status)s",
|
||||||
{'name': self.name,
|
{'name': self.name,
|
||||||
|
@ -985,13 +1004,6 @@ class Stack(collections.Mapping):
|
||||||
'action': action, 'status': status})
|
'action': action, 'status': status})
|
||||||
return updated
|
return updated
|
||||||
|
|
||||||
# Persist state to db only if status == IN_PROGRESS
|
|
||||||
# or action == UPDATE/DELETE/ROLLBACK. Else, it would
|
|
||||||
# be done before releasing the stack lock.
|
|
||||||
if status == self.IN_PROGRESS or action in (
|
|
||||||
self.UPDATE, self.DELETE, self.ROLLBACK, self.RESTORE):
|
|
||||||
self._persist_state()
|
|
||||||
|
|
||||||
def _log_status(self):
|
def _log_status(self):
|
||||||
LOG.info('Stack %(action)s %(status)s (%(name)s): %(reason)s',
|
LOG.info('Stack %(action)s %(status)s (%(name)s): %(reason)s',
|
||||||
{'action': self.action,
|
{'action': self.action,
|
||||||
|
@ -1148,8 +1160,10 @@ class Stack(collections.Mapping):
|
||||||
'Failed stack pre-ops: %s' % six.text_type(e))
|
'Failed stack pre-ops: %s' % six.text_type(e))
|
||||||
if callable(post_func):
|
if callable(post_func):
|
||||||
post_func()
|
post_func()
|
||||||
|
if notify is not None:
|
||||||
# No need to call notify.signal(), because persistence of the
|
# No need to call notify.signal(), because persistence of the
|
||||||
# state is always deferred here.
|
# state is always deferred here.
|
||||||
|
assert self.defer_state_persist()
|
||||||
return
|
return
|
||||||
self.state_set(action, self.IN_PROGRESS,
|
self.state_set(action, self.IN_PROGRESS,
|
||||||
'Stack %s started' % action)
|
'Stack %s started' % action)
|
||||||
|
|
|
@ -587,38 +587,31 @@ class TestConvgStackStateSet(common.HeatTestCase):
|
||||||
|
|
||||||
def test_state_set_stack_suspend(self, mock_ps):
|
def test_state_set_stack_suspend(self, mock_ps):
|
||||||
mock_ps.return_value = 'updated'
|
mock_ps.return_value = 'updated'
|
||||||
ret_val = self.stack.state_set(
|
self.stack.state_set(self.stack.SUSPEND, self.stack.IN_PROGRESS,
|
||||||
self.stack.SUSPEND, self.stack.IN_PROGRESS, 'Suspend started')
|
'Suspend started')
|
||||||
self.assertTrue(mock_ps.called)
|
self.assertTrue(mock_ps.called)
|
||||||
# Ensure that state_set returns None for other actions in convergence
|
|
||||||
self.assertIsNone(ret_val)
|
|
||||||
mock_ps.reset_mock()
|
mock_ps.reset_mock()
|
||||||
ret_val = self.stack.state_set(
|
self.stack.state_set(self.stack.SUSPEND, self.stack.COMPLETE,
|
||||||
self.stack.SUSPEND, self.stack.COMPLETE, 'Suspend complete')
|
'Suspend complete')
|
||||||
self.assertFalse(mock_ps.called)
|
self.assertFalse(mock_ps.called)
|
||||||
self.assertIsNone(ret_val)
|
|
||||||
|
|
||||||
def test_state_set_stack_resume(self, mock_ps):
|
def test_state_set_stack_resume(self, mock_ps):
|
||||||
ret_val = self.stack.state_set(
|
self.stack.state_set(self.stack.RESUME, self.stack.IN_PROGRESS,
|
||||||
self.stack.RESUME, self.stack.IN_PROGRESS, 'Resume started')
|
'Resume started')
|
||||||
self.assertTrue(mock_ps.called)
|
self.assertTrue(mock_ps.called)
|
||||||
self.assertIsNone(ret_val)
|
|
||||||
mock_ps.reset_mock()
|
mock_ps.reset_mock()
|
||||||
ret_val = self.stack.state_set(self.stack.RESUME, self.stack.COMPLETE,
|
self.stack.state_set(self.stack.RESUME, self.stack.COMPLETE,
|
||||||
'Resume complete')
|
'Resume complete')
|
||||||
self.assertFalse(mock_ps.called)
|
self.assertFalse(mock_ps.called)
|
||||||
self.assertIsNone(ret_val)
|
|
||||||
|
|
||||||
def test_state_set_stack_snapshot(self, mock_ps):
|
def test_state_set_stack_snapshot(self, mock_ps):
|
||||||
ret_val = self.stack.state_set(
|
self.stack.state_set(self.stack.SNAPSHOT, self.stack.IN_PROGRESS,
|
||||||
self.stack.SNAPSHOT, self.stack.IN_PROGRESS, 'Snapshot started')
|
'Snapshot started')
|
||||||
self.assertTrue(mock_ps.called)
|
self.assertTrue(mock_ps.called)
|
||||||
self.assertIsNone(ret_val)
|
|
||||||
mock_ps.reset_mock()
|
mock_ps.reset_mock()
|
||||||
ret_val = self.stack.state_set(
|
self.stack.state_set(self.stack.SNAPSHOT, self.stack.COMPLETE,
|
||||||
self.stack.SNAPSHOT, self.stack.COMPLETE, 'Snapshot complete')
|
'Snapshot complete')
|
||||||
self.assertFalse(mock_ps.called)
|
self.assertFalse(mock_ps.called)
|
||||||
self.assertIsNone(ret_val)
|
|
||||||
|
|
||||||
def test_state_set_stack_restore(self, mock_ps):
|
def test_state_set_stack_restore(self, mock_ps):
|
||||||
mock_ps.return_value = 'updated'
|
mock_ps.return_value = 'updated'
|
||||||
|
|
|
@ -3091,8 +3091,7 @@ class StackStateSetTest(common.HeatTestCase):
|
||||||
self.assertRaises(ValueError, self.stack.state_set,
|
self.assertRaises(ValueError, self.stack.state_set,
|
||||||
self.action, self.status, 'test')
|
self.action, self.status, 'test')
|
||||||
else:
|
else:
|
||||||
self.assertIsNone(self.stack.state_set(self.action,
|
self.stack.state_set(self.action, self.status, 'test')
|
||||||
self.status, 'test'))
|
|
||||||
self.assertEqual((self.action, self.status), self.stack.state)
|
self.assertEqual((self.action, self.status), self.stack.state)
|
||||||
self.assertEqual('test', self.stack.status_reason)
|
self.assertEqual('test', self.stack.status_reason)
|
||||||
self.assertEqual(self.persist_count, persist_state.call_count)
|
self.assertEqual(self.persist_count, persist_state.call_count)
|
||||||
|
|
Loading…
Reference in New Issue