Add interrupt points for convergence check-resource operations

This allows a convergence operation to be cancelled at an appropriate point
(i.e. between steps in a task) by sending a message to a queue.

Note that there's no code yet to actually cancel any operations
(specifically, sending a cancel message to the stack will _not_ cause the
check_resource operations to be cancelled under convergence).

Change-Id: I9469c31de5e40334083ef1dd20243f2f6779549e
Related-Bug: #1545063
Co-Authored-By: Anant Patil <anant.patil@hpe.com>
changes/76/343076/14
Zane Bitter 6 years ago committed by Anant Patil
parent 084d0eb20f
commit 9c79ee4d69
  1. 53
      heat/engine/check_resource.py
  2. 33
      heat/engine/resource.py
  3. 18
      heat/engine/worker.py
  4. 55
      heat/tests/engine/test_check_resource.py
  5. 49
      heat/tests/engine/test_engine_worker.py
  6. 15
      heat/tests/test_resource.py

@ -15,29 +15,45 @@
import six
import eventlet.queue
import functools
from oslo_log import log as logging
from heat.common import exception
from heat.common.i18n import _LE
from heat.common.i18n import _LI
from heat.engine import resource
from heat.engine import scheduler
from heat.engine import stack as parser
from heat.engine import sync_point
from heat.objects import resource as resource_objects
from heat.rpc import api as rpc_api
from heat.rpc import listener_client
LOG = logging.getLogger(__name__)
class CancelOperation(BaseException):
"""Exception to cancel an in-progress operation on a resource.
This exception is raised when operations on a resource are cancelled.
"""
def __init__(self):
return super(CancelOperation, self).__init__('user triggered cancel')
class CheckResource(object):
def __init__(self,
engine_id,
rpc_client,
thread_group_mgr):
thread_group_mgr,
msg_queue):
self.engine_id = engine_id
self._rpc_client = rpc_client
self.thread_group_mgr = thread_group_mgr
self.msg_queue = msg_queue
def _try_steal_engine_lock(self, cnxt, resource_id):
rs_obj = resource_objects.Resource.get_obj(cnxt,
@ -92,7 +108,7 @@ class CheckResource(object):
try:
check_resource_update(rsrc, tmpl.id, resource_data,
self.engine_id,
stack)
stack, self.msg_queue)
except resource.UpdateReplace:
new_res_id = rsrc.make_replacement(tmpl.id)
LOG.info(_LI("Replacing resource with new id %s"),
@ -107,7 +123,8 @@ class CheckResource(object):
else:
check_resource_cleanup(rsrc, tmpl.id, resource_data,
self.engine_id, stack.time_remaining())
self.engine_id,
stack.time_remaining(), self.msg_queue)
return True
except exception.UpdateInProgress:
@ -136,6 +153,8 @@ class CheckResource(object):
if stack.current_traversal != current_traversal:
return
self._handle_stack_timeout(cnxt, stack)
except CancelOperation:
pass
return False
@ -338,18 +357,36 @@ def propagate_check_resource(cnxt, rpc_client, next_res_id,
{sender_key: sender_data})
def _check_for_message(msg_queue):
if msg_queue is None:
return
try:
message = msg_queue.get_nowait()
except eventlet.queue.Empty:
return
if message == rpc_api.THREAD_CANCEL:
raise CancelOperation
LOG.error(_LE('Unknown message "%s" received'), message)
def check_resource_update(rsrc, template_id, resource_data, engine_id,
stack):
stack, msg_queue):
"""Create or update the Resource if appropriate."""
check_message = functools.partial(_check_for_message, msg_queue)
if rsrc.action == resource.Resource.INIT:
rsrc.create_convergence(template_id, resource_data, engine_id,
stack.time_remaining())
stack.time_remaining(), check_message)
else:
rsrc.update_convergence(template_id, resource_data, engine_id,
stack.time_remaining(), stack)
stack.time_remaining(), stack,
check_message)
def check_resource_cleanup(rsrc, template_id, resource_data, engine_id,
timeout):
timeout, msg_queue):
"""Delete the Resource if appropriate."""
rsrc.delete_convergence(template_id, resource_data, engine_id, timeout)
check_message = functools.partial(_check_for_message, msg_queue)
rsrc.delete_convergence(template_id, resource_data, engine_id, timeout,
check_message)

