Eliminate client races in legacy operations

Wait for the legacy stack to move to the IN_PROGRESS state before returning
from the API call in the stack update, suspend, resume, check, and restore
operations.

For the stack delete operation, do the same provided that we can acquire
the stack lock immediately, and thus don't need to wait for existing
operations to be cancelled before we can change the state to IN_PROGRESS.
In other cases there is still a race.

Change-Id: Id94d009d69342f311a00ed3859f4ca8ac6b0af09
Story: #1669608
Task: 23175
(cherry picked from commit 2d2da74593)
This commit is contained in:
Zane Bitter 2018-07-30 20:24:34 -04:00
parent a13476e787
commit 1f08105fbd
7 changed files with 137 additions and 39 deletions

View File

@ -184,6 +184,11 @@ class ThreadGroupManager(object):
stack.ROLLBACK, stack.ROLLBACK,
stack.UPDATE)): stack.UPDATE)):
stack.persist_state_and_release_lock(lock.engine_id) stack.persist_state_and_release_lock(lock.engine_id)
notify = kwargs.get('notify')
if notify is not None:
assert not notify.signalled()
notify.signal()
else: else:
lock.release() lock.release()
@ -243,6 +248,38 @@ class ThreadGroupManager(object):
msg_queue.put_nowait(message) msg_queue.put_nowait(message)
class NotifyEvent(object):
def __init__(self):
self._queue = eventlet.queue.LightQueue(1)
self._signalled = False
def signalled(self):
return self._signalled
def signal(self):
"""Signal the event."""
if self._signalled:
return
self._signalled = True
self._queue.put(None)
# Yield control so that the waiting greenthread will get the message
# as soon as possible, so that the API handler can respond to the user.
# Another option would be to set the queue length to 0 (which would
# cause put() to block until the event has been seen, but many unit
# tests run in a single greenthread and would thus deadlock.
eventlet.sleep(0)
def wait(self):
"""Wait for the event."""
try:
# There's no timeout argument to eventlet.event.Event available
# until eventlet 0.22.1, so use a queue.
self._queue.get(timeout=cfg.CONF.rpc_response_timeout)
except eventlet.queue.Empty:
LOG.warning('Timed out waiting for operation to start')
@profiler.trace_cls("rpc") @profiler.trace_cls("rpc")
class EngineListener(object): class EngineListener(object):
"""Listen on an AMQP queue named for the engine. """Listen on an AMQP queue named for the engine.
@ -978,14 +1015,17 @@ class EngineService(service.ServiceBase):
new_stack=updated_stack) new_stack=updated_stack)
else: else:
msg_queue = eventlet.queue.LightQueue() msg_queue = eventlet.queue.LightQueue()
stored_event = NotifyEvent()
th = self.thread_group_mgr.start_with_lock(cnxt, current_stack, th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
self.engine_id, self.engine_id,
current_stack.update, current_stack.update,
updated_stack, updated_stack,
msg_queue=msg_queue) msg_queue=msg_queue,
notify=stored_event)
th.link(self.thread_group_mgr.remove_msg_queue, th.link(self.thread_group_mgr.remove_msg_queue,
current_stack.id, msg_queue) current_stack.id, msg_queue)
self.thread_group_mgr.add_msg_queue(current_stack.id, msg_queue) self.thread_group_mgr.add_msg_queue(current_stack.id, msg_queue)
stored_event.wait()
return dict(current_stack.identifier()) return dict(current_stack.identifier())
@context.request_context @context.request_context
@ -1371,8 +1411,11 @@ class EngineService(service.ServiceBase):
# Successfully acquired lock # Successfully acquired lock
if acquire_result is None: if acquire_result is None:
self.thread_group_mgr.stop_timers(stack.id) self.thread_group_mgr.stop_timers(stack.id)
stored = NotifyEvent()
self.thread_group_mgr.start_with_acquired_lock(stack, lock, self.thread_group_mgr.start_with_acquired_lock(stack, lock,
stack.delete) stack.delete,
notify=stored)
stored.wait()
return return
# Current engine has the lock # Current engine has the lock
@ -1977,30 +2020,28 @@ class EngineService(service.ServiceBase):
@context.request_context @context.request_context
def stack_suspend(self, cnxt, stack_identity): def stack_suspend(self, cnxt, stack_identity):
"""Handle request to perform suspend action on a stack.""" """Handle request to perform suspend action on a stack."""
def _stack_suspend(stack):
LOG.debug("suspending stack %s", stack.name)
stack.suspend()
s = self._get_stack(cnxt, stack_identity) s = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=s) stack = parser.Stack.load(cnxt, stack=s)
self.resource_enforcer.enforce_stack(stack, is_registered_policy=True) self.resource_enforcer.enforce_stack(stack, is_registered_policy=True)
stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id, self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
_stack_suspend, stack) stack.suspend,
notify=stored_event)
stored_event.wait()
@context.request_context @context.request_context
def stack_resume(self, cnxt, stack_identity): def stack_resume(self, cnxt, stack_identity):
"""Handle request to perform a resume action on a stack.""" """Handle request to perform a resume action on a stack."""
def _stack_resume(stack):
LOG.debug("resuming stack %s", stack.name)
stack.resume()
s = self._get_stack(cnxt, stack_identity) s = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=s) stack = parser.Stack.load(cnxt, stack=s)
self.resource_enforcer.enforce_stack(stack, is_registered_policy=True) self.resource_enforcer.enforce_stack(stack, is_registered_policy=True)
stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id, self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
_stack_resume, stack) stack.resume,
notify=stored_event)
stored_event.wait()
@context.request_context @context.request_context
def stack_snapshot(self, cnxt, stack_identity, name): def stack_snapshot(self, cnxt, stack_identity, name):
@ -2072,15 +2113,13 @@ class EngineService(service.ServiceBase):
stack = parser.Stack.load(cnxt, stack=s) stack = parser.Stack.load(cnxt, stack=s)
LOG.info("Checking stack %s", stack.name) LOG.info("Checking stack %s", stack.name)
stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id, self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
stack.check) stack.check, notify=stored_event)
stored_event.wait()
@context.request_context @context.request_context
def stack_restore(self, cnxt, stack_identity, snapshot_id): def stack_restore(self, cnxt, stack_identity, snapshot_id):
def _stack_restore(stack, snapshot):
LOG.debug("restoring stack %s", stack.name)
stack.restore(snapshot)
s = self._get_stack(cnxt, stack_identity) s = self._get_stack(cnxt, stack_identity)
stack = parser.Stack.load(cnxt, stack=s) stack = parser.Stack.load(cnxt, stack=s)
self.resource_enforcer.enforce_stack(stack, is_registered_policy=True) self.resource_enforcer.enforce_stack(stack, is_registered_policy=True)
@ -2096,8 +2135,11 @@ class EngineService(service.ServiceBase):
action=stack.RESTORE, action=stack.RESTORE,
new_stack=new_stack) new_stack=new_stack)
else: else:
stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock( self.thread_group_mgr.start_with_lock(
cnxt, stack, self.engine_id, _stack_restore, stack, snapshot) cnxt, stack, self.engine_id, stack.restore, snapshot,
notify=stored_event)
stored_event.wait()
@context.request_context @context.request_context
def stack_list_snapshots(self, cnxt, stack_identity): def stack_list_snapshots(self, cnxt, stack_identity):

