Make stack check convergence aware

Change-Id: I6883a1c2694fb77f76c7b93ed02f9263bfb6bb09
Story: #1727142
Task: 17325
This commit is contained in:
Rabi Mishra 2019-02-13 20:53:35 +05:30
parent 61048ef16b
commit 698c10fa56
8 changed files with 95 additions and 24 deletions

View File

@ -158,7 +158,7 @@ class CheckResource(object):
return False
else:
check_resource_cleanup(rsrc, tmpl.id, self.engine_id,
check_resource_cleanup(rsrc, tmpl.id, self.engine_id, stack,
stack.time_remaining(), self.msg_queue)
return True
@ -386,7 +386,10 @@ def check_resource_update(rsrc, template_id, requires, engine_id,
stack, msg_queue):
"""Create or update the Resource if appropriate."""
check_message = functools.partial(_check_for_message, msg_queue)
if rsrc.action == resource.Resource.INIT:
if stack.action == stack.CHECK:
rsrc.check_convergence(engine_id, stack.time_remaining(),
check_message)
elif rsrc.action == resource.Resource.INIT:
rsrc.create_convergence(template_id, requires, engine_id,
stack.time_remaining(), check_message)
else:
@ -395,9 +398,12 @@ def check_resource_update(rsrc, template_id, requires, engine_id,
check_message)
def check_resource_cleanup(rsrc, template_id, engine_id,
def check_resource_cleanup(rsrc, template_id, engine_id, stack,
timeout, msg_queue):
"""Delete the Resource if appropriate."""
if stack.action == stack.CHECK:
return
check_message = functools.partial(_check_for_message, msg_queue)
rsrc.delete_convergence(template_id, engine_id, timeout,
check_message)

View File

@ -1417,6 +1417,12 @@ class Resource(status.ResourceStatus):
else:
raise UpdateReplace(self.name)
def check_convergence(self, engine_id, timeout, progress_callback=None):
"""Check the resource synchronously."""
self._calling_engine_id = engine_id
runner = scheduler.TaskRunner(self.check)
runner(timeout=timeout, progress_callback=progress_callback)
def update_convergence(self, template_id, new_requires, engine_id,
timeout, new_stack, progress_callback=None):
"""Update the resource synchronously.

View File

@ -2151,10 +2151,16 @@ class EngineService(service.ServiceBase):
stack = parser.Stack.load(cnxt, stack=s)
LOG.info("Checking stack %s", stack.name)
stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock(cnxt, stack, self.engine_id,
stack.check, notify=stored_event)
stored_event.wait()
if stack.convergence:
stack.thread_group_mgr = self.thread_group_mgr
stack.converge_stack(template=stack.t,
action=stack.CHECK)
else:
stored_event = NotifyEvent()
self.thread_group_mgr.start_with_lock(
cnxt, stack, self.engine_id,
stack.check, notify=stored_event)
stored_event.wait()
@context.request_context
def stack_restore(self, cnxt, stack_identity, snapshot_id):

View File

@ -984,7 +984,8 @@ class Stack(collections.Mapping):
if (self.convergence and
self.action in {self.UPDATE, self.DELETE, self.CREATE,
self.ADOPT, self.ROLLBACK, self.RESTORE}):
self.ADOPT, self.ROLLBACK, self.RESTORE,
self.CHECK}):
# These operations do not use the stack lock in convergence, so
# never defer.
return False
@ -1339,16 +1340,18 @@ class Stack(collections.Mapping):
def converge_stack(self, template, action=UPDATE, new_stack=None,
pre_converge=None):
"""Update the stack template and trigger convergence for resources."""
if action not in [self.CREATE, self.ADOPT]:
# no back-up template for create action
self.prev_raw_template_id = getattr(self.t, 'id', None)
if action != self.CHECK:
if action not in [self.CREATE, self.ADOPT]:
# no back-up template for create action
self.prev_raw_template_id = getattr(self.t, 'id', None)
# switch template and reset dependencies
self.defn = self.defn.clone_with_new_template(
template, self.identifier(), clear_resource_data=True)
# switch template and reset dependencies
self.defn = self.defn.clone_with_new_template(template,
self.identifier(),
clear_resource_data=True)
self.reset_dependencies()
self._resources = None
self.reset_dependencies()
self._resources = None
else:
assert new_stack is None
if action != self.CREATE:
self.updated_time = oslo_timeutils.utcnow()

View File

@ -81,6 +81,19 @@ class Engine(message_processor.MessageProcessor):
srv.create_stack(cnxt, stack_name, hot_tmpl,
params={}, files={}, environment_files=None, args={})
@message_processor.asynchronous
def check_stack(self, stack_name):
cnxt = utils.dummy_context()
db_stack = db_api.stack_get_by_name(cnxt, stack_name)
srv = service.EngineService("host", "engine")
srv.thread_group_mgr = SynchronousThreadGroupManager()
srv.worker_service = self.worker
stack_identity = {'stack_name': stack_name,
'stack_id': db_stack.id,
'tenant': db_stack.tenant,
'path': ''}
srv.stack_check(cnxt, stack_identity)
@message_processor.asynchronous
def update_stack(self, stack_name, scenario_tmpl):
cnxt = utils.dummy_context()

View File

@ -25,8 +25,9 @@ class RealityStore(object):
ret = []
resources = db_api.resource_get_all(self.cntxt)
for res in resources:
if (res.name == logical_name and res.action in ("CREATE", "UPDATE")
and res.status == "COMPLETE"):
if (res.name == logical_name and
res.action in ("CREATE", "UPDATE",
"CHECK") and res.status == "COMPLETE"):
ret.append(res)
return ret
@ -38,7 +39,8 @@ class RealityStore(object):
ret = []
for res in resources:
if res.action in ("CREATE", "UPDATE") and res.status == "COMPLETE":
if res.action in ("CREATE", "UPDATE",
"CHECK") and res.status == "COMPLETE":
ret.append(res)
return ret

View File

@ -0,0 +1,32 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def check_resource_count(expected_count):
test.assertEqual(expected_count, len(reality.all_resources()))
example_template = Template({
'A': RsrcDef({}, []),
'B': RsrcDef({}, []),
'C': RsrcDef({'a': '4alpha'}, ['A', 'B']),
'D': RsrcDef({'c': GetRes('C')}, []),
'E': RsrcDef({'ca': GetAtt('C', 'a')}, []),
})
engine.create_stack('foo', example_template)
engine.noop(5)
engine.call(check_resource_count, 5)
engine.call(verify, example_template)
engine.check_stack('foo')
engine.noop(10)
engine.call(check_resource_count, 5)
engine.call(verify, example_template)

View File

@ -592,21 +592,24 @@ class CheckWorkflowCleanupTest(common.HeatTestCase):
self.assertFalse(mock_cru.called)
mock_crc.assert_called_once_with(
self.resource, self.resource.stack.t.id,
self.worker.engine_id,
self.worker.engine_id, self.stack,
tr(), mock.ANY)
@mock.patch.object(resource.Resource, 'load')
@mock.patch.object(stack.Stack, 'time_remaining')
def test_is_cleanup_traversal_raise_update_inprogress(
self, tr, mock_cru, mock_crc, mock_pcr, mock_csc):
self, tr, mock_load, mock_cru, mock_crc, mock_pcr, mock_csc):
mock_crc.side_effect = exception.UpdateInProgress
tr.return_value = 317
mock_load.return_value = self.resource, self.stack, self.stack
self.worker.check_resource(
self.ctx, self.resource.id, self.stack.current_traversal, {},
self.is_update, None)
mock_crc.assert_called_once_with(self.resource,
self.resource.stack.t.id,
self.worker.engine_id,
tr(), mock.ANY)
self.stack, tr(),
mock.ANY)
self.assertFalse(mock_cru.called)
self.assertFalse(mock_pcr.called)
self.assertFalse(mock_csc.called)
@ -738,7 +741,7 @@ class MiscMethodsTest(common.HeatTestCase):
self.resource.current_template_id = 'new-template-id'
check_resource.check_resource_cleanup(
self.resource, self.resource.stack.t.id, 'engine-id',
self.stack.timeout_secs(), None)
self.stack, self.stack.timeout_secs(), None)
self.assertTrue(mock_delete.called)
def test_check_message_raises_cancel_exception(self):