Convergence: clarify what "data" is

Mostly in worker we have arguments called "data", it is not clear
if these are serialized or not (and if they have adopt data in them).

1. split adopt data out (add RPC support for the new argument)
2. name arguments "resource_data" for deserialized data
3. name arguments "rpc_data" for serialized data
4. make sure all data into client.check_resource() is serialized

Change-Id: Ie6bd0e45d2857d3a23235776c2b96cce02cb711a
changes/91/206291/3
Angus Salkeld 7 years ago committed by Sirushti Murugesan
parent 14897230fb
commit d23ebb6065
  1. 2
      heat/engine/resource.py
  2. 6
      heat/engine/stack.py
  3. 61
      heat/engine/worker.py
  4. 13
      heat/rpc/worker_client.py
  5. 34
      heat/tests/engine/test_engine_worker.py
  6. 24
      heat/tests/test_engine_service.py

@ -273,8 +273,6 @@ class Resource(object):
def special_stack(tmpl, swap_template):
# TODO(sirushtim): Load stack from cache
stk = stack_mod.Stack.load(context, db_res.stack_id)
stk.adopt_stack_data = data.get('adopt_stack_data')
# NOTE(sirushtim): Because on delete/cleanup operations, we simply
# update with another template, the stack object won't have the
# template of the previous stack-run.

@ -999,11 +999,11 @@ class Stack(collections.Mapping):
LOG.info(_LI("Triggering resource %(rsrc_id)s "
"for %(is_update)s update"),
{'rsrc_id': rsrc_id, 'is_update': is_update})
input_data = {'input_data': {},
'adopt_stack_data': self.adopt_stack_data}
input_data = sync_point.serialize_input_data({})
self.worker_client.check_resource(self.context, rsrc_id,
self.current_traversal,
input_data, is_update)
input_data, is_update,
self.adopt_stack_data)
def rollback(self):
old_tmpl_id = self.prev_raw_template_id