@ -748,10 +748,14 @@ class Resource(object):
failure = exception.ResourceFailure(ex, self, action)
self.state_set(action, self.FAILED, six.text_type(failure))
raise failure
except: # noqa
except BaseException as exc:
with excutils.save_and_reraise_exception():
try:
self.state_set(action, self.FAILED, '%s aborted' % action)
reason = six.text_type(exc)
msg = '%s aborted' % action
if reason:
msg += ' (%s)' % reason
self.state_set(action, self.FAILED, msg)
except Exception:
LOG.exception(_LE('Error marking resource as failed'))
else:
@ -847,7 +851,7 @@ class Resource(object):
return self
def create_convergence(self, template_id, resource_data, engine_id,
timeout):
timeout, progress_callback=None):
"""Creates the resource by invoking the scheduler TaskRunner."""
with self.lock(engine_id):
self.requires = list(
@ -860,7 +864,8 @@ class Resource(object):
else:
adopt_data = self.stack._adopt_kwargs(self)
runner = scheduler.TaskRunner(self.adopt, **adopt_data)
runner(timeout=timeout)
runner(timeout=timeout, progress_callback=progress_callback)
def _validate_external_resource(self, external_id):
if self.entity:
@ -1085,7 +1090,7 @@ class Resource(object):
raise UpdateReplace(self.name)
def update_convergence(self, template_id, resource_data, engine_id,
timeout, new_stack):
timeout, new_stack, progress_callback=None):
"""Update the resource synchronously.
Persist the resource's current_template_id to template_id and
@ -1125,11 +1130,15 @@ class Resource(object):
runner = scheduler.TaskRunner(self.update, new_res_def)
try:
runner(timeout=timeout)
update_tmpl_id_and_requires()
except exception.ResourceFailure:
runner(timeout=timeout, progress_callback=progress_callback)
update_tmpl_id_and_requires()
except exception.UpdateReplace:
raise
except BaseException:
with excutils.save_and_reraise_exception():
update_tmpl_id_and_requires()
else:
update_tmpl_id_and_requires()
def preview_update(self, after, before, after_props, before_props,
prev_resource, check_init_complete=False):
@ -1517,7 +1526,8 @@ class Resource(object):
expected_engine_id=None
)
def delete_convergence(self, template_id, input_data, engine_id, timeout):
def delete_convergence(self, template_id, input_data, engine_id, timeout,
progress_callback=None):
"""Destroys the resource if it doesn't belong to given template.
The given template is suppose to be the current template being
@ -1541,9 +1551,8 @@ class Resource(object):
pass
else:
runner = scheduler.TaskRunner(self.delete)
runner(timeout=timeout)
# update needed_by and replaces of replacement resource
runner(timeout=timeout,
progress_callback=progress_callback)
self._update_replacement_data(template_id)
def handle_delete(self):

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet.queue
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
@ -124,11 +126,17 @@ class WorkerService(service.Service):
LOG.debug('[%s] Traversal cancelled; stopping.', current_traversal)
return
cr = check_resource.CheckResource(self.engine_id, self._rpc_client,
self.thread_group_mgr)
cr.check(cnxt, resource_id, current_traversal, resource_data,
is_update, adopt_stack_data, rsrc, stack)
msg_queue = eventlet.queue.LightQueue()
try:
self.thread_group_mgr.add_msg_queue(stack.id, msg_queue)
cr = check_resource.CheckResource(self.engine_id, self._rpc_client,
self.thread_group_mgr, msg_queue)
cr.check(cnxt, resource_id, current_traversal, resource_data,
is_update, adopt_stack_data, rsrc, stack)
finally:
self.thread_group_mgr.remove_msg_queue(None,
stack.id, msg_queue)
@context.request_context
def cancel_check_resource(self, cnxt, stack_id):

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import eventlet
import mock
from oslo_config import cfg
@ -25,6 +26,7 @@ from heat.engine import scheduler
from heat.engine import stack
from heat.engine import sync_point
from heat.engine import worker
from heat.rpc import api as rpc_api
from heat.rpc import worker_client
from heat.tests import common
from heat.tests.engine import tools
@ -49,7 +51,8 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
thread_group_mgr)
self.cr = check_resource.CheckResource(self.worker.engine_id,
self.worker._rpc_client,
self.worker.thread_group_mgr)
self.worker.thread_group_mgr,
mock.Mock())
self.worker._rpc_client = worker_client.WorkerClient()
self.ctx = utils.dummy_context()
self.stack = tools.get_stack(
@ -89,7 +92,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
mock_cru.assert_called_once_with(self.resource,
self.resource.stack.t.id,
{}, self.worker.engine_id,
mock.ANY)
mock.ANY, mock.ANY)
self.assertFalse(mock_crc.called)
expected_calls = []
@ -118,7 +121,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
mock_cru.assert_called_once_with(self.resource,
self.resource.stack.t.id,
{}, self.worker.engine_id,
mock.ANY)
mock.ANY, mock.ANY)
self.assertTrue(mock_mr.called)
self.assertFalse(mock_crc.called)
self.assertFalse(mock_pcr.called)
@ -140,7 +143,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
mock_cru.assert_called_once_with(self.resource,
self.resource.stack.t.id,
{}, self.worker.engine_id,
mock.ANY)
mock.ANY, mock.ANY)
mock_ss.assert_called_once_with(self.resource.action,
resource.Resource.FAILED,
mock.ANY)
@ -507,6 +510,18 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
{}, self.is_update, {})
self.assertTrue(mock_hst.called)
def test_check_resource_does_not_propagate_on_cancel(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
# ensure when check_resource is cancelled, the next set of
# resources are not propagated.
mock_cru.side_effect = check_resource.CancelOperation
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal,
{}, self.is_update, {})
self.assertFalse(mock_pcr.called)
self.assertFalse(mock_csc.called)
self.assertFalse(mock_cid.called)
@mock.patch.object(check_resource, 'construct_input_data')
@mock.patch.object(check_resource, 'check_stack_complete')
@ -546,7 +561,7 @@ class CheckWorkflowCleanupTest(common.HeatTestCase):
mock_crc.assert_called_once_with(
self.resource, self.resource.stack.t.id,
{}, self.worker.engine_id,
tr())
tr(), mock.ANY)
@mock.patch.object(stack.Stack, 'time_remaining')
def test_is_cleanup_traversal_raise_update_inprogress(
@ -559,11 +574,23 @@ class CheckWorkflowCleanupTest(common.HeatTestCase):
mock_crc.assert_called_once_with(self.resource,
self.resource.stack.t.id,
{}, self.worker.engine_id,
tr())
tr(), mock.ANY)
self.assertFalse(mock_cru.called)
self.assertFalse(mock_pcr.called)
self.assertFalse(mock_csc.called)
def test_check_resource_does_not_propagate_on_cancelling_cleanup(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
# ensure when check_resource is cancelled, the next set of
# resources are not propagated.
mock_crc.side_effect = check_resource.CancelOperation
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal,
{}, self.is_update, {})
self.assertFalse(mock_pcr.called)
self.assertFalse(mock_csc.called)
self.assertFalse(mock_cid.called)
class MiscMethodsTest(common.HeatTestCase):
def setUp(self):
@ -649,7 +676,7 @@ class MiscMethodsTest(common.HeatTestCase):
self.resource.action = 'INIT'
check_resource.check_resource_update(
self.resource, self.resource.stack.t.id, {}, 'engine-id',
self.stack)
self.stack, None)
self.assertTrue(mock_create.called)
self.assertFalse(mock_update.called)
@ -660,7 +687,7 @@ class MiscMethodsTest(common.HeatTestCase):
self.resource.action = 'CREATE'
check_resource.check_resource_update(
self.resource, self.resource.stack.t.id, {}, 'engine-id',
self.stack)
self.stack, None)
self.assertFalse(mock_create.called)
self.assertTrue(mock_update.called)
@ -671,7 +698,7 @@ class MiscMethodsTest(common.HeatTestCase):
self.resource.action = 'UPDATE'
check_resource.check_resource_update(
self.resource, self.resource.stack.t.id, {}, 'engine-id',
self.stack)
self.stack, None)
self.assertFalse(mock_create.called)
self.assertTrue(mock_update.called)
@ -680,5 +707,13 @@ class MiscMethodsTest(common.HeatTestCase):
self.resource.current_template_id = 'new-template-id'
check_resource.check_resource_cleanup(
self.resource, self.resource.stack.t.id, {}, 'engine-id',
self.stack.timeout_secs())
self.stack.timeout_secs(), None)
self.assertTrue(mock_delete.called)
def test_check_message_raises_cancel_exception(self):
# ensure CancelOperation is raised on receiving
# rpc_api.THREAD_CANCEL message
msg_queue = eventlet.queue.LightQueue()
msg_queue.put_nowait(rpc_api.THREAD_CANCEL)
self.assertRaises(check_resource.CancelOperation,
check_resource._check_for_message, msg_queue)