View File

@ -1116,7 +1116,8 @@ class Stack(collections.Mapping):
@scheduler.wrappertask @scheduler.wrappertask
def stack_task(self, action, reverse=False, post_func=None, def stack_task(self, action, reverse=False, post_func=None,
aggregate_exceptions=False, pre_completion_func=None): aggregate_exceptions=False, pre_completion_func=None,
notify=None):
"""A task to perform an action on the stack. """A task to perform an action on the stack.
All of the resources are traversed in forward or reverse dependency All of the resources are traversed in forward or reverse dependency
@ -1140,9 +1141,13 @@ class Stack(collections.Mapping):
'Failed stack pre-ops: %s' % six.text_type(e)) 'Failed stack pre-ops: %s' % six.text_type(e))
if callable(post_func): if callable(post_func):
post_func() post_func()
# No need to call notify.signal(), because persistence of the
# state is always deferred here.
return return
self.state_set(action, self.IN_PROGRESS, self.state_set(action, self.IN_PROGRESS,
'Stack %s started' % action) 'Stack %s started' % action)
if notify is not None:
notify.signal()
stack_status = self.COMPLETE stack_status = self.COMPLETE
reason = 'Stack %s completed successfully' % action reason = 'Stack %s completed successfully' % action
@ -1201,12 +1206,13 @@ class Stack(collections.Mapping):
@profiler.trace('Stack.check', hide_args=False) @profiler.trace('Stack.check', hide_args=False)
@reset_state_on_error @reset_state_on_error
def check(self): def check(self, notify=None):
self.updated_time = oslo_timeutils.utcnow() self.updated_time = oslo_timeutils.utcnow()
checker = scheduler.TaskRunner( checker = scheduler.TaskRunner(
self.stack_task, self.CHECK, self.stack_task, self.CHECK,
post_func=self.supports_check_action, post_func=self.supports_check_action,
aggregate_exceptions=True) aggregate_exceptions=True,
notify=notify)
checker() checker()
def supports_check_action(self): def supports_check_action(self):
@ -1274,7 +1280,7 @@ class Stack(collections.Mapping):
@profiler.trace('Stack.update', hide_args=False) @profiler.trace('Stack.update', hide_args=False)
@reset_state_on_error @reset_state_on_error
def update(self, newstack, msg_queue=None): def update(self, newstack, msg_queue=None, notify=None):
"""Update the stack. """Update the stack.
Compare the current stack with newstack, Compare the current stack with newstack,
@ -1289,7 +1295,7 @@ class Stack(collections.Mapping):
""" """
self.updated_time = oslo_timeutils.utcnow() self.updated_time = oslo_timeutils.utcnow()
updater = scheduler.TaskRunner(self.update_task, newstack, updater = scheduler.TaskRunner(self.update_task, newstack,
msg_queue=msg_queue) msg_queue=msg_queue, notify=notify)
updater() updater()
@profiler.trace('Stack.converge_stack', hide_args=False) @profiler.trace('Stack.converge_stack', hide_args=False)
@ -1535,11 +1541,14 @@ class Stack(collections.Mapping):
self.state_set(self.action, self.FAILED, six.text_type(reason)) self.state_set(self.action, self.FAILED, six.text_type(reason))
@scheduler.wrappertask @scheduler.wrappertask
def update_task(self, newstack, action=UPDATE, msg_queue=None): def update_task(self, newstack, action=UPDATE,
msg_queue=None, notify=None):
if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE): if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):
LOG.error("Unexpected action %s passed to update!", action) LOG.error("Unexpected action %s passed to update!", action)
self.state_set(self.UPDATE, self.FAILED, self.state_set(self.UPDATE, self.FAILED,
"Invalid action %s" % action) "Invalid action %s" % action)
if notify is not None:
notify.signal()
return return
try: try:
@ -1548,6 +1557,8 @@ class Stack(collections.Mapping):
except Exception as e: except Exception as e:
self.state_set(action, self.FAILED, e.args[0] if e.args else self.state_set(action, self.FAILED, e.args[0] if e.args else
'Failed stack pre-ops: %s' % six.text_type(e)) 'Failed stack pre-ops: %s' % six.text_type(e))
if notify is not None:
notify.signal()
return return
if self.status == self.IN_PROGRESS: if self.status == self.IN_PROGRESS:
if action == self.ROLLBACK: if action == self.ROLLBACK:
@ -1556,6 +1567,8 @@ class Stack(collections.Mapping):
reason = _('Attempted to %s an IN_PROGRESS ' reason = _('Attempted to %s an IN_PROGRESS '
'stack') % action 'stack') % action
self.reset_stack_and_resources_in_progress(reason) self.reset_stack_and_resources_in_progress(reason)
if notify is not None:
notify.signal()
return return
# Save a copy of the new template. To avoid two DB writes # Save a copy of the new template. To avoid two DB writes
@ -1569,6 +1582,10 @@ class Stack(collections.Mapping):
self.status_reason = 'Stack %s started' % action self.status_reason = 'Stack %s started' % action
self._send_notification_and_add_event() self._send_notification_and_add_event()
self.store() self.store()
# Notify the caller that the state is stored
if notify is not None:
notify.signal()
if prev_tmpl_id is not None: if prev_tmpl_id is not None:
raw_template_object.RawTemplate.delete(self.context, prev_tmpl_id) raw_template_object.RawTemplate.delete(self.context, prev_tmpl_id)
@ -1836,7 +1853,7 @@ class Stack(collections.Mapping):
@profiler.trace('Stack.delete', hide_args=False) @profiler.trace('Stack.delete', hide_args=False)
@reset_state_on_error @reset_state_on_error
def delete(self, action=DELETE, backup=False, abandon=False): def delete(self, action=DELETE, backup=False, abandon=False, notify=None):
"""Delete all of the resources, and then the stack itself. """Delete all of the resources, and then the stack itself.
The action parameter is used to differentiate between a user The action parameter is used to differentiate between a user
@ -1852,12 +1869,16 @@ class Stack(collections.Mapping):
LOG.error("Unexpected action %s passed to delete!", action) LOG.error("Unexpected action %s passed to delete!", action)
self.state_set(self.DELETE, self.FAILED, self.state_set(self.DELETE, self.FAILED,
"Invalid action %s" % action) "Invalid action %s" % action)
if notify is not None:
notify.signal()
return return
stack_status = self.COMPLETE stack_status = self.COMPLETE
reason = 'Stack %s completed successfully' % action reason = 'Stack %s completed successfully' % action
self.state_set(action, self.IN_PROGRESS, 'Stack %s started' % self.state_set(action, self.IN_PROGRESS, 'Stack %s started' %
action) action)
if notify is not None:
notify.signal()
backup_stack = self._backup_stack(False) backup_stack = self._backup_stack(False)
if backup_stack: if backup_stack:
@ -1921,7 +1942,7 @@ class Stack(collections.Mapping):
@profiler.trace('Stack.suspend', hide_args=False) @profiler.trace('Stack.suspend', hide_args=False)
@reset_state_on_error @reset_state_on_error
def suspend(self): def suspend(self, notify=None):
"""Suspend the stack. """Suspend the stack.
Invokes handle_suspend for all stack resources. Invokes handle_suspend for all stack resources.
@ -1932,6 +1953,7 @@ class Stack(collections.Mapping):
other than move to SUSPEND_COMPLETE, so the resources must implement other than move to SUSPEND_COMPLETE, so the resources must implement
handle_suspend for this to have any effect. handle_suspend for this to have any effect.
""" """
LOG.debug("Suspending stack %s", self)
# No need to suspend if the stack has been suspended # No need to suspend if the stack has been suspended
if self.state == (self.SUSPEND, self.COMPLETE): if self.state == (self.SUSPEND, self.COMPLETE):
LOG.info('%s is already suspended', self) LOG.info('%s is already suspended', self)
@ -1941,12 +1963,13 @@ class Stack(collections.Mapping):
sus_task = scheduler.TaskRunner( sus_task = scheduler.TaskRunner(
self.stack_task, self.stack_task,
action=self.SUSPEND, action=self.SUSPEND,
reverse=True) reverse=True,
notify=notify)
sus_task(timeout=self.timeout_secs()) sus_task(timeout=self.timeout_secs())
@profiler.trace('Stack.resume', hide_args=False) @profiler.trace('Stack.resume', hide_args=False)
@reset_state_on_error @reset_state_on_error
def resume(self): def resume(self, notify=None):
"""Resume the stack. """Resume the stack.
Invokes handle_resume for all stack resources. Invokes handle_resume for all stack resources.
@ -1957,6 +1980,7 @@ class Stack(collections.Mapping):
other than move to RESUME_COMPLETE, so the resources must implement other than move to RESUME_COMPLETE, so the resources must implement
handle_resume for this to have any effect. handle_resume for this to have any effect.
""" """
LOG.debug("Resuming stack %s", self)
# No need to resume if the stack has been resumed # No need to resume if the stack has been resumed
if self.state == (self.RESUME, self.COMPLETE): if self.state == (self.RESUME, self.COMPLETE):
LOG.info('%s is already resumed', self) LOG.info('%s is already resumed', self)
@ -1966,7 +1990,8 @@ class Stack(collections.Mapping):
sus_task = scheduler.TaskRunner( sus_task = scheduler.TaskRunner(
self.stack_task, self.stack_task,
action=self.RESUME, action=self.RESUME,
reverse=False) reverse=False,
notify=notify)
sus_task(timeout=self.timeout_secs()) sus_task(timeout=self.timeout_secs())
@profiler.trace('Stack.snapshot', hide_args=False) @profiler.trace('Stack.snapshot', hide_args=False)
@ -2028,16 +2053,17 @@ class Stack(collections.Mapping):
return newstack, template return newstack, template
@reset_state_on_error @reset_state_on_error
def restore(self, snapshot): def restore(self, snapshot, notify=None):
"""Restore the given snapshot. """Restore the given snapshot.
Invokes handle_restore on all resources. Invokes handle_restore on all resources.
""" """
LOG.debug("Restoring stack %s", self)
self.updated_time = oslo_timeutils.utcnow() self.updated_time = oslo_timeutils.utcnow()
newstack = self.restore_data(snapshot)[0] newstack = self.restore_data(snapshot)[0]
updater = scheduler.TaskRunner(self.update_task, newstack, updater = scheduler.TaskRunner(self.update_task, newstack,
action=self.RESTORE) action=self.RESTORE, notify=notify)
updater() updater()
def get_availability_zones(self): def get_availability_zones(self):