@ -45,7 +45,7 @@ class WorkerService(service.Service):
or expect replies from these messages.
"""
RPC_API_VERSION = '1.1'
RPC_API_VERSION = '1.2'
def __init__(self,
host,
@ -60,12 +60,14 @@ class WorkerService(service.Service):
self._rpc_client = rpc_client.WorkerClient()
self._rpc_server = None
self.target = None
def start(self):
target = oslo_messaging.Target(
version=self.RPC_API_VERSION,
server=self.host,
topic=self.topic)
self.target = target
LOG.info(_LI("Starting %(topic)s (%(version)s) in engine %(engine)s."),
{'topic': self.topic,
'version': self.RPC_API_VERSION,
@ -121,19 +123,15 @@ class WorkerService(service.Service):
else:
stack.purge_db()
def _load_resource(self, cnxt, resource_id, data, is_update):
adopt_data = data.get('adopt_stack_data')
data = dict(sync_point.deserialize_input_data(data))
def _load_resource(self, cnxt, resource_id, resource_data, is_update):
if is_update:
cache_data = {in_data.get(
'name'): in_data for in_data in data.values()
'name'): in_data for in_data in resource_data.values()
if in_data is not None}
else:
# no data to resolve in cleanup phase
cache_data = {}
cache_data['adopt_stack_data'] = adopt_data
rsrc, stack = None, None
try:
rsrc, stack = resource.Resource.load(cnxt, resource_id, is_update,
@ -143,32 +141,37 @@ class WorkerService(service.Service):
return rsrc, stack
def _do_check_resource(self, cnxt, current_traversal, tmpl, data,
is_update, rsrc, stack_id):
def _do_check_resource(self, cnxt, current_traversal, tmpl, resource_data,
is_update, rsrc, stack_id, adopt_stack_data):
try:
if is_update:
try:
check_resource_update(rsrc, tmpl.id, data, self.engine_id)
check_resource_update(rsrc, tmpl.id, resource_data,
self.engine_id)
except resource.UpdateReplace:
new_res_id = rsrc.make_replacement(tmpl.id)
LOG.info("Replacing resource with new id %s", new_res_id)
data = sync_point.serialize_input_data(data)
rpc_data = sync_point.serialize_input_data(resource_data)
self._rpc_client.check_resource(cnxt,
new_res_id,
current_traversal,
data, is_update)
rpc_data, is_update,
adopt_stack_data)
return False
else:
check_resource_cleanup(rsrc, tmpl.id, data, self.engine_id)
check_resource_cleanup(rsrc, tmpl.id, resource_data,
self.engine_id)
return True
except resource.UpdateInProgress:
if self._try_steal_engine_lock(cnxt, rsrc.id):
rpc_data = sync_point.serialize_input_data(resource_data)
self._rpc_client.check_resource(cnxt,
rsrc.id,
current_traversal,
data, is_update)
rpc_data, is_update,
adopt_stack_data)
except exception.ResourceFailure as ex:
reason = 'Resource %s failed: %s' % (rsrc.action,
six.text_type(ex))
@ -232,7 +235,8 @@ class WorkerService(service.Service):
input_data = _get_input_data(req, fwd)
propagate_check_resource(
cnxt, self._rpc_client, req, current_traversal,
set(graph[(req, fwd)]), graph_key, input_data, fwd)
set(graph[(req, fwd)]), graph_key, input_data, fwd,
stack.adopt_stack_data)
check_stack_complete(cnxt, stack, current_traversal,
resource_id, deps, is_update)
@ -251,14 +255,16 @@ class WorkerService(service.Service):
@context.request_context
def check_resource(self, cnxt, resource_id, current_traversal, data,
is_update):
is_update, adopt_stack_data):
'''
Process a node in the dependency graph.
The node may be associated with either an update or a cleanup of its
associated resource.
'''
rsrc, stack = self._load_resource(cnxt, resource_id, data, is_update)
resource_data = dict(sync_point.deserialize_input_data(data))
rsrc, stack = self._load_resource(cnxt, resource_id, resource_data,
is_update)
if rsrc is None:
return
@ -268,6 +274,7 @@ class WorkerService(service.Service):
return
tmpl = stack.t
stack.adopt_stack_data = adopt_stack_data
if is_update:
if (rsrc.replaced_by is not None and
@ -275,8 +282,10 @@ class WorkerService(service.Service):
return
check_resource_done = self._do_check_resource(cnxt, current_traversal,
tmpl, data, is_update,
rsrc, stack.id)
tmpl, resource_data,
is_update,
rsrc, stack.id,
adopt_stack_data)
if check_resource_done:
# initiate check on next set of resources from graph
@ -327,20 +336,20 @@ def check_stack_complete(cnxt, stack, current_traversal, sender_id, deps,
def propagate_check_resource(cnxt, rpc_client, next_res_id,
current_traversal, predecessors, sender_key,
sender_data, is_update):
sender_data, is_update, adopt_stack_data):
'''
Trigger processing of a node if all of its dependencies are satisfied.
'''
def do_check(entity_id, data):
rpc_client.check_resource(cnxt, entity_id, current_traversal,
data, is_update)
data, is_update, adopt_stack_data)
sync_point.sync(cnxt, next_res_id, current_traversal,
is_update, do_check, predecessors,
{sender_key: sender_data})
def check_resource_update(rsrc, template_id, data, engine_id):
def check_resource_update(rsrc, template_id, resource_data, engine_id):
'''
Create or update the Resource if appropriate.
'''
@ -350,13 +359,13 @@ def check_resource_update(rsrc, template_id, data, engine_id):
resource.Resource.COMPLETE,
resource.Resource.FAILED
])):
rsrc.create_convergence(template_id, data, engine_id)
rsrc.create_convergence(template_id, resource_data, engine_id)
else:
rsrc.update_convergence(template_id, data, engine_id)
rsrc.update_convergence(template_id, resource_data, engine_id)
def check_resource_cleanup(rsrc, template_id, data, engine_id):
def check_resource_cleanup(rsrc, template_id, resource_data, engine_id):
'''
Delete the Resource if appropriate.
'''
rsrc.delete_convergence(template_id, data, engine_id)
rsrc.delete_convergence(template_id, resource_data, engine_id)

@ -28,6 +28,7 @@ class WorkerClient(object):
1.0 - Initial version.
1.1 - Added check_resource.
1.2 - Add adopt data argument to check_resource.
'''
BASE_RPC_API_VERSION = '1.0'
@ -50,8 +51,10 @@ class WorkerClient(object):
client.cast(ctxt, method, **kwargs)
def check_resource(self, ctxt, resource_id,
current_traversal, data, is_update):
self.cast(ctxt, self.make_msg(
'check_resource', resource_id=resource_id,
current_traversal=current_traversal, data=data,
is_update=is_update))
current_traversal, data, is_update, adopt_stack_data):
self.cast(ctxt,
self.make_msg(
'check_resource', resource_id=resource_id,
current_traversal=current_traversal, data=data,
is_update=is_update, adopt_stack_data=adopt_stack_data),
version='1.2')

@ -32,7 +32,7 @@ class WorkerServiceTest(common.HeatTestCase):
def test_make_sure_rpc_version(self):
self.assertEqual(
'1.1',
'1.2',
worker.WorkerService.RPC_API_VERSION,
('RPC version is changed, please update this test to new version '
'and make sure additional test cases are added for RPC APIs '
@ -128,14 +128,14 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
self.worker.check_resource(
self.ctx, 'non-existant-id', self.stack.current_traversal, {},
True)
True, None)
for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]:
self.assertFalse(mocked.called)
def test_stale_traversal(
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
self.worker.check_resource(self.ctx, self.resource.id,
'stale-traversal', {}, True)
'stale-traversal', {}, True, None)
for mocked in [mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid]:
self.assertFalse(mocked.called)
@ -143,7 +143,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
self.worker.check_resource(
self.ctx, self.resource.id, self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
mock_cru.assert_called_once_with(self.resource,
self.resource.stack.t.id,
{}, self.worker.engine_id)
@ -168,7 +168,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
mock_cru.side_effect = resource.UpdateReplace
self.worker.check_resource(
self.ctx, self.resource.id, self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
mock_cru.assert_called_once_with(self.resource,
self.resource.stack.t.id,
{}, self.worker.engine_id)
@ -185,7 +185,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
mock_tsl.return_value = True
self.worker.check_resource(
self.ctx, self.resource.id, self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
mock_cru.assert_called_once_with(self.resource,
self.resource.stack.t.id,
{}, self.worker.engine_id)
@ -237,7 +237,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
dummy_ex, self.resource, action=self.resource.UPDATE)
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
s = self.stack.load(self.ctx, stack_id=self.stack.id)
self.assertEqual((s.UPDATE, s.FAILED), (s.action, s.status))
self.assertEqual('Resource UPDATE failed: '
@ -257,7 +257,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
dummy_ex, self.resource, action=self.resource.UPDATE)
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
s = self.stack.load(self.ctx, stack_id=self.stack.id)
self.assertEqual((s.UPDATE, s.FAILED), (s.action, s.status))
self.assertEqual('Resource UPDATE failed: '
@ -275,7 +275,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
dummy_ex, self.resource, action=self.resource.UPDATE)
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
self.assertTrue(self.worker._trigger_rollback.called)
# make sure the rollback is called on given stack
call_args, call_kwargs = self.worker._trigger_rollback.call_args
@ -294,7 +294,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
dummy_ex, self.resource, action=self.resource.UPDATE)
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
self.assertTrue(self.worker._trigger_rollback.called)
# make sure the rollback is called on given stack
call_args, call_kwargs = self.worker._trigger_rollback.call_args
@ -312,7 +312,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
dummy_ex, self.resource, action=self.stack.CREATE)
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
self.assertFalse(self.worker._trigger_rollback.called)
def test_rollback_not_re_triggered_for_a_rolling_back_stack(
@ -328,7 +328,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
dummy_ex, self.resource, action=self.stack.CREATE)
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
self.assertFalse(self.worker._trigger_rollback.called)
def test_resource_update_failure_purges_db_for_stack_failure(
@ -342,7 +342,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
dummy_ex, self.resource, action=self.resource.UPDATE)
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
self.assertTrue(self.stack.purge_db.called)
def test_resource_cleanup_failure_purges_db_for_stack_failure(
@ -357,7 +357,7 @@ class CheckWorkflowUpdateTest(common.HeatTestCase):
dummy_ex, self.resource, action=self.resource.UPDATE)
self.worker.check_resource(self.ctx, self.resource.id,
self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
self.assertTrue(self.stack.purge_db.called)
@mock.patch.object(worker.WorkerService, '_retrigger_check_resource')
@ -432,7 +432,7 @@ class CheckWorkflowCleanupTest(common.HeatTestCase):
self, mock_cru, mock_crc, mock_pcr, mock_csc, mock_cid):
self.worker.check_resource(
self.ctx, self.resource.id, self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
self.assertFalse(mock_cru.called)
mock_crc.assert_called_once_with(
self.resource, self.resource.stack.t.id,
@ -443,7 +443,7 @@ class CheckWorkflowCleanupTest(common.HeatTestCase):
mock_crc.side_effect = resource.UpdateInProgress
self.worker.check_resource(
self.ctx, self.resource.id, self.stack.current_traversal, {},
self.is_update)
self.is_update, None)
mock_crc.assert_called_once_with(self.resource,
self.resource.stack.t.id,
{}, self.worker.engine_id)
@ -504,7 +504,7 @@ class MiscMethodsTest(common.HeatTestCase):
worker.propagate_check_resource(
self.ctx, mock.ANY, mock.ANY,
self.stack.current_traversal, mock.ANY,
mock.ANY, {}, True)
mock.ANY, {}, True, None)
self.assertTrue(mock_sync.called)
@mock.patch.object(resource.Resource, 'create_convergence')

@ -222,8 +222,8 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
expected_calls.append(
mock.call.worker_client.WorkerClient.check_resource(
stack.context, rsrc_id, stack.current_traversal,
{'input_data': {}, 'adopt_stack_data': None},
is_update))
{'input_data': []},
is_update, None))
self.assertEqual(expected_calls, mock_cr.mock_calls)
def test_conv_string_five_instance_stack_create(self, mock_cr):
@ -279,8 +279,8 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
expected_calls.append(
mock.call.worker_client.WorkerClient.check_resource(
stack.context, rsrc_id, stack.current_traversal,
{'input_data': {}, 'adopt_stack_data': None},
is_update))
{'input_data': []},
is_update, None))
self.assertEqual(expected_calls, mock_cr.mock_calls)
def _mock_conv_update_requires(self, stack, conv_deps):
@ -415,16 +415,16 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
expected_calls.append(
mock.call.worker_client.WorkerClient.check_resource(
stack.context, rsrc_id, stack.current_traversal,
{'input_data': {}, 'adopt_stack_data': None},
is_update))
{'input_data': []},
is_update, None))
leaves = curr_stack.convergence_dependencies.leaves()
for rsrc_id, is_update in leaves:
expected_calls.append(
mock.call.worker_client.WorkerClient.check_resource(
curr_stack.context, rsrc_id, curr_stack.current_traversal,
{'input_data': {}, 'adopt_stack_data': None},
is_update))
{'input_data': []},
is_update, None))
self.assertEqual(expected_calls, mock_cr.mock_calls)
def test_conv_empty_template_stack_update_delete(self, mock_cr):
@ -496,16 +496,16 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
expected_calls.append(
mock.call.worker_client.WorkerClient.check_resource(
stack.context, rsrc_id, stack.current_traversal,
{'input_data': {}, 'adopt_stack_data': None},
is_update))
{'input_data': []},
is_update, None))
leaves = curr_stack.convergence_dependencies.leaves()
for rsrc_id, is_update in leaves:
expected_calls.append(
mock.call.worker_client.WorkerClient.check_resource(
curr_stack.context, rsrc_id, curr_stack.current_traversal,
{'input_data': {}, 'adopt_stack_data': None},
is_update))
{'input_data': []},
is_update, None))
self.assertEqual(expected_calls, mock_cr.mock_calls)
def test_mark_complete_purges_db(self, mock_cr):

Loading…
Cancel
Save