@ -15,8 +15,10 @@
import mock
from heat.engine import check_resource
from heat.engine import worker
from heat.tests import common
from heat.tests import utils
class WorkerServiceTest(common.HeatTestCase):
@ -84,3 +86,50 @@ class WorkerServiceTest(common.HeatTestCase):
self.worker.stop()
mock_rpc_server.stop.assert_called_once_with()
mock_rpc_server.wait.assert_called_once_with()
@mock.patch.object(check_resource, 'load_resource')
@mock.patch.object(check_resource.CheckResource, 'check')
def test_check_resource_adds_and_removes_msg_queue(self,
mock_check,
mock_load_resource):
mock_tgm = mock.MagicMock()
mock_tgm.add_msg_queue = mock.Mock(return_value=None)
mock_tgm.remove_msg_queue = mock.Mock(return_value=None)
self.worker = worker.WorkerService('host-1',
'topic-1',
'engine_id',
mock_tgm)
ctx = utils.dummy_context()
current_traversal = 'something'
fake_res = mock.MagicMock()
fake_res.current_traversal = current_traversal
mock_load_resource.return_value = (fake_res, fake_res, fake_res)
self.worker.check_resource(ctx, mock.Mock(), current_traversal,
{}, mock.Mock(), mock.Mock())
self.assertTrue(mock_tgm.add_msg_queue.called)
self.assertTrue(mock_tgm.remove_msg_queue.called)
@mock.patch.object(check_resource, 'load_resource')
@mock.patch.object(check_resource.CheckResource, 'check')
def test_check_resource_adds_and_removes_msg_queue_on_exception(
self, mock_check, mock_load_resource):
# even if the check fails; the message should be removed
mock_tgm = mock.MagicMock()
mock_tgm.add_msg_queue = mock.Mock(return_value=None)
mock_tgm.remove_msg_queue = mock.Mock(return_value=None)
self.worker = worker.WorkerService('host-1',
'topic-1',
'engine_id',
mock_tgm)
ctx = utils.dummy_context()
current_traversal = 'something'
fake_res = mock.MagicMock()
fake_res.current_traversal = current_traversal
mock_load_resource.return_value = (fake_res, fake_res, fake_res)
mock_check.side_effect = BaseException
self.assertRaises(BaseException, self.worker.check_resource,
ctx, mock.Mock(), current_traversal, {},
mock.Mock(), mock.Mock())
self.assertTrue(mock_tgm.add_msg_queue.called)
# ensure remove is also called
self.assertTrue(mock_tgm.remove_msg_queue.called)

@ -1891,12 +1891,13 @@ class ResourceTest(common.HeatTestCase):
self._assert_resource_lock(res.id, None, None)
res_data = {(1, True): {u'id': 1, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
pcb = mock.Mock()
res.create_convergence(self.stack.t.id, res_data, 'engine-007',
60)
60, pcb)
mock_init.assert_called_once_with(res.create)
mock_call.assert_called_once_with(timeout=60)
mock_call.assert_called_once_with(timeout=60, progress_callback=pcb)
self.assertEqual(self.stack.t.id, res.current_template_id)
self.assertItemsEqual([1, 3], res.requires)
self._assert_resource_lock(res.id, None, 2)
@ -1995,12 +1996,13 @@ class ResourceTest(common.HeatTestCase):
res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
(2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
pcb = mock.Mock()
res.update_convergence(new_temp.id, res_data, 'engine-007', 120,
new_stack)
new_stack, pcb)
expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
mock_init.assert_called_once_with(res.update, expected_rsrc_def)
mock_call.assert_called_once_with(timeout=120)
mock_call.assert_called_once_with(timeout=120, progress_callback=pcb)
self.assertEqual(new_temp.id, res.current_template_id)
self.assertItemsEqual([3, 4], res.requires)
self._assert_resource_lock(res.id, None, 2)
@ -2220,10 +2222,11 @@ class ResourceTest(common.HeatTestCase):
res.handle_delete = mock.Mock(return_value=None)
res._update_replacement_data = mock.Mock()
self._assert_resource_lock(res.id, None, None)
res.delete_convergence(2, {}, 'engine-007', 20)
pcb = mock.Mock()
res.delete_convergence(2, {}, 'engine-007', 20, pcb)
mock_init.assert_called_once_with(res.delete)
mock_call.assert_called_once_with(timeout=20)
mock_call.assert_called_once_with(timeout=20, progress_callback=pcb)
self.assertTrue(res._update_replacement_data.called)
def test_delete_convergence_does_not_delete_same_template_resource(self):

Loading…
Cancel
Save