From 698c10fa568eb81e8d0c7d8489fc85d5a4156cd0 Mon Sep 17 00:00:00 2001 From: Rabi Mishra Date: Wed, 13 Feb 2019 20:53:35 +0530 Subject: [PATCH] Make stack check convergence aware Change-Id: I6883a1c2694fb77f76c7b93ed02f9263bfb6bb09 Story: #1727142 Task: 17325 --- heat/engine/check_resource.py | 12 +++++-- heat/engine/resource.py | 6 ++++ heat/engine/service.py | 14 +++++--- heat/engine/stack.py | 23 +++++++------ .../convergence/framework/engine_wrapper.py | 13 ++++++++ heat/tests/convergence/framework/reality.py | 8 +++-- .../convergence/scenarios/basic_check.py | 32 +++++++++++++++++++ heat/tests/engine/test_check_resource.py | 11 ++++--- 8 files changed, 95 insertions(+), 24 deletions(-) create mode 100644 heat/tests/convergence/scenarios/basic_check.py diff --git a/heat/engine/check_resource.py b/heat/engine/check_resource.py index a2f6d842c2..cd3474fab5 100644 --- a/heat/engine/check_resource.py +++ b/heat/engine/check_resource.py @@ -158,7 +158,7 @@ class CheckResource(object): return False else: - check_resource_cleanup(rsrc, tmpl.id, self.engine_id, + check_resource_cleanup(rsrc, tmpl.id, self.engine_id, stack, stack.time_remaining(), self.msg_queue) return True @@ -386,7 +386,10 @@ def check_resource_update(rsrc, template_id, requires, engine_id, stack, msg_queue): """Create or update the Resource if appropriate.""" check_message = functools.partial(_check_for_message, msg_queue) - if rsrc.action == resource.Resource.INIT: + if stack.action == stack.CHECK: + rsrc.check_convergence(engine_id, stack.time_remaining(), + check_message) + elif rsrc.action == resource.Resource.INIT: rsrc.create_convergence(template_id, requires, engine_id, stack.time_remaining(), check_message) else: @@ -395,9 +398,12 @@ def check_resource_update(rsrc, template_id, requires, engine_id, check_message) -def check_resource_cleanup(rsrc, template_id, engine_id, +def check_resource_cleanup(rsrc, template_id, engine_id, stack, timeout, msg_queue): """Delete the Resource if appropriate.""" + if stack.action == stack.CHECK: + return + check_message = functools.partial(_check_for_message, msg_queue) rsrc.delete_convergence(template_id, engine_id, timeout, check_message) diff --git a/heat/engine/resource.py b/heat/engine/resource.py index c383f0abf5..9523da00fc 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -1417,6 +1417,12 @@ class Resource(status.ResourceStatus): else: raise UpdateReplace(self.name) + def check_convergence(self, engine_id, timeout, progress_callback=None): + """Check the resource synchronously.""" + self._calling_engine_id = engine_id + runner = scheduler.TaskRunner(self.check) + runner(timeout=timeout, progress_callback=progress_callback) + def update_convergence(self, template_id, new_requires, engine_id, timeout, new_stack, progress_callback=None): """Update the resource synchronously. diff --git a/heat/engine/service.py b/heat/engine/service.py index 5c02798d07..1749d3da70 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -2151,10 +2151,16 @@ class EngineService(service.ServiceBase): stack = parser.Stack.load(cnxt, stack=s) LOG.info("Checking stack %s", stack.name) - stored_event = NotifyEvent() - self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id, - stack.check, notify=stored_event) - stored_event.wait() + if stack.convergence: + stack.thread_group_mgr = self.thread_group_mgr + stack.converge_stack(template=stack.t, + action=stack.CHECK) + else: + stored_event = NotifyEvent() + self.thread_group_mgr.start_with_lock( + cnxt, stack, self.engine_id, + stack.check, notify=stored_event) + stored_event.wait() @context.request_context def stack_restore(self, cnxt, stack_identity, snapshot_id): diff --git a/heat/engine/stack.py b/heat/engine/stack.py index f0887bbd61..f69a469e8f 100644 --- a/heat/engine/stack.py +++ b/heat/engine/stack.py @@ -984,7 +984,8 @@ class Stack(collections.Mapping): if (self.convergence and self.action in {self.UPDATE, self.DELETE, self.CREATE, - self.ADOPT, self.ROLLBACK, self.RESTORE}): + self.ADOPT, self.ROLLBACK, self.RESTORE, + self.CHECK}): # These operations do not use the stack lock in convergence, so # never defer. return False @@ -1339,16 +1340,18 @@ class Stack(collections.Mapping): def converge_stack(self, template, action=UPDATE, new_stack=None, pre_converge=None): """Update the stack template and trigger convergence for resources.""" - if action not in [self.CREATE, self.ADOPT]: - # no back-up template for create action - self.prev_raw_template_id = getattr(self.t, 'id', None) + if action != self.CHECK: + if action not in [self.CREATE, self.ADOPT]: + # no back-up template for create action + self.prev_raw_template_id = getattr(self.t, 'id', None) + # switch template and reset dependencies + self.defn = self.defn.clone_with_new_template( + template, self.identifier(), clear_resource_data=True) - # switch template and reset dependencies - self.defn = self.defn.clone_with_new_template(template, - self.identifier(), - clear_resource_data=True) - self.reset_dependencies() - self._resources = None + self.reset_dependencies() + self._resources = None + else: + assert new_stack is None if action != self.CREATE: self.updated_time = oslo_timeutils.utcnow() diff --git a/heat/tests/convergence/framework/engine_wrapper.py b/heat/tests/convergence/framework/engine_wrapper.py index 0200dec539..e137e2f4da 100644 --- a/heat/tests/convergence/framework/engine_wrapper.py +++ b/heat/tests/convergence/framework/engine_wrapper.py @@ -81,6 +81,19 @@ class Engine(message_processor.MessageProcessor): srv.create_stack(cnxt, stack_name, hot_tmpl, params={}, files={}, environment_files=None, args={}) + @message_processor.asynchronous + def check_stack(self, stack_name): + cnxt = utils.dummy_context() + db_stack = db_api.stack_get_by_name(cnxt, stack_name) + srv = service.EngineService("host", "engine") + srv.thread_group_mgr = SynchronousThreadGroupManager() + srv.worker_service = self.worker + stack_identity = {'stack_name': stack_name, + 'stack_id': db_stack.id, + 'tenant': db_stack.tenant, + 'path': ''} + srv.stack_check(cnxt, stack_identity) + @message_processor.asynchronous def update_stack(self, stack_name, scenario_tmpl): cnxt = utils.dummy_context() diff --git a/heat/tests/convergence/framework/reality.py b/heat/tests/convergence/framework/reality.py index 4951541dc4..49c619dfc9 100644 --- a/heat/tests/convergence/framework/reality.py +++ b/heat/tests/convergence/framework/reality.py @@ -25,8 +25,9 @@ class RealityStore(object): ret = [] resources = db_api.resource_get_all(self.cntxt) for res in resources: - if (res.name == logical_name and res.action in ("CREATE", "UPDATE") - and res.status == "COMPLETE"): + if (res.name == logical_name and + res.action in ("CREATE", "UPDATE", + "CHECK") and res.status == "COMPLETE"): ret.append(res) return ret @@ -38,7 +39,8 @@ class RealityStore(object): ret = [] for res in resources: - if res.action in ("CREATE", "UPDATE") and res.status == "COMPLETE": + if res.action in ("CREATE", "UPDATE", + "CHECK") and res.status == "COMPLETE": ret.append(res) return ret diff --git a/heat/tests/convergence/scenarios/basic_check.py b/heat/tests/convergence/scenarios/basic_check.py new file mode 100644 index 0000000000..7073714afd --- /dev/null +++ b/heat/tests/convergence/scenarios/basic_check.py @@ -0,0 +1,32 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +def check_resource_count(expected_count): + test.assertEqual(expected_count, len(reality.all_resources())) + +example_template = Template({ + 'A': RsrcDef({}, []), + 'B': RsrcDef({}, []), + 'C': RsrcDef({'a': '4alpha'}, ['A', 'B']), + 'D': RsrcDef({'c': GetRes('C')}, []), + 'E': RsrcDef({'ca': GetAtt('C', 'a')}, []), +}) +engine.create_stack('foo', example_template) +engine.noop(5) +engine.call(check_resource_count, 5) +engine.call(verify, example_template) + +engine.check_stack('foo') +engine.noop(10) +engine.call(check_resource_count, 5) +engine.call(verify, example_template) diff --git a/heat/tests/engine/test_check_resource.py b/heat/tests/engine/test_check_resource.py index 486c15ea77..54e404229b 100644 --- a/heat/tests/engine/test_check_resource.py +++ b/heat/tests/engine/test_check_resource.py @@ -592,21 +592,24 @@ class CheckWorkflowCleanupTest(common.HeatTestCase): self.assertFalse(mock_cru.called) mock_crc.assert_called_once_with( self.resource, self.resource.stack.t.id, - self.worker.engine_id, + self.worker.engine_id, self.stack, tr(), mock.ANY) + @mock.patch.object(resource.Resource, 'load') @mock.patch.object(stack.Stack, 'time_remaining') def test_is_cleanup_traversal_raise_update_inprogress( - self, tr, mock_cru, mock_crc, mock_pcr, mock_csc): + self, tr, mock_load, mock_cru, mock_crc, mock_pcr, mock_csc): mock_crc.side_effect = exception.UpdateInProgress tr.return_value = 317 + mock_load.return_value = self.resource, self.stack, self.stack self.worker.check_resource( self.ctx, self.resource.id, self.stack.current_traversal, {}, self.is_update, None) mock_crc.assert_called_once_with(self.resource, self.resource.stack.t.id, self.worker.engine_id, - tr(), mock.ANY) + self.stack, tr(), + mock.ANY) self.assertFalse(mock_cru.called) self.assertFalse(mock_pcr.called) self.assertFalse(mock_csc.called) @@ -738,7 +741,7 @@ class MiscMethodsTest(common.HeatTestCase): self.resource.current_template_id = 'new-template-id' check_resource.check_resource_cleanup( self.resource, self.resource.stack.t.id, 'engine-id', - self.stack.timeout_secs(), None) + self.stack, self.stack.timeout_secs(), None) self.assertTrue(mock_delete.called) def test_check_message_raises_cancel_exception(self):