diff --git a/heat/engine/check_resource.py b/heat/engine/check_resource.py index 5b8ac4024b..1cd0bcaa79 100644 --- a/heat/engine/check_resource.py +++ b/heat/engine/check_resource.py @@ -15,29 +15,45 @@ import six +import eventlet.queue +import functools + from oslo_log import log as logging from heat.common import exception +from heat.common.i18n import _LE from heat.common.i18n import _LI from heat.engine import resource from heat.engine import scheduler from heat.engine import stack as parser from heat.engine import sync_point from heat.objects import resource as resource_objects +from heat.rpc import api as rpc_api from heat.rpc import listener_client LOG = logging.getLogger(__name__) +class CancelOperation(BaseException): + """Exception to cancel an in-progress operation on a resource. + + This exception is raised when operations on a resource are cancelled. + """ + def __init__(self): + return super(CancelOperation, self).__init__('user triggered cancel') + + class CheckResource(object): def __init__(self, engine_id, rpc_client, - thread_group_mgr): + thread_group_mgr, + msg_queue): self.engine_id = engine_id self._rpc_client = rpc_client self.thread_group_mgr = thread_group_mgr + self.msg_queue = msg_queue def _try_steal_engine_lock(self, cnxt, resource_id): rs_obj = resource_objects.Resource.get_obj(cnxt, @@ -92,7 +108,7 @@ class CheckResource(object): try: check_resource_update(rsrc, tmpl.id, resource_data, self.engine_id, - stack) + stack, self.msg_queue) except resource.UpdateReplace: new_res_id = rsrc.make_replacement(tmpl.id) LOG.info(_LI("Replacing resource with new id %s"), @@ -107,7 +123,8 @@ class CheckResource(object): else: check_resource_cleanup(rsrc, tmpl.id, resource_data, - self.engine_id, stack.time_remaining()) + self.engine_id, + stack.time_remaining(), self.msg_queue) return True except exception.UpdateInProgress: @@ -136,6 +153,8 @@ class CheckResource(object): if stack.current_traversal != current_traversal: return self._handle_stack_timeout(cnxt, stack) + except CancelOperation: + pass return False @@ -338,18 +357,36 @@ def propagate_check_resource(cnxt, rpc_client, next_res_id, {sender_key: sender_data}) +def _check_for_message(msg_queue): + if msg_queue is None: + return + try: + message = msg_queue.get_nowait() + except eventlet.queue.Empty: + return + + if message == rpc_api.THREAD_CANCEL: + raise CancelOperation + + LOG.error(_LE('Unknown message "%s" received'), message) + + def check_resource_update(rsrc, template_id, resource_data, engine_id, - stack): + stack, msg_queue): """Create or update the Resource if appropriate.""" + check_message = functools.partial(_check_for_message, msg_queue) if rsrc.action == resource.Resource.INIT: rsrc.create_convergence(template_id, resource_data, engine_id, - stack.time_remaining()) + stack.time_remaining(), check_message) else: rsrc.update_convergence(template_id, resource_data, engine_id, - stack.time_remaining(), stack) + stack.time_remaining(), stack, + check_message) def check_resource_cleanup(rsrc, template_id, resource_data, engine_id, - timeout): + timeout, msg_queue): """Delete the Resource if appropriate.""" - rsrc.delete_convergence(template_id, resource_data, engine_id, timeout) + check_message = functools.partial(_check_for_message, msg_queue) + rsrc.delete_convergence(template_id, resource_data, engine_id, timeout, + check_message) diff --git a/heat/engine/resource.py b/heat/engine/resource.py index d00f0f1833..cbaf6c7bfe 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -753,10 +753,14 @@ class Resource(object): failure = exception.ResourceFailure(ex, self, action) self.state_set(action, self.FAILED, six.text_type(failure)) raise failure - except: # noqa + except BaseException as exc: with excutils.save_and_reraise_exception(): try: - self.state_set(action, self.FAILED, '%s aborted' % action) + reason = six.text_type(exc) + msg = '%s aborted' % action + if reason: + msg += ' (%s)' % reason + self.state_set(action, self.FAILED, msg) except Exception: LOG.exception(_LE('Error marking resource as failed')) else: @@ -852,7 +856,7 @@ class Resource(object): return self def create_convergence(self, template_id, resource_data, engine_id, - timeout): + timeout, progress_callback=None): """Creates the resource by invoking the scheduler TaskRunner.""" with self.lock(engine_id): self.requires = list( @@ -865,7 +869,8 @@ class Resource(object): else: adopt_data = self.stack._adopt_kwargs(self) runner = scheduler.TaskRunner(self.adopt, **adopt_data) - runner(timeout=timeout) + + runner(timeout=timeout, progress_callback=progress_callback) def _validate_external_resource(self, external_id): if self.entity: @@ -1090,7 +1095,7 @@ class Resource(object): raise UpdateReplace(self.name) def update_convergence(self, template_id, resource_data, engine_id, - timeout, new_stack): + timeout, new_stack, progress_callback=None): """Update the resource synchronously. Persist the resource's current_template_id to template_id and @@ -1130,11 +1135,15 @@ class Resource(object): runner = scheduler.TaskRunner(self.update, new_res_def) try: - runner(timeout=timeout) - update_tmpl_id_and_requires() - except exception.ResourceFailure: + runner(timeout=timeout, progress_callback=progress_callback) update_tmpl_id_and_requires() + except exception.UpdateReplace: raise + except BaseException: + with excutils.save_and_reraise_exception(): + update_tmpl_id_and_requires() + else: + update_tmpl_id_and_requires() def preview_update(self, after, before, after_props, before_props, prev_resource, check_init_complete=False): @@ -1522,7 +1531,8 @@ class Resource(object): expected_engine_id=None ) - def delete_convergence(self, template_id, input_data, engine_id, timeout): + def delete_convergence(self, template_id, input_data, engine_id, timeout, + progress_callback=None): """Destroys the resource if it doesn't belong to given template. The given template is suppose to be the current template being @@ -1546,9 +1556,8 @@ class Resource(object): pass else: runner = scheduler.TaskRunner(self.delete) - runner(timeout=timeout) - - # update needed_by and replaces of replacement resource + runner(timeout=timeout, + progress_callback=progress_callback) self._update_replacement_data(template_id) def handle_delete(self): diff --git a/heat/engine/worker.py b/heat/engine/worker.py index 7702774de4..f434b9ff99 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import eventlet.queue + from oslo_log import log as logging import oslo_messaging from oslo_service import service @@ -124,11 +126,17 @@ class WorkerService(service.Service): LOG.debug('[%s] Traversal cancelled; stopping.', current_traversal) return - cr = check_resource.CheckResource(self.engine_id, self._rpc_client, - self.thread_group_mgr) + msg_queue = eventlet.queue.LightQueue() + try: + self.thread_group_mgr.add_msg_queue(stack.id, msg_queue) + cr = check_resource.CheckResource(self.engine_id, self._rpc_client, + self.thread_group_mgr, msg_queue) - cr.check(cnxt, resource_id, current_traversal, resource_data, - is_update, adopt_stack_data, rsrc, stack) + cr.check(cnxt, resource_id, current_traversal, resource_data, + is_update, adopt_stack_data, rsrc, stack) + finally: + self.thread_group_mgr.remove_msg_queue(None, + stack.id, msg_queue) @context.request_context def cancel_check_resource(self, cnxt, stack_id): diff --git a/heat/tests/engine/test_check_resource.py b/heat/tests/engine/test_check_resource.py index 50b475f9aa..307c16a06d 100644 --- a/heat/tests/engine/test_check_resource.py +++ b/heat/tests/engine/test_check_resource.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import eventlet import mock from oslo_config import cfg @@ -25,6 +26,7 @@ from heat.engine import scheduler from heat.engine import stack from heat.engine import sync_point from heat.engine import worker +from heat.rpc import api as rpc_api from heat.rpc import worker_client from heat.tests import common from heat.tests.engine import tools @@ -49,7 +51,8 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): thread_group_mgr) self.cr = check_resource.CheckResource(self.worker.engine_id, self.worker._rpc_client, - self.worker.thread_group_mgr) + self.worker.thread_group_mgr, + mock.Mock()) self.worker._rpc_client = worker_client.WorkerClient() self.ctx = utils.dummy_context() self.stack = tools.get_stack( @@ -89,7 +92,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): mock_cru.assert_called_once_with(self.resource, self.resource.stack.t.id, {}, self.worker.engine_id, - mock.ANY) + mock.ANY, mock.ANY) self.assertFalse(mock_crc.called) expected_calls = [] @@ -118,7 +121,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): mock_cru.assert_called_once_with(self.resource, self.resource.stack.t.id, {}, self.worker.engine_id, - mock.ANY) + mock.ANY, mock.ANY) self.assertTrue(mock_mr.called) self.assertFalse(mock_crc.called) self.assertFalse(mock_pcr.called) @@ -140,7 +143,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): mock_cru.assert_called_once_with(self.resource, self.resource.stack.t.id, {}, self.worker.engine_id, - mock.ANY) + mock.ANY, mock.ANY) mock_ss.assert_called_once_with(self.resource.action, resource.Resource.FAILED, mock.ANY) @@ -507,6 +510,18 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): {}, self.is_update, {}) self.assertTrue(mock_hst.called) + def test_check_resource_does_not_propagate_on_cancel( + self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + # ensure when check_resource is cancelled, the next set of + # resources are not propagated. + mock_cru.side_effect = check_resource.CancelOperation + self.worker.check_resource(self.ctx, self.resource.id, + self.stack.current_traversal, + {}, self.is_update, {}) + self.assertFalse(mock_pcr.called) + self.assertFalse(mock_csc.called) + self.assertFalse(mock_cid.called) + @mock.patch.object(check_resource, 'construct_input_data') @mock.patch.object(check_resource, 'check_stack_complete') @@ -546,7 +561,7 @@ class CheckWorkflowCleanupTest(common.HeatTestCase): mock_crc.assert_called_once_with( self.resource, self.resource.stack.t.id, {}, self.worker.engine_id, - tr()) + tr(), mock.ANY) @mock.patch.object(stack.Stack, 'time_remaining') def test_is_cleanup_traversal_raise_update_inprogress( @@ -559,11 +574,23 @@ class CheckWorkflowCleanupTest(common.HeatTestCase): mock_crc.assert_called_once_with(self.resource, self.resource.stack.t.id, {}, self.worker.engine_id, - tr()) + tr(), mock.ANY) self.assertFalse(mock_cru.called) self.assertFalse(mock_pcr.called) self.assertFalse(mock_csc.called) + def test_check_resource_does_not_propagate_on_cancelling_cleanup( + self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): + # ensure when check_resource is cancelled, the next set of + # resources are not propagated. + mock_crc.side_effect = check_resource.CancelOperation + self.worker.check_resource(self.ctx, self.resource.id, + self.stack.current_traversal, + {}, self.is_update, {}) + self.assertFalse(mock_pcr.called) + self.assertFalse(mock_csc.called) + self.assertFalse(mock_cid.called) + class MiscMethodsTest(common.HeatTestCase): def setUp(self): @@ -649,7 +676,7 @@ class MiscMethodsTest(common.HeatTestCase): self.resource.action = 'INIT' check_resource.check_resource_update( self.resource, self.resource.stack.t.id, {}, 'engine-id', - self.stack) + self.stack, None) self.assertTrue(mock_create.called) self.assertFalse(mock_update.called) @@ -660,7 +687,7 @@ class MiscMethodsTest(common.HeatTestCase): self.resource.action = 'CREATE' check_resource.check_resource_update( self.resource, self.resource.stack.t.id, {}, 'engine-id', - self.stack) + self.stack, None) self.assertFalse(mock_create.called) self.assertTrue(mock_update.called) @@ -671,7 +698,7 @@ class MiscMethodsTest(common.HeatTestCase): self.resource.action = 'UPDATE' check_resource.check_resource_update( self.resource, self.resource.stack.t.id, {}, 'engine-id', - self.stack) + self.stack, None) self.assertFalse(mock_create.called) self.assertTrue(mock_update.called) @@ -680,5 +707,13 @@ class MiscMethodsTest(common.HeatTestCase): self.resource.current_template_id = 'new-template-id' check_resource.check_resource_cleanup( self.resource, self.resource.stack.t.id, {}, 'engine-id', - self.stack.timeout_secs()) + self.stack.timeout_secs(), None) self.assertTrue(mock_delete.called) + + def test_check_message_raises_cancel_exception(self): + # ensure CancelOperation is raised on receiving + # rpc_api.THREAD_CANCEL message + msg_queue = eventlet.queue.LightQueue() + msg_queue.put_nowait(rpc_api.THREAD_CANCEL) + self.assertRaises(check_resource.CancelOperation, + check_resource._check_for_message, msg_queue) diff --git a/heat/tests/engine/test_engine_worker.py b/heat/tests/engine/test_engine_worker.py index 684285131e..e45488236b 100644 --- a/heat/tests/engine/test_engine_worker.py +++ b/heat/tests/engine/test_engine_worker.py @@ -15,8 +15,10 @@ import mock +from heat.engine import check_resource from heat.engine import worker from heat.tests import common +from heat.tests import utils class WorkerServiceTest(common.HeatTestCase): @@ -84,3 +86,50 @@ class WorkerServiceTest(common.HeatTestCase): self.worker.stop() mock_rpc_server.stop.assert_called_once_with() mock_rpc_server.wait.assert_called_once_with() + + @mock.patch.object(check_resource, 'load_resource') + @mock.patch.object(check_resource.CheckResource, 'check') + def test_check_resource_adds_and_removes_msg_queue(self, + mock_check, + mock_load_resource): + mock_tgm = mock.MagicMock() + mock_tgm.add_msg_queue = mock.Mock(return_value=None) + mock_tgm.remove_msg_queue = mock.Mock(return_value=None) + self.worker = worker.WorkerService('host-1', + 'topic-1', + 'engine_id', + mock_tgm) + ctx = utils.dummy_context() + current_traversal = 'something' + fake_res = mock.MagicMock() + fake_res.current_traversal = current_traversal + mock_load_resource.return_value = (fake_res, fake_res, fake_res) + self.worker.check_resource(ctx, mock.Mock(), current_traversal, + {}, mock.Mock(), mock.Mock()) + self.assertTrue(mock_tgm.add_msg_queue.called) + self.assertTrue(mock_tgm.remove_msg_queue.called) + + @mock.patch.object(check_resource, 'load_resource') + @mock.patch.object(check_resource.CheckResource, 'check') + def test_check_resource_adds_and_removes_msg_queue_on_exception( + self, mock_check, mock_load_resource): + # even if the check fails; the message should be removed + mock_tgm = mock.MagicMock() + mock_tgm.add_msg_queue = mock.Mock(return_value=None) + mock_tgm.remove_msg_queue = mock.Mock(return_value=None) + self.worker = worker.WorkerService('host-1', + 'topic-1', + 'engine_id', + mock_tgm) + ctx = utils.dummy_context() + current_traversal = 'something' + fake_res = mock.MagicMock() + fake_res.current_traversal = current_traversal + mock_load_resource.return_value = (fake_res, fake_res, fake_res) + mock_check.side_effect = BaseException + self.assertRaises(BaseException, self.worker.check_resource, + ctx, mock.Mock(), current_traversal, {}, + mock.Mock(), mock.Mock()) + self.assertTrue(mock_tgm.add_msg_queue.called) + # ensure remove is also called + self.assertTrue(mock_tgm.remove_msg_queue.called) diff --git a/heat/tests/test_resource.py b/heat/tests/test_resource.py index acb061b7fd..aa7d124b18 100644 --- a/heat/tests/test_resource.py +++ b/heat/tests/test_resource.py @@ -1891,12 +1891,13 @@ class ResourceTest(common.HeatTestCase): self._assert_resource_lock(res.id, None, None) res_data = {(1, True): {u'id': 1, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} + pcb = mock.Mock() res.create_convergence(self.stack.t.id, res_data, 'engine-007', - 60) + 60, pcb) mock_init.assert_called_once_with(res.create) - mock_call.assert_called_once_with(timeout=60) + mock_call.assert_called_once_with(timeout=60, progress_callback=pcb) self.assertEqual(self.stack.t.id, res.current_template_id) self.assertItemsEqual([1, 3], res.requires) self._assert_resource_lock(res.id, None, 2) @@ -1995,12 +1996,13 @@ class ResourceTest(common.HeatTestCase): res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}}, (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}} + pcb = mock.Mock() res.update_convergence(new_temp.id, res_data, 'engine-007', 120, - new_stack) + new_stack, pcb) expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name] mock_init.assert_called_once_with(res.update, expected_rsrc_def) - mock_call.assert_called_once_with(timeout=120) + mock_call.assert_called_once_with(timeout=120, progress_callback=pcb) self.assertEqual(new_temp.id, res.current_template_id) self.assertItemsEqual([3, 4], res.requires) self._assert_resource_lock(res.id, None, 2) @@ -2220,10 +2222,11 @@ class ResourceTest(common.HeatTestCase): res.handle_delete = mock.Mock(return_value=None) res._update_replacement_data = mock.Mock() self._assert_resource_lock(res.id, None, None) - res.delete_convergence(2, {}, 'engine-007', 20) + pcb = mock.Mock() + res.delete_convergence(2, {}, 'engine-007', 20, pcb) mock_init.assert_called_once_with(res.delete) - mock_call.assert_called_once_with(timeout=20) + mock_call.assert_called_once_with(timeout=20, progress_callback=pcb) self.assertTrue(res._update_replacement_data.called) def test_delete_convergence_does_not_delete_same_template_resource(self):