diff --git a/heat/engine/service.py b/heat/engine/service.py
index 45c3b6fdcb..9dc9c14000 100644
--- a/heat/engine/service.py
+++ b/heat/engine/service.py
@@ -184,6 +184,11 @@ class ThreadGroupManager(object):
                                    stack.ROLLBACK, stack.UPDATE)):
                 stack.persist_state_and_release_lock(lock.engine_id)
+
+                notify = kwargs.get('notify')
+                if notify is not None:
+                    assert not notify.signalled()
+                    notify.signal()
             else:
                 lock.release()
@@ -243,6 +248,38 @@ class ThreadGroupManager(object):
             msg_queue.put_nowait(message)
 
+
+class NotifyEvent(object):
+    def __init__(self):
+        self._queue = eventlet.queue.LightQueue(1)
+        self._signalled = False
+
+    def signalled(self):
+        return self._signalled
+
+    def signal(self):
+        """Signal the event."""
+        if self._signalled:
+            return
+        self._signalled = True
+
+        self._queue.put(None)
+        # Yield control so that the waiting greenthread will get the message
+        # as soon as possible, so that the API handler can respond to the user.
+        # Another option would be to set the queue length to 0 (which would
+        # cause put() to block until the event has been seen, but many unit
+        # tests run in a single greenthread and would thus deadlock.
+        eventlet.sleep(0)
+
+    def wait(self):
+        """Wait for the event."""
+        try:
+            # There's no timeout argument to eventlet.event.Event available
+            # until eventlet 0.22.1, so use a queue.
+            self._queue.get(timeout=cfg.CONF.rpc_response_timeout)
+        except eventlet.queue.Empty:
+            LOG.warning('Timed out waiting for operation to start')
+
+
 @profiler.trace_cls("rpc")
 class EngineListener(object):
     """Listen on an AMQP queue named for the engine.
@@ -978,14 +1015,17 @@ class EngineService(service.ServiceBase):
                                               new_stack=updated_stack)
         else:
             msg_queue = eventlet.queue.LightQueue()
+            stored_event = NotifyEvent()
             th = self.thread_group_mgr.start_with_lock(cnxt, current_stack,
                                                        self.engine_id,
                                                        current_stack.update,
                                                        updated_stack,
-                                                       msg_queue=msg_queue)
+                                                       msg_queue=msg_queue,
+                                                       notify=stored_event)
             th.link(self.thread_group_mgr.remove_msg_queue, current_stack.id,
                     msg_queue)
             self.thread_group_mgr.add_msg_queue(current_stack.id, msg_queue)
+            stored_event.wait()
 
         return dict(current_stack.identifier())
 
     @context.request_context
@@ -1371,8 +1411,11 @@ class EngineService(service.ServiceBase):
         # Successfully acquired lock
         if acquire_result is None:
             self.thread_group_mgr.stop_timers(stack.id)
+            stored = NotifyEvent()
             self.thread_group_mgr.start_with_acquired_lock(stack, lock,
-                                                           stack.delete)
+                                                           stack.delete,
+                                                           notify=stored)
+            stored.wait()
             return
 
         # Current engine has the lock
@@ -1977,30 +2020,28 @@ class EngineService(service.ServiceBase):
     @context.request_context
     def stack_suspend(self, cnxt, stack_identity):
         """Handle request to perform suspend action on a stack."""
-        def _stack_suspend(stack):
-            LOG.debug("suspending stack %s", stack.name)
-            stack.suspend()
-
         s = self._get_stack(cnxt, stack_identity)
         stack = parser.Stack.load(cnxt, stack=s)
         self.resource_enforcer.enforce_stack(stack,
                                              is_registered_policy=True)
+        stored_event = NotifyEvent()
         self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
-                                              _stack_suspend, stack)
+                                              stack.suspend,
+                                              notify=stored_event)
+        stored_event.wait()
 
     @context.request_context
     def stack_resume(self, cnxt, stack_identity):
         """Handle request to perform a resume action on a stack."""
-        def _stack_resume(stack):
-            LOG.debug("resuming stack %s", stack.name)
-            stack.resume()
-
         s = self._get_stack(cnxt, stack_identity)
         stack = parser.Stack.load(cnxt, stack=s)
         self.resource_enforcer.enforce_stack(stack,
                                              is_registered_policy=True)
+        stored_event = NotifyEvent()
         self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
-                                              _stack_resume, stack)
+                                              stack.resume,
+                                              notify=stored_event)
+        stored_event.wait()
 
     @context.request_context
     def stack_snapshot(self, cnxt, stack_identity, name):
@@ -2072,15 +2113,13 @@ class EngineService(service.ServiceBase):
         stack = parser.Stack.load(cnxt, stack=s)
 
         LOG.info("Checking stack %s", stack.name)
+        stored_event = NotifyEvent()
         self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
-                                              stack.check)
+                                              stack.check, notify=stored_event)
+        stored_event.wait()
 
     @context.request_context
     def stack_restore(self, cnxt, stack_identity, snapshot_id):
-        def _stack_restore(stack, snapshot):
-            LOG.debug("restoring stack %s", stack.name)
-            stack.restore(snapshot)
-
         s = self._get_stack(cnxt, stack_identity)
         stack = parser.Stack.load(cnxt, stack=s)
         self.resource_enforcer.enforce_stack(stack,
                                              is_registered_policy=True)
@@ -2096,8 +2135,11 @@ class EngineService(service.ServiceBase):
                                           action=stack.RESTORE,
                                           new_stack=new_stack)
         else:
+            stored_event = NotifyEvent()
             self.thread_group_mgr.start_with_lock(
-                cnxt, stack, self.engine_id, _stack_restore, stack, snapshot)
+                cnxt, stack, self.engine_id, stack.restore, snapshot,
+                notify=stored_event)
+            stored_event.wait()
 
     @context.request_context
     def stack_list_snapshots(self, cnxt, stack_identity):
diff --git a/heat/engine/stack.py b/heat/engine/stack.py
index 467a395eb8..1b2bb01667 100644
--- a/heat/engine/stack.py
+++ b/heat/engine/stack.py
@@ -1116,7 +1116,8 @@ class Stack(collections.Mapping):
 
     @scheduler.wrappertask
     def stack_task(self, action, reverse=False, post_func=None,
-                   aggregate_exceptions=False, pre_completion_func=None):
+                   aggregate_exceptions=False, pre_completion_func=None,
+                   notify=None):
         """A task to perform an action on the stack.
 
         All of the resources are traversed in forward or reverse dependency
@@ -1140,9 +1141,13 @@
                            'Failed stack pre-ops: %s' % six.text_type(e))
             if callable(post_func):
                 post_func()
+            # No need to call notify.signal(), because persistence of the
+            # state is always deferred here.
             return
 
         self.state_set(action, self.IN_PROGRESS, 'Stack %s started' % action)
+        if notify is not None:
+            notify.signal()
 
         stack_status = self.COMPLETE
         reason = 'Stack %s completed successfully' % action
@@ -1201,12 +1206,13 @@
 
     @profiler.trace('Stack.check', hide_args=False)
     @reset_state_on_error
-    def check(self):
+    def check(self, notify=None):
         self.updated_time = oslo_timeutils.utcnow()
         checker = scheduler.TaskRunner(
             self.stack_task, self.CHECK,
             post_func=self.supports_check_action,
-            aggregate_exceptions=True)
+            aggregate_exceptions=True,
+            notify=notify)
         checker()
 
     def supports_check_action(self):
@@ -1274,7 +1280,7 @@
 
     @profiler.trace('Stack.update', hide_args=False)
     @reset_state_on_error
-    def update(self, newstack, msg_queue=None):
+    def update(self, newstack, msg_queue=None, notify=None):
         """Update the stack.
 
         Compare the current stack with newstack,
@@ -1289,7 +1295,7 @@
         """
         self.updated_time = oslo_timeutils.utcnow()
         updater = scheduler.TaskRunner(self.update_task, newstack,
-                                       msg_queue=msg_queue)
+                                       msg_queue=msg_queue, notify=notify)
         updater()
 
     @profiler.trace('Stack.converge_stack', hide_args=False)
@@ -1535,11 +1541,14 @@
         self.state_set(self.action, self.FAILED, six.text_type(reason))
 
     @scheduler.wrappertask
-    def update_task(self, newstack, action=UPDATE, msg_queue=None):
+    def update_task(self, newstack, action=UPDATE,
+                    msg_queue=None, notify=None):
         if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):
             LOG.error("Unexpected action %s passed to update!", action)
             self.state_set(self.UPDATE, self.FAILED,
                            "Invalid action %s" % action)
+            if notify is not None:
+                notify.signal()
             return
 
         try:
@@ -1548,6 +1557,8 @@
         except Exception as e:
             self.state_set(action, self.FAILED, e.args[0] if e.args else
                            'Failed stack pre-ops: %s' % six.text_type(e))
+            if notify is not None:
+                notify.signal()
             return
         if self.status == self.IN_PROGRESS:
             if action == self.ROLLBACK:
@@ -1556,6 +1567,8 @@
                 reason = _('Attempted to %s an IN_PROGRESS '
                            'stack') % action
                 self.reset_stack_and_resources_in_progress(reason)
+                if notify is not None:
+                    notify.signal()
                 return
 
         # Save a copy of the new template. To avoid two DB writes
@@ -1569,6 +1582,10 @@
         self.status_reason = 'Stack %s started' % action
         self._send_notification_and_add_event()
         self.store()
+        # Notify the caller that the state is stored
+        if notify is not None:
+            notify.signal()
+
         if prev_tmpl_id is not None:
             raw_template_object.RawTemplate.delete(self.context, prev_tmpl_id)
@@ -1836,7 +1853,7 @@ class Stack(collections.Mapping):
 
     @profiler.trace('Stack.delete', hide_args=False)
     @reset_state_on_error
-    def delete(self, action=DELETE, backup=False, abandon=False):
+    def delete(self, action=DELETE, backup=False, abandon=False, notify=None):
         """Delete all of the resources, and then the stack itself.
 
         The action parameter is used to differentiate between a user
@@ -1852,12 +1869,16 @@
             LOG.error("Unexpected action %s passed to delete!", action)
             self.state_set(self.DELETE, self.FAILED,
                            "Invalid action %s" % action)
+            if notify is not None:
+                notify.signal()
             return
 
         stack_status = self.COMPLETE
         reason = 'Stack %s completed successfully' % action
 
         self.state_set(action, self.IN_PROGRESS, 'Stack %s started' % action)
+        if notify is not None:
+            notify.signal()
 
         backup_stack = self._backup_stack(False)
         if backup_stack:
@@ -1921,7 +1942,7 @@ class Stack(collections.Mapping):
 
     @profiler.trace('Stack.suspend', hide_args=False)
     @reset_state_on_error
-    def suspend(self):
+    def suspend(self, notify=None):
         """Suspend the stack.
 
         Invokes handle_suspend for all stack resources.
@@ -1932,6 +1953,7 @@
         other than move to SUSPEND_COMPLETE, so the resources must
         implement handle_suspend for this to have any effect.
         """
+        LOG.debug("Suspending stack %s", self)
         # No need to suspend if the stack has been suspended
         if self.state == (self.SUSPEND, self.COMPLETE):
             LOG.info('%s is already suspended', self)
@@ -1941,12 +1963,13 @@
         sus_task = scheduler.TaskRunner(
             self.stack_task,
             action=self.SUSPEND,
-            reverse=True)
+            reverse=True,
+            notify=notify)
         sus_task(timeout=self.timeout_secs())
 
     @profiler.trace('Stack.resume', hide_args=False)
     @reset_state_on_error
-    def resume(self):
+    def resume(self, notify=None):
         """Resume the stack.
 
         Invokes handle_resume for all stack resources.
@@ -1957,6 +1980,7 @@
         other than move to RESUME_COMPLETE, so the resources must
         implement handle_resume for this to have any effect.
         """
+        LOG.debug("Resuming stack %s", self)
         # No need to resume if the stack has been resumed
         if self.state == (self.RESUME, self.COMPLETE):
             LOG.info('%s is already resumed', self)
@@ -1966,7 +1990,8 @@
         sus_task = scheduler.TaskRunner(
             self.stack_task,
             action=self.RESUME,
-            reverse=False)
+            reverse=False,
+            notify=notify)
         sus_task(timeout=self.timeout_secs())
 
     @profiler.trace('Stack.snapshot', hide_args=False)
@@ -2028,16 +2053,17 @@
         return newstack, template
 
     @reset_state_on_error
-    def restore(self, snapshot):
+    def restore(self, snapshot, notify=None):
         """Restore the given snapshot.
 
         Invokes handle_restore on all resources.
         """
+        LOG.debug("Restoring stack %s", self)
         self.updated_time = oslo_timeutils.utcnow()
         newstack = self.restore_data(snapshot)[0]
 
         updater = scheduler.TaskRunner(self.update_task, newstack,
-                                       action=self.RESTORE)
+                                       action=self.RESTORE, notify=notify)
         updater()
 
     def get_availability_zones(self):
diff --git a/heat/tests/engine/service/test_stack_action.py b/heat/tests/engine/service/test_stack_action.py
index 1f01f3ac45..ccec6bcacc 100644
--- a/heat/tests/engine/service/test_stack_action.py
+++ b/heat/tests/engine/service/test_stack_action.py
@@ -44,12 +44,14 @@ class StackServiceActionsTest(common.HeatTestCase):
         thread = mock.MagicMock()
         mock_link = self.patchobject(thread, 'link')
         mock_start.return_value = thread
+        self.patchobject(service, 'NotifyEvent')
 
         result = self.man.stack_suspend(self.ctx, stk.identifier())
         self.assertIsNone(result)
 
         mock_load.assert_called_once_with(self.ctx, stack=s)
         mock_link.assert_called_once_with(mock.ANY)
-        mock_start.assert_called_once_with(stk.id, mock.ANY, stk)
+        mock_start.assert_called_once_with(stk.id, stk.suspend,
+                                           notify=mock.ANY)
 
         stk.delete()
@@ -64,13 +66,14 @@ class StackServiceActionsTest(common.HeatTestCase):
         thread = mock.MagicMock()
         mock_link = self.patchobject(thread, 'link')
         mock_start.return_value = thread
+        self.patchobject(service, 'NotifyEvent')
 
         result = self.man.stack_resume(self.ctx, stk.identifier())
         self.assertIsNone(result)
 
         mock_load.assert_called_once_with(self.ctx, stack=mock.ANY)
         mock_link.assert_called_once_with(mock.ANY)
-        mock_start.assert_called_once_with(stk.id, mock.ANY, stk)
+        mock_start.assert_called_once_with(stk.id, stk.resume, notify=mock.ANY)
 
         stk.delete()
@@ -108,6 +111,7 @@ class StackServiceActionsTest(common.HeatTestCase):
         stk = utils.parse_stack(t, stack_name=stack_name)
         stk.check = mock.Mock()
+        self.patchobject(service, 'NotifyEvent')
 
         mock_load.return_value = stk
         mock_start.side_effect = self._mock_thread_start
diff --git a/heat/tests/engine/service/test_stack_events.py b/heat/tests/engine/service/test_stack_events.py
index 1c3bfa1e34..fa0e8e1d18 100644
--- a/heat/tests/engine/service/test_stack_events.py
+++ b/heat/tests/engine/service/test_stack_events.py
@@ -12,6 +12,8 @@
 #    under the License.
 
 import mock
+from oslo_config import cfg
+from oslo_messaging import conffixture
 
 from heat.engine import resource as res
 from heat.engine.resources.aws.ec2 import instance as instances
@@ -94,6 +96,7 @@ class StackEventTest(common.HeatTestCase):
     @tools.stack_context('service_event_list_deleted_resource')
     @mock.patch.object(instances.Instance, 'handle_delete')
     def test_event_list_deleted_resource(self, mock_delete):
+        self.useFixture(conffixture.ConfFixture(cfg.CONF))
         mock_delete.return_value = None
 
         res._register_class('GenericResourceType',
@@ -103,7 +106,7 @@ class StackEventTest(common.HeatTestCase):
         thread.link = mock.Mock(return_value=None)
 
         def run(stack_id, func, *args, **kwargs):
-            func(*args)
+            func(*args, **kwargs)
             return thread
 
         self.eng.thread_group_mgr.start = run
diff --git a/heat/tests/engine/service/test_stack_update.py b/heat/tests/engine/service/test_stack_update.py
index 8848da0779..76e5348149 100644
--- a/heat/tests/engine/service/test_stack_update.py
+++ b/heat/tests/engine/service/test_stack_update.py
@@ -15,6 +15,7 @@ import uuid
 
 import eventlet.queue
 import mock
 from oslo_config import cfg
+from oslo_messaging import conffixture
 from oslo_messaging.rpc import dispatcher
 import six
@@ -43,6 +44,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
 
     def setUp(self):
         super(ServiceStackUpdateTest, self).setUp()
+        self.useFixture(conffixture.ConfFixture(cfg.CONF))
         self.ctx = utils.dummy_context()
         self.man = service.EngineService('a-host', 'a-topic')
         self.man.thread_group_mgr = tools.DummyThreadGroupManager()
@@ -68,7 +70,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
         mock_validate = self.patchobject(stk, 'validate', return_value=None)
         msgq_mock = mock.Mock()
-        self.patchobject(eventlet.queue, 'LightQueue', return_value=msgq_mock)
+        self.patchobject(eventlet.queue, 'LightQueue',
+                         side_effect=[msgq_mock, eventlet.queue.LightQueue()])
 
         # do update
         api_args = {'timeout_mins': 60, rpc_api.PARAM_CONVERGE: True}
@@ -122,7 +125,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
         self.patchobject(environment, 'Environment', return_value=stk.env)
         self.patchobject(stk, 'validate', return_value=None)
         self.patchobject(eventlet.queue, 'LightQueue',
-                         return_value=mock.Mock())
+                         side_effect=[mock.Mock(),
+                                      eventlet.queue.LightQueue()])
 
         mock_merge = self.patchobject(env_util, 'merge_environments')
@@ -160,7 +164,8 @@ class ServiceStackUpdateTest(common.HeatTestCase):
         mock_validate = self.patchobject(stk, 'validate', return_value=None)
         msgq_mock = mock.Mock()
-        self.patchobject(eventlet.queue, 'LightQueue', return_value=msgq_mock)
+        self.patchobject(eventlet.queue, 'LightQueue',
+                         side_effect=[msgq_mock, eventlet.queue.LightQueue()])
 
         # do update
         api_args = {'timeout_mins': 60, rpc_api.PARAM_CONVERGE: False}
@@ -219,6 +224,7 @@ class ServiceStackUpdateTest(common.HeatTestCase):
         t['parameters']['newparam'] = {'type': 'number'}
         with mock.patch('heat.engine.stack.Stack') as mock_stack:
             stk.update = mock.Mock()
+            self.patchobject(service, 'NotifyEvent')
             mock_stack.load.return_value = stk
             mock_stack.validate.return_value = None
             result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -275,7 +281,8 @@ resources:
                     rpc_api.PARAM_CONVERGE: False}
 
         with mock.patch('heat.engine.stack.Stack') as mock_stack:
-            stk.update = mock.Mock()
+            loaded_stack.update = mock.Mock()
+            self.patchobject(service, 'NotifyEvent')
             mock_stack.load.return_value = loaded_stack
             mock_stack.validate.return_value = None
             result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -318,6 +325,7 @@ resources:
         t['parameters']['newparam'] = {'type': 'number'}
         with mock.patch('heat.engine.stack.Stack') as mock_stack:
             stk.update = mock.Mock()
+            self.patchobject(service, 'NotifyEvent')
             mock_stack.load.return_value = stk
             mock_stack.validate.return_value = None
             result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -418,6 +426,7 @@ resources:
                      'myother.yaml': 'myother'}
         with mock.patch('heat.engine.stack.Stack') as mock_stack:
             stk.update = mock.Mock()
+            self.patchobject(service, 'NotifyEvent')
             mock_stack.load.return_value = stk
             mock_stack.validate.return_value = None
             result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -464,6 +473,7 @@ resources:
                'resource_registry': {'resources': {}}}
         with mock.patch('heat.engine.stack.Stack') as mock_stack:
             stk.update = mock.Mock()
+            self.patchobject(service, 'NotifyEvent')
             mock_stack.load.return_value = stk
             mock_stack.validate.return_value = None
             result = self.man.update_stack(self.ctx, stk.identifier(),
@@ -864,6 +874,7 @@ resources:
         stack.status = stack.COMPLETE
 
         with mock.patch('heat.engine.stack.Stack') as mock_stack:
+            self.patchobject(service, 'NotifyEvent')
             mock_stack.load.return_value = stack
             mock_stack.validate.return_value = None
             result = self.man.update_stack(self.ctx, stack.identifier(),
diff --git a/releasenotes/notes/legacy-client-races-ba7a60cef5ec1694.yaml b/releasenotes/notes/legacy-client-races-ba7a60cef5ec1694.yaml
new file mode 100644
index 0000000000..6371f2c096
--- /dev/null
+++ b/releasenotes/notes/legacy-client-races-ba7a60cef5ec1694.yaml
@@ -0,0 +1,12 @@
+---
+fixes:
+  - |
+    Previously, the suspend, resume, and check API calls for all stacks, and
+    the update, restore, and delete API calls for non-convergence stacks,
+    returned immediately after starting the stack operation. This meant that
+    for a client reading the state immediately when performing the same
+    operation twice in a row, it could have misinterpreted a previous state as
+    the latest unless careful reference were made to the updated_at timestamp.
+    Stacks are now guaranteed to have moved to the ``IN_PROGRESS`` state before
+    any of these APIs return (except in the case of deleting a non-convergence
+    stack where another operation was already in progress).
diff --git a/tox.ini b/tox.ini
index f0bacdfda9..aa0304d458 100644
--- a/tox.ini
+++ b/tox.ini
@@ -104,7 +104,7 @@ commands = bandit -r heat -x tests --skip B101,B104,B107,B110,B310,B311,B404,B41
 [flake8]
 show-source = true
 exclude=.*,dist,*lib/python*,*egg,build,*convergence/scenarios/*
-max-complexity=20
+max-complexity=24
 
 [hacking]
 import_exceptions = heat.common.i18n
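
For illustration, here is a minimal standalone sketch of the signal/wait handshake that the NotifyEvent class above provides, assuming eventlet is available. The fake_worker and fake_api_handler names and the hard-coded two-second timeout are invented for this sketch; in the patch the timeout comes from cfg.CONF.rpc_response_timeout and the worker is the stack-operation greenthread started by ThreadGroupManager.

# Illustrative sketch only -- not part of the patch. It mimics the NotifyEvent
# handshake: the API handler returns as soon as the worker has recorded the
# IN_PROGRESS state, instead of waiting for the whole operation to finish.
import eventlet
import eventlet.queue


class NotifyEvent(object):
    def __init__(self):
        self._queue = eventlet.queue.LightQueue(1)
        self._signalled = False

    def signal(self):
        if self._signalled:
            return
        self._signalled = True
        self._queue.put(None)
        eventlet.sleep(0)  # yield so the waiting greenthread wakes up promptly

    def wait(self, timeout=2):  # the patch uses cfg.CONF.rpc_response_timeout
        try:
            self._queue.get(timeout=timeout)
        except eventlet.queue.Empty:
            print('Timed out waiting for operation to start')


def fake_worker(notify):
    # Stands in for stack.suspend()/update()/delete(): record IN_PROGRESS,
    # signal the caller, then carry on with the long-running work.
    print('worker: state set to IN_PROGRESS')
    notify.signal()
    eventlet.sleep(1)
    print('worker: operation complete')


def fake_api_handler():
    # Stands in for an EngineService API method such as stack_suspend().
    notify = NotifyEvent()
    worker = eventlet.spawn(fake_worker, notify)
    notify.wait()
    print('api handler: IN_PROGRESS stored, returning to the client')
    return worker


if __name__ == '__main__':
    # Join the worker only so the demo output completes; the real API handler
    # returns without waiting for the operation to finish.
    fake_api_handler().wait()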