View File

@ -44,12 +44,14 @@ class StackServiceActionsTest(common.HeatTestCase):
thread = mock.MagicMock() thread = mock.MagicMock()
mock_link = self.patchobject(thread, 'link') mock_link = self.patchobject(thread, 'link')
mock_start.return_value = thread mock_start.return_value = thread
self.patchobject(service, 'NotifyEvent')
result = self.man.stack_suspend(self.ctx, stk.identifier()) result = self.man.stack_suspend(self.ctx, stk.identifier())
self.assertIsNone(result) self.assertIsNone(result)
mock_load.assert_called_once_with(self.ctx, stack=s) mock_load.assert_called_once_with(self.ctx, stack=s)
mock_link.assert_called_once_with(mock.ANY) mock_link.assert_called_once_with(mock.ANY)
mock_start.assert_called_once_with(stk.id, mock.ANY, stk) mock_start.assert_called_once_with(stk.id, stk.suspend,
notify=mock.ANY)
stk.delete() stk.delete()
@ -64,13 +66,14 @@ class StackServiceActionsTest(common.HeatTestCase):
thread = mock.MagicMock() thread = mock.MagicMock()
mock_link = self.patchobject(thread, 'link') mock_link = self.patchobject(thread, 'link')
mock_start.return_value = thread mock_start.return_value = thread
self.patchobject(service, 'NotifyEvent')
result = self.man.stack_resume(self.ctx, stk.identifier()) result = self.man.stack_resume(self.ctx, stk.identifier())
self.assertIsNone(result) self.assertIsNone(result)
mock_load.assert_called_once_with(self.ctx, stack=mock.ANY) mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
mock_link.assert_called_once_with(mock.ANY) mock_link.assert_called_once_with(mock.ANY)
mock_start.assert_called_once_with(stk.id, mock.ANY, stk) mock_start.assert_called_once_with(stk.id, stk.resume, notify=mock.ANY)
stk.delete() stk.delete()
@ -108,6 +111,7 @@ class StackServiceActionsTest(common.HeatTestCase):
stk = utils.parse_stack(t, stack_name=stack_name) stk = utils.parse_stack(t, stack_name=stack_name)
stk.check = mock.Mock() stk.check = mock.Mock()
self.patchobject(service, 'NotifyEvent')
mock_load.return_value = stk mock_load.return_value = stk
mock_start.side_effect = self._mock_thread_start mock_start.side_effect = self._mock_thread_start

View File

@ -12,6 +12,8 @@
# under the License. # under the License.
import mock import mock
from oslo_config import cfg
from oslo_messaging import conffixture
from heat.engine import resource as res from heat.engine import resource as res
from heat.engine.resources.aws.ec2 import instance as instances from heat.engine.resources.aws.ec2 import instance as instances
@ -94,6 +96,7 @@ class StackEventTest(common.HeatTestCase):
@tools.stack_context('service_event_list_deleted_resource') @tools.stack_context('service_event_list_deleted_resource')
@mock.patch.object(instances.Instance, 'handle_delete') @mock.patch.object(instances.Instance, 'handle_delete')
def test_event_list_deleted_resource(self, mock_delete): def test_event_list_deleted_resource(self, mock_delete):
self.useFixture(conffixture.ConfFixture(cfg.CONF))
mock_delete.return_value = None mock_delete.return_value = None
res._register_class('GenericResourceType', res._register_class('GenericResourceType',
@ -103,7 +106,7 @@ class StackEventTest(common.HeatTestCase):
thread.link = mock.Mock(return_value=None) thread.link = mock.Mock(return_value=None)
def run(stack_id, func, *args, **kwargs): def run(stack_id, func, *args, **kwargs):
func(*args) func(*args, **kwargs)
return thread return thread
self.eng.thread_group_mgr.start = run self.eng.thread_group_mgr.start = run

View File

@ -15,6 +15,7 @@ import uuid
import eventlet.queue import eventlet.queue
import mock import mock
from oslo_config import cfg from oslo_config import cfg
from oslo_messaging import conffixture
from oslo_messaging.rpc import dispatcher from oslo_messaging.rpc import dispatcher
import six import six
@ -43,6 +44,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
def setUp(self): def setUp(self):
super(ServiceStackUpdateTest, self).setUp() super(ServiceStackUpdateTest, self).setUp()
self.useFixture(conffixture.ConfFixture(cfg.CONF))
self.ctx = utils.dummy_context() self.ctx = utils.dummy_context()
self.man = service.EngineService('a-host', 'a-topic') self.man = service.EngineService('a-host', 'a-topic')
self.man.thread_group_mgr = tools.DummyThreadGroupManager() self.man.thread_group_mgr = tools.DummyThreadGroupManager()
@ -68,7 +70,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
mock_validate = self.patchobject(stk, 'validate', return_value=None) mock_validate = self.patchobject(stk, 'validate', return_value=None)
msgq_mock = mock.Mock() msgq_mock = mock.Mock()
self.patchobject(eventlet.queue, 'LightQueue', return_value=msgq_mock) self.patchobject(eventlet.queue, 'LightQueue',
side_effect=[msgq_mock, eventlet.queue.LightQueue()])
# do update # do update
api_args = {'timeout_mins': 60, rpc_api.PARAM_CONVERGE: True} api_args = {'timeout_mins': 60, rpc_api.PARAM_CONVERGE: True}
@ -122,7 +125,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
self.patchobject(environment, 'Environment', return_value=stk.env) self.patchobject(environment, 'Environment', return_value=stk.env)
self.patchobject(stk, 'validate', return_value=None) self.patchobject(stk, 'validate', return_value=None)
self.patchobject(eventlet.queue, 'LightQueue', self.patchobject(eventlet.queue, 'LightQueue',
return_value=mock.Mock()) side_effect=[mock.Mock(),
eventlet.queue.LightQueue()])
mock_merge = self.patchobject(env_util, 'merge_environments') mock_merge = self.patchobject(env_util, 'merge_environments')
@ -160,7 +164,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
mock_validate = self.patchobject(stk, 'validate', return_value=None) mock_validate = self.patchobject(stk, 'validate', return_value=None)
msgq_mock = mock.Mock() msgq_mock = mock.Mock()
self.patchobject(eventlet.queue, 'LightQueue', return_value=msgq_mock) self.patchobject(eventlet.queue, 'LightQueue',
side_effect=[msgq_mock, eventlet.queue.LightQueue()])
# do update # do update
api_args = {'timeout_mins': 60, rpc_api.PARAM_CONVERGE: False} api_args = {'timeout_mins': 60, rpc_api.PARAM_CONVERGE: False}
@ -219,6 +224,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
t['parameters']['newparam'] = {'type': 'number'} t['parameters']['newparam'] = {'type': 'number'}
with mock.patch('heat.engine.stack.Stack') as mock_stack: with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock() stk.update = mock.Mock()
self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stk mock_stack.load.return_value = stk
mock_stack.validate.return_value = None mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(), result = self.man.update_stack(self.ctx, stk.identifier(),
@ -275,7 +281,8 @@ resources:
rpc_api.PARAM_CONVERGE: False} rpc_api.PARAM_CONVERGE: False}
with mock.patch('heat.engine.stack.Stack') as mock_stack: with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock() loaded_stack.update = mock.Mock()
self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = loaded_stack mock_stack.load.return_value = loaded_stack
mock_stack.validate.return_value = None mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(), result = self.man.update_stack(self.ctx, stk.identifier(),
@ -318,6 +325,7 @@ resources:
t['parameters']['newparam'] = {'type': 'number'} t['parameters']['newparam'] = {'type': 'number'}
with mock.patch('heat.engine.stack.Stack') as mock_stack: with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock() stk.update = mock.Mock()
self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stk mock_stack.load.return_value = stk
mock_stack.validate.return_value = None mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(), result = self.man.update_stack(self.ctx, stk.identifier(),
@ -418,6 +426,7 @@ resources:
'myother.yaml': 'myother'} 'myother.yaml': 'myother'}
with mock.patch('heat.engine.stack.Stack') as mock_stack: with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock() stk.update = mock.Mock()
self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stk mock_stack.load.return_value = stk
mock_stack.validate.return_value = None mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(), result = self.man.update_stack(self.ctx, stk.identifier(),
@ -464,6 +473,7 @@ resources:
'resource_registry': {'resources': {}}} 'resource_registry': {'resources': {}}}
with mock.patch('heat.engine.stack.Stack') as mock_stack: with mock.patch('heat.engine.stack.Stack') as mock_stack:
stk.update = mock.Mock() stk.update = mock.Mock()
self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stk mock_stack.load.return_value = stk
mock_stack.validate.return_value = None mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stk.identifier(), result = self.man.update_stack(self.ctx, stk.identifier(),
@ -864,6 +874,7 @@ resources:
stack.status = stack.COMPLETE stack.status = stack.COMPLETE
with mock.patch('heat.engine.stack.Stack') as mock_stack: with mock.patch('heat.engine.stack.Stack') as mock_stack:
self.patchobject(service, 'NotifyEvent')
mock_stack.load.return_value = stack mock_stack.load.return_value = stack
mock_stack.validate.return_value = None mock_stack.validate.return_value = None
result = self.man.update_stack(self.ctx, stack.identifier(), result = self.man.update_stack(self.ctx, stack.identifier(),

View File

@ -0,0 +1,12 @@
---
fixes:
- |
Previously, the suspend, resume, and check API calls for all stacks, and
the update, restore, and delete API calls for non-convergence stacks,
returned immediately after starting the stack operation. This meant that
for a client reading the state immediately when performing the same
operation twice in a row, it could have misinterpreted a previous state as
the latest unless careful reference were made to the updated_at timestamp.
Stacks are now guaranteed to have moved to the ``IN_PROGRESS`` state before
any of these APIs return (except in the case of deleting a non-convergence
stack where another operation was already in progress).

View File

@ -104,7 +104,7 @@ commands = bandit -r heat -x tests --skip B101,B104,B107,B110,B310,B311,B404,B41
[flake8] [flake8]
show-source = true show-source = true
exclude=.*,dist,*lib/python*,*egg,build,*convergence/scenarios/* exclude=.*,dist,*lib/python*,*egg,build,*convergence/scenarios/*
max-complexity=20 max-complexity=24
[hacking] [hacking]
import_exceptions = heat.common.i18n import_exceptions = heat.common.i18n