From d23ebb606571428c12863f7a5e671290b210c056 Mon Sep 17 00:00:00 2001 From: Angus Salkeld Date: Tue, 28 Jul 2015 18:43:19 +1000 Subject: [PATCH] Convergence: clarify what "data" is Mostly in worker we have arguments called "data", it is not clear if these are serialized or not (and if they have adopt data in them). 1. split adopt data out (add RPC support for the new argument) 2. name arguments "resource_data" for deserialized data 3. name arguments "rpc_data" for serialized data 4. make sure all data into client.check_resource() is serialized Change-Id: Ie6bd0e45d2857d3a23235776c2b96cce02cb711a --- heat/engine/resource.py | 2 - heat/engine/stack.py | 6 +-- heat/engine/worker.py | 61 ++++++++++++++----------- heat/rpc/worker_client.py | 13 ++++-- heat/tests/engine/test_engine_worker.py | 34 +++++++------- heat/tests/test_engine_service.py | 24 +++++----- 6 files changed, 75 insertions(+), 65 deletions(-) diff --git a/heat/engine/resource.py b/heat/engine/resource.py index 116e41a8f..074b91a90 100644 --- a/heat/engine/resource.py +++ b/heat/engine/resource.py @@ -273,8 +273,6 @@ class Resource(object): def special_stack(tmpl, swap_template): # TODO(sirushtim): Load stack from cache stk = stack_mod.Stack.load(context, db_res.stack_id) - stk.adopt_stack_data = data.get('adopt_stack_data') - # NOTE(sirushtim): Because on delete/cleanup operations, we simply # update with another template, the stack object won't have the # template of the previous stack-run. diff --git a/heat/engine/stack.py b/heat/engine/stack.py index 641e264ef..77d77f0f8 100755 --- a/heat/engine/stack.py +++ b/heat/engine/stack.py @@ -999,11 +999,11 @@ class Stack(collections.Mapping): LOG.info(_LI("Triggering resource %(rsrc_id)s " "for %(is_update)s update"), {'rsrc_id': rsrc_id, 'is_update': is_update}) - input_data = {'input_data': {}, - 'adopt_stack_data': self.adopt_stack_data} + input_data = sync_point.serialize_input_data({}) self.worker_client.check_resource(self.context, rsrc_id, self.current_traversal, - input_data, is_update) + input_data, is_update, + self.adopt_stack_data) def rollback(self): old_tmpl_id = self.prev_raw_template_id diff --git a/heat/engine/worker.py b/heat/engine/worker.py index 74dc9239b..0a5408495 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -45,7 +45,7 @@ class WorkerService(service.Service): or expect replies from these messages. """ - RPC_API_VERSION = '1.1' + RPC_API_VERSION = '1.2' def __init__(self, host, @@ -60,12 +60,14 @@ class WorkerService(service.Service): self._rpc_client = rpc_client.WorkerClient() self._rpc_server = None + self.target = None def start(self): target = oslo_messaging.Target( version=self.RPC_API_VERSION, server=self.host, topic=self.topic) + self.target = target LOG.info(_LI("Starting %(topic)s (%(version)s) in engine %(engine)s."), {'topic': self.topic, 'version': self.RPC_API_VERSION, @@ -121,19 +123,15 @@ class WorkerService(service.Service): else: stack.purge_db() - def _load_resource(self, cnxt, resource_id, data, is_update): - adopt_data = data.get('adopt_stack_data') - data = dict(sync_point.deserialize_input_data(data)) - + def _load_resource(self, cnxt, resource_id, resource_data, is_update): if is_update: cache_data = {in_data.get( - 'name'): in_data for in_data in data.values() + 'name'): in_data for in_data in resource_data.values() if in_data is not None} else: # no data to resolve in cleanup phase cache_data = {} - cache_data['adopt_stack_data'] = adopt_data rsrc, stack = None, None try: rsrc, stack = resource.Resource.load(cnxt, resource_id, is_update, @@ -143,32 +141,37 @@ class WorkerService(service.Service): return rsrc, stack - def _do_check_resource(self, cnxt, current_traversal, tmpl, data, - is_update, rsrc, stack_id): + def _do_check_resource(self, cnxt, current_traversal, tmpl, resource_data, + is_update, rsrc, stack_id, adopt_stack_data): try: if is_update: try: - check_resource_update(rsrc, tmpl.id, data, self.engine_id) + check_resource_update(rsrc, tmpl.id, resource_data, + self.engine_id) except resource.UpdateReplace: new_res_id = rsrc.make_replacement(tmpl.id) LOG.info("Replacing resource with new id %s", new_res_id) - data = sync_point.serialize_input_data(data) + rpc_data = sync_point.serialize_input_data(resource_data) self._rpc_client.check_resource(cnxt, new_res_id, current_traversal, - data, is_update) + rpc_data, is_update, + adopt_stack_data) return False else: - check_resource_cleanup(rsrc, tmpl.id, data, self.engine_id) + check_resource_cleanup(rsrc, tmpl.id, resource_data, + self.engine_id) return True except resource.UpdateInProgress: if self._try_steal_engine_lock(cnxt, rsrc.id): + rpc_data = sync_point.serialize_input_data(resource_data) self._rpc_client.check_resource(cnxt, rsrc.id, current_traversal, - data, is_update) + rpc_data, is_update, + adopt_stack_data) except exception.ResourceFailure as ex: reason = 'Resource %s failed: %s' % (rsrc.action, six.text_type(ex)) @@ -232,7 +235,8 @@ class WorkerService(service.Service): input_data = _get_input_data(req, fwd) propagate_check_resource( cnxt, self._rpc_client, req, current_traversal, - set(graph[(req, fwd)]), graph_key, input_data, fwd) + set(graph[(req, fwd)]), graph_key, input_data, fwd, + stack.adopt_stack_data) check_stack_complete(cnxt, stack, current_traversal, resource_id, deps, is_update) @@ -251,14 +255,16 @@ class WorkerService(service.Service): @context.request_context def check_resource(self, cnxt, resource_id, current_traversal, data, - is_update): + is_update, adopt_stack_data): ''' Process a node in the dependency graph. The node may be associated with either an update or a cleanup of its associated resource. ''' - rsrc, stack = self._load_resource(cnxt, resource_id, data, is_update) + resource_data = dict(sync_point.deserialize_input_data(data)) + rsrc, stack = self._load_resource(cnxt, resource_id, resource_data, + is_update) if rsrc is None: return @@ -268,6 +274,7 @@ class WorkerService(service.Service): return tmpl = stack.t + stack.adopt_stack_data = adopt_stack_data if is_update: if (rsrc.replaced_by is not None and @@ -275,8 +282,10 @@ class WorkerService(service.Service): return check_resource_done = self._do_check_resource(cnxt, current_traversal, - tmpl, data, is_update, - rsrc, stack.id) + tmpl, resource_data, + is_update, + rsrc, stack.id, + adopt_stack_data) if check_resource_done: # initiate check on next set of resources from graph @@ -327,20 +336,20 @@ def check_stack_complete(cnxt, stack, current_traversal, sender_id, deps, def propagate_check_resource(cnxt, rpc_client, next_res_id, current_traversal, predecessors, sender_key, - sender_data, is_update): + sender_data, is_update, adopt_stack_data): ''' Trigger processing of a node if all of its dependencies are satisfied. ''' def do_check(entity_id, data): rpc_client.check_resource(cnxt, entity_id, current_traversal, - data, is_update) + data, is_update, adopt_stack_data) sync_point.sync(cnxt, next_res_id, current_traversal, is_update, do_check, predecessors, {sender_key: sender_data}) -def check_resource_update(rsrc, template_id, data, engine_id): +def check_resource_update(rsrc, template_id, resource_data, engine_id): ''' Create or update the Resource if appropriate. ''' @@ -350,13 +359,13 @@ def check_resource_update(rsrc, template_id, data, engine_id): resource.Resource.COMPLETE, resource.Resource.FAILED ])): - rsrc.create_convergence(template_id, data, engine_id) + rsrc.create_convergence(template_id, resource_data, engine_id) else: - rsrc.update_convergence(template_id, data, engine_id) + rsrc.update_convergence(template_id, resource_data, engine_id) -def check_resource_cleanup(rsrc, template_id, data, engine_id): +def check_resource_cleanup(rsrc, template_id, resource_data, engine_id): ''' Delete the Resource if appropriate. ''' - rsrc.delete_convergence(template_id, data, engine_id) + rsrc.delete_convergence(template_id, resource_data, engine_id) diff --git a/heat/rpc/worker_client.py b/heat/rpc/worker_client.py index b8fa5f7ea..0edbc8302 100644 --- a/heat/rpc/worker_client.py +++ b/heat/rpc/worker_client.py @@ -28,6 +28,7 @@ class WorkerClient(object): 1.0 - Initial version. 1.1 - Added check_resource. + 1.2 - Add adopt data argument to check_resource. ''' BASE_RPC_API_VERSION = '1.0' @@ -50,8 +51,10 @@ class WorkerClient(object): client.cast(ctxt, method, **kwargs) def check_resource(self, ctxt, resource_id, - current_traversal, data, is_update): - self.cast(ctxt, self.make_msg( - 'check_resource', resource_id=resource_id, - current_traversal=current_traversal, data=data, - is_update=is_update)) + current_traversal, data, is_update, adopt_stack_data): + self.cast(ctxt, + self.make_msg( + 'check_resource', resource_id=resource_id, + current_traversal=current_traversal, data=data, + is_update=is_update, adopt_stack_data=adopt_stack_data), + version='1.2') diff --git a/heat/tests/engine/test_engine_worker.py b/heat/tests/engine/test_engine_worker.py index 593355a57..ccd9efd07 100644 --- a/heat/tests/engine/test_engine_worker.py +++ b/heat/tests/engine/test_engine_worker.py @@ -32,7 +32,7 @@ class WorkerServiceTest(common.HeatTestCase): def test_make_sure_rpc_version(self): self.assertEqual( - '1.1', + '1.2', worker.WorkerService.RPC_API_VERSION, ('RPC version is changed, please update this test to new version ' 'and make sure additional test cases are added for RPC APIs ' @@ -128,14 +128,14 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): self.worker.check_resource( self.ctx, 'non-existant-id', self.stack.current_traversal, {}, - True) + True, None) for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]: self.assertFalse(mocked.called) def test_stale_traversal( self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): self.worker.check_resource(self.ctx, self.resource.id, - 'stale-traversal', {}, True) + 'stale-traversal', {}, True, None) for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]: self.assertFalse(mocked.called) @@ -143,7 +143,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): self.worker.check_resource( self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) mock_cru.assert_called_once_with(self.resource, self.resource.stack.t.id, {}, self.worker.engine_id) @@ -168,7 +168,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): mock_cru.side_effect = resource.UpdateReplace self.worker.check_resource( self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) mock_cru.assert_called_once_with(self.resource, self.resource.stack.t.id, {}, self.worker.engine_id) @@ -185,7 +185,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): mock_tsl.return_value = True self.worker.check_resource( self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) mock_cru.assert_called_once_with(self.resource, self.resource.stack.t.id, {}, self.worker.engine_id) @@ -237,7 +237,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): dummy_ex, self.resource, action=self.resource.UPDATE) self.worker.check_resource(self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) s = self.stack.load(self.ctx, stack_id=self.stack.id) self.assertEqual((s.UPDATE, s.FAILED), (s.action, s.status)) self.assertEqual('Resource UPDATE failed: ' @@ -257,7 +257,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): dummy_ex, self.resource, action=self.resource.UPDATE) self.worker.check_resource(self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) s = self.stack.load(self.ctx, stack_id=self.stack.id) self.assertEqual((s.UPDATE, s.FAILED), (s.action, s.status)) self.assertEqual('Resource UPDATE failed: ' @@ -275,7 +275,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): dummy_ex, self.resource, action=self.resource.UPDATE) self.worker.check_resource(self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) self.assertTrue(self.worker._trigger_rollback.called) # make sure the rollback is called on given stack call_args, call_kwargs = self.worker._trigger_rollback.call_args @@ -294,7 +294,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): dummy_ex, self.resource, action=self.resource.UPDATE) self.worker.check_resource(self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) self.assertTrue(self.worker._trigger_rollback.called) # make sure the rollback is called on given stack call_args, call_kwargs = self.worker._trigger_rollback.call_args @@ -312,7 +312,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): dummy_ex, self.resource, action=self.stack.CREATE) self.worker.check_resource(self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) self.assertFalse(self.worker._trigger_rollback.called) def test_rollback_not_re_triggered_for_a_rolling_back_stack( @@ -328,7 +328,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): dummy_ex, self.resource, action=self.stack.CREATE) self.worker.check_resource(self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) self.assertFalse(self.worker._trigger_rollback.called) def test_resource_update_failure_purges_db_for_stack_failure( @@ -342,7 +342,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): dummy_ex, self.resource, action=self.resource.UPDATE) self.worker.check_resource(self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) self.assertTrue(self.stack.purge_db.called) def test_resource_cleanup_failure_purges_db_for_stack_failure( @@ -357,7 +357,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase): dummy_ex, self.resource, action=self.resource.UPDATE) self.worker.check_resource(self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) self.assertTrue(self.stack.purge_db.called) @mock.patch.object(worker.WorkerService, '_retrigger_check_resource') @@ -432,7 +432,7 @@ class CheckWorkflowCleanupTest(common.HeatTestCase): self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid): self.worker.check_resource( self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) self.assertFalse(mock_cru.called) mock_crc.assert_called_once_with( self.resource, self.resource.stack.t.id, @@ -443,7 +443,7 @@ class CheckWorkflowCleanupTest(common.HeatTestCase): mock_crc.side_effect = resource.UpdateInProgress self.worker.check_resource( self.ctx, self.resource.id, self.stack.current_traversal, {}, - self.is_update) + self.is_update, None) mock_crc.assert_called_once_with(self.resource, self.resource.stack.t.id, {}, self.worker.engine_id) @@ -504,7 +504,7 @@ class MiscMethodsTest(common.HeatTestCase): worker.propagate_check_resource( self.ctx, mock.ANY, mock.ANY, self.stack.current_traversal, mock.ANY, - mock.ANY, {}, True) + mock.ANY, {}, True, None) self.assertTrue(mock_sync.called) @mock.patch.object(resource.Resource, 'create_convergence') diff --git a/heat/tests/test_engine_service.py b/heat/tests/test_engine_service.py index 13aaff953..5a9a48607 100644 --- a/heat/tests/test_engine_service.py +++ b/heat/tests/test_engine_service.py @@ -222,8 +222,8 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): expected_calls.append( mock.call.worker_client.WorkerClient.check_resource( stack.context, rsrc_id, stack.current_traversal, - {'input_data': {}, 'adopt_stack_data': None}, - is_update)) + {'input_data': []}, + is_update, None)) self.assertEqual(expected_calls, mock_cr.mock_calls) def test_conv_string_five_instance_stack_create(self, mock_cr): @@ -279,8 +279,8 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): expected_calls.append( mock.call.worker_client.WorkerClient.check_resource( stack.context, rsrc_id, stack.current_traversal, - {'input_data': {}, 'adopt_stack_data': None}, - is_update)) + {'input_data': []}, + is_update, None)) self.assertEqual(expected_calls, mock_cr.mock_calls) def _mock_conv_update_requires(self, stack, conv_deps): @@ -415,16 +415,16 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): expected_calls.append( mock.call.worker_client.WorkerClient.check_resource( stack.context, rsrc_id, stack.current_traversal, - {'input_data': {}, 'adopt_stack_data': None}, - is_update)) + {'input_data': []}, + is_update, None)) leaves = curr_stack.convergence_dependencies.leaves() for rsrc_id, is_update in leaves: expected_calls.append( mock.call.worker_client.WorkerClient.check_resource( curr_stack.context, rsrc_id, curr_stack.current_traversal, - {'input_data': {}, 'adopt_stack_data': None}, - is_update)) + {'input_data': []}, + is_update, None)) self.assertEqual(expected_calls, mock_cr.mock_calls) def test_conv_empty_template_stack_update_delete(self, mock_cr): @@ -496,16 +496,16 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase): expected_calls.append( mock.call.worker_client.WorkerClient.check_resource( stack.context, rsrc_id, stack.current_traversal, - {'input_data': {}, 'adopt_stack_data': None}, - is_update)) + {'input_data': []}, + is_update, None)) leaves = curr_stack.convergence_dependencies.leaves() for rsrc_id, is_update in leaves: expected_calls.append( mock.call.worker_client.WorkerClient.check_resource( curr_stack.context, rsrc_id, curr_stack.current_traversal, - {'input_data': {}, 'adopt_stack_data': None}, - is_update)) + {'input_data': []}, + is_update, None)) self.assertEqual(expected_calls, mock_cr.mock_calls) def test_mark_complete_purges_db(self, mock_cr):