convergence: sync_point fixes

This is a merge of 4 reviews:
I52f1611d34def3474acba0e5eee054e11c5fc5ad
Ic374a38c9d76763be341d3a80f53fa396c9c2256
Iecd21ccb4392369f66fa1b3a0cf55aad754aeac4
I77b81097d2dcf01efa540237ed5ae14896ed1670

- make sure sender is a tuple (otherwise the serialization
  function in sync_point breaks.)
- Update updated_time on any lifecycle operation (CREATE/UPDATE/DELETE)
  over a stack.
- adjust sync_point logic to account for deletes
   Done by having only a single stack sync point
   for both updates and deletes.
- Serialize/deserialize input_data for RPC
- Make GraphKeys the norm in convergence worker
- move temp_update_requires functionality to tests
  During initial stages of convergence, to simulate the entire cycle,
  some part of worker code was written in stack.py.
  Now that the convergence worker is implemented, this code needs to
  be executed only in tests.
- Fix dictionary structure that's passed to resource.(create/update)
- Temporarily disable loading cache_data for stack to help fix other
  issues.

Change-Id: Iecd21ccb4392369f66fa1b3a0cf55aad754aeac4
Co-Authored-by: Sirushti Murugesan <sirushti.murugesan@hp.com>
Co-Authored-by: Rakesh H S <rh-s@hp.com>
changes/90/188690/12
Angus Salkeld 7 years ago committed by Anant Patil
parent 26fdc00fb8
commit ad104c51bf
  1. 7
      heat/engine/dependencies.py
  2. 3
      heat/engine/resource.py
  3. 37
      heat/engine/stack.py
  4. 2
      heat/engine/sync_point.py
  5. 34
      heat/engine/worker.py
  6. 8
      heat/tests/test_dependencies.py
  7. 47
      heat/tests/test_engine_service.py
  8. 6
      heat/tests/test_engine_worker.py
  9. 2
      heat/tests/test_stack.py
  10. 4
      heat/tests/test_sync_point.py

@ -256,6 +256,13 @@ class Dependencies(object):
return (requirer for requirer, required in self._graph.items()
if not required)
def roots(self):
    '''
    Return an iterator over all of the root nodes in the graph.

    Roots are the nodes that no other node depends on, i.e. the
    nodes with no requirements in the reversed graph.
    '''
    reversed_graph = self.graph(reverse=True)
    return (node for node, requirements in reversed_graph.items()
            if not requirements)
def translate(self, transform):
'''
Translate all of the nodes using a transform function.

@ -243,7 +243,8 @@ class Resource(object):
# FIXME(sirushtim): Import this in global space.
from heat.engine import stack as stack_mod
db_res = resource_objects.Resource.get_obj(context, resource_id)
stack = stack_mod.Stack.load(context, db_res.stack_id, cache_data=data)
# TODO(sirushtim): Load stack from cache
stack = stack_mod.Stack.load(context, db_res.stack_id)
# NOTE(sirushtim): Because on delete/cleanup operations, we simply
# update with another template, the stack object won't have the
# template of the previous stack-run.

@ -944,6 +944,7 @@ class Stack(collections.Mapping):
self.t = template
previous_traversal = self.current_traversal
self.current_traversal = uuidutils.generate_uuid()
self.updated_time = datetime.datetime.utcnow()
self.store()
# TODO(later): lifecycle_plugin_utils.do_pre_ops
@ -968,9 +969,7 @@ class Stack(collections.Mapping):
self.id)
# create sync_point entry for stack
sync_point.create(
self.context, self.id, self.current_traversal,
False if self.action in (self.DELETE, self.SUSPEND) else True,
self.id)
self.context, self.id, self.current_traversal, True, self.id)
# Store list of edges
self.current_deps = {
@ -980,14 +979,12 @@ class Stack(collections.Mapping):
for rsrc_id, is_update in self.convergence_dependencies.leaves():
LOG.info(_LI("Triggering resource %(rsrc_id)s "
"for update=%(is_update)s"),
"for %(is_update)s update"),
{'rsrc_id': rsrc_id, 'is_update': is_update})
self.worker_client.check_resource(self.context, rsrc_id,
self.current_traversal,
{}, is_update)
self.temp_update_requires(self.convergence_dependencies)
def _update_or_store_resources(self):
try:
ext_rsrcs_db = resource_objects.Resource.get_all_by_stack(
@ -1059,31 +1056,6 @@ class Stack(collections.Mapping):
dep += (rsrc_id, False), (rsrc_id, True)
return dep
def temp_update_requires(self, conv_deps):
    '''Update the requires column of this stack's resources.

    Derives each resource's requirements from the convergence
    dependency graph ``conv_deps`` and persists them on the
    corresponding DB resource rows. Only CREATE and UPDATE actions
    are handled; any other action is a no-op.
    '''
    # NOTE: This function should be removed once the dependent
    # patches are implemented.
    if self.action not in (self.CREATE, self.UPDATE):
        return
    requires = {}
    for rsrc_id, is_update in conv_deps:
        reqs = conv_deps.requires((rsrc_id, is_update))
        # Collapse (id, is_update) pairs down to the unique resource ids
        # (avoid shadowing the builtin `id` here).
        requires[rsrc_id] = list({req_id for req_id, _ in reqs})
    try:
        rsrcs_db = resource_objects.Resource.get_all_by_stack(
            self.context, self.id)
    except exception.NotFound:
        # No resources stored yet; nothing to update.
        return
    rsrcs_db = {res.id: res for res in rsrcs_db.values()}
    for res_id, db_rsrc in rsrcs_db.items():
        if res_id in requires:
            resource.Resource.set_requires(db_rsrc, requires[res_id])
@scheduler.wrappertask
def update_task(self, newstack, action=UPDATE, event=None):
if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):
@ -1592,7 +1564,8 @@ class Stack(collections.Mapping):
return False
def cache_data_resource_id(self, resource_name):
return self.cache_data.get(resource_name, {}).get('id')
return self.cache_data.get(
resource_name, {}).get('physical_resource_id')
def cache_data_resource_attribute(self, resource_name, attribute_key):
return self.cache_data.get(

@ -105,7 +105,7 @@ def sync(cnxt, entity_id, current_traversal, is_update, propagate,
else:
LOG.debug('[%s] Ready %s: Got %s',
key, entity_id, _dump_list(input_data))
propagate(entity_id, input_data)
propagate(entity_id, serialize_input_data(input_data))
class SyncPointNotFound(Exception):

@ -92,9 +92,13 @@ class WorkerService(service.Service):
The node may be associated with either an update or a cleanup of its
associated resource.
'''
data = dict(sync_point.deserialize_input_data(data))
try:
rsrc, stack = resource.Resource.load(cnxt, resource_id, data)
except exception.NotFound:
cache_data = {in_data.get(
'name'): in_data for in_data in data.values()
if in_data is not None}
rsrc, stack = resource.Resource.load(cnxt, resource_id, cache_data)
except (exception.ResourceNotFound, exception.NotFound):
return
tmpl = stack.t
@ -144,10 +148,10 @@ class WorkerService(service.Service):
propagate_check_resource(
cnxt, self._rpc_client, req, current_traversal,
set(graph[(req, fwd)]), graph_key,
input_data if fwd else rsrc.id, fwd)
input_data if fwd else None, fwd)
check_stack_complete(cnxt, rsrc.stack, current_traversal,
rsrc.id, graph, is_update)
rsrc.id, deps, is_update)
except sync_point.SyncPointNotFound:
# NOTE(sirushtim): Implemented by spec
# convergence-concurrent-workflow
@ -167,7 +171,7 @@ def construct_input_data(rsrc):
return input_data
def check_stack_complete(cnxt, stack, current_traversal, sender, graph,
def check_stack_complete(cnxt, stack, current_traversal, sender_id, deps,
is_update):
'''
Mark the stack complete if the update is complete.
@ -175,21 +179,21 @@ def check_stack_complete(cnxt, stack, current_traversal, sender, graph,
Complete is currently in the sense that all desired resources are in
service, not that superfluous ones have been cleaned up.
'''
roots = set(key for (key, fwd), node in graph.items()
if not any(f for k, f in node.required_by()))
roots = set(deps.roots())
if sender not in roots:
if (sender_id, is_update) not in roots:
return
def mark_complete(stack_id, data):
stack.mark_complete(current_traversal)
sync_point.sync(cnxt, stack.id, current_traversal, is_update,
mark_complete, roots, {sender: None})
sender_key = (sender_id, is_update)
sync_point.sync(cnxt, stack.id, current_traversal, True,
mark_complete, roots, {sender_key: None})
def propagate_check_resource(cnxt, rpc_client, next_res_id,
current_traversal, predecessors, sender,
current_traversal, predecessors, sender_key,
sender_data, is_update):
'''
Trigger processing of a node if all of its dependencies are satisfied.
@ -200,19 +204,17 @@ def propagate_check_resource(cnxt, rpc_client, next_res_id,
sync_point.sync(cnxt, next_res_id, current_traversal,
is_update, do_check, predecessors,
{sender: sender_data})
{sender_key: sender_data})
def check_resource_update(rsrc, template_id, data):
'''
Create or update the Resource if appropriate.
'''
input_data = {in_data.name: in_data for in_data in data.values()}
if rsrc.resource_id is None:
rsrc.create_convergence(template_id, input_data)
rsrc.create_convergence(template_id, data)
else:
rsrc.update_convergence(template_id, input_data)
rsrc.update_convergence(template_id, data)
def check_resource_cleanup(rsrc, template_id, data):

@ -234,3 +234,11 @@ class dependenciesTest(common.HeatTestCase):
leaves = sorted(list(d.leaves()))
self.assertEqual(['first1', 'first2'], leaves)
def test_roots(self):
    '''roots() yields the nodes that nothing else depends on.'''
    d = dependencies.Dependencies([('last1', 'mid'), ('last2', 'mid'),
                                   ('mid', 'first1'), ('mid', 'first2')])
    # sorted() accepts any iterable directly; and these are roots,
    # not leaves, so name the local accordingly.
    roots = sorted(d.roots())
    self.assertEqual(['last1', 'last2'], roots)

@ -269,6 +269,25 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
is_update))
self.assertEqual(expected_calls, mock_cr.mock_calls)
def _mock_conv_update_requires(self, stack, conv_deps):
    """Update the requires column of resources.

    Required for testing the generation of the convergence dependency
    graph on an update. Returns the (mutated) mapping of resource name
    to DB resource object.
    """
    requires = {}
    for rsrc_id, is_update in conv_deps:
        reqs = conv_deps.requires((rsrc_id, is_update))
        # Collapse (id, is_update) pairs to unique resource ids
        # (avoid shadowing the builtin `id`).
        requires[rsrc_id] = list({req_id for req_id, _ in reqs})
    rsrcs_db = resource_objects.Resource.get_all_by_stack(
        stack.context, stack.id)
    for res_name, rsrc in rsrcs_db.items():
        if rsrc.id in requires:
            rsrcs_db[res_name].requires = requires[rsrc.id]
    return rsrcs_db
def test_conv_string_five_instance_stack_update(self, mock_cr):
stack = tools.get_stack('test_stack', utils.dummy_context(),
template=tools.string_template_five,
@ -283,7 +302,13 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
t2 = template_format.parse(string_template_five_update)
template2 = templatem.Template(
t2, env=environment.Environment({'KeyName2': 'test2'}))
curr_stack.converge_stack(template=template2, action=stack.UPDATE)
# on our previous create_complete, worker would have updated the
# rsrc.requires. Mock the same behavior here.
with mock.patch.object(resource_objects.Resource, 'get_all_by_stack',
return_value=self._mock_conv_update_requires(
stack, stack.convergence_dependencies)):
curr_stack.converge_stack(template=template2, action=stack.UPDATE)
self.assertIsNotNone(curr_stack.ext_rsrcs_db)
self.assertEqual('Dependencies(['
@ -337,8 +362,6 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
# check if needed_by are stored properly
# For A & B:
# needed_by=C, F
# TODO(later): when worker is implemented test for current_template_id
# Also test for requires
expected_needed_by = {'A': [3, 8], 'B': [3, 8],
'C': [1, 2],
@ -401,7 +424,12 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
curr_stack_db = stack_object.Stack.get_by_id(stack.context, stack.id)
curr_stack = parser.Stack.load(curr_stack_db._context,
stack=curr_stack_db)
curr_stack.converge_stack(template=template2, action=stack.DELETE)
# on our previous create_complete, worker would have updated the
# rsrc.requires. Mock the same behavior here.
with mock.patch.object(resource_objects.Resource, 'get_all_by_stack',
return_value=self._mock_conv_update_requires(
stack, stack.convergence_dependencies)):
curr_stack.converge_stack(template=template2, action=stack.DELETE)
self.assertIsNotNone(curr_stack.ext_rsrcs_db)
self.assertEqual('Dependencies(['
@ -421,8 +449,6 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
[[4, False], [3, False]]]),
sorted(stack_db.current_deps['edges']))
# TODO(later): when worker is implemented test for current_template_id
# Also test for requires
expected_needed_by = {'A': [3], 'B': [3],
'C': [1, 2],
'D': [], 'E': []}
@ -437,10 +463,13 @@ class StackConvergenceCreateUpdateDeleteTest(common.HeatTestCase):
# check if sync_points are created for cleanup traversal
# [A, B, C, D, E, Stack]
for entity_id in [5, 4, 3, 2, 1, stack_db.id]:
is_update = False
if entity_id == stack_db.id:
is_update = True
sync_point = sync_point_object.SyncPoint.get_by_key(
stack_db._context, entity_id, stack_db.current_traversal, False
)
self.assertIsNotNone(sync_point)
stack_db._context, entity_id, stack_db.current_traversal,
is_update)
self.assertIsNotNone(sync_point, 'entity %s' % entity_id)
self.assertEqual(stack_db.id, sync_point.stack_id)
leaves = stack.convergence_dependencies.leaves()

@ -255,17 +255,17 @@ class MiscMethodsTest(common.HeatTestCase):
def test_check_stack_complete_root(self, mock_sync):
worker.check_stack_complete(
self.ctx, self.stack, self.stack.current_traversal,
self.stack['E'].id, self.stack.convergence_dependencies.graph(),
self.stack['E'].id, self.stack.convergence_dependencies,
True)
mock_sync.assert_called_once_with(
self.ctx, self.stack.id, self.stack.current_traversal, True,
mock.ANY, mock.ANY, {self.stack['E'].id: None})
mock.ANY, mock.ANY, {(self.stack['E'].id, True): None})
@mock.patch.object(sync_point, 'sync')
def test_check_stack_complete_child(self, mock_sync):
worker.check_stack_complete(
self.ctx, self.stack, self.stack.current_traversal,
self.resource.id, self.stack.convergence_dependencies.graph(),
self.resource.id, self.stack.convergence_dependencies,
True)
self.assertFalse(mock_sync.called)

@ -1909,7 +1909,7 @@ class StackTest(common.HeatTestCase):
}
})
cache_data = {'foo': {'id': 'physical-resource-id'}}
cache_data = {'foo': {'physical_resource_id': 'physical-resource-id'}}
tmpl_stack = stack.Stack(self.ctx, 'test', tmpl)
tmpl_stack.store()
lightweight_stack = stack.Stack.load(self.ctx, stack_id=tmpl_stack.id,

@ -62,3 +62,7 @@ class SyncPointTestCase(common.HeatTestCase):
updated_sync_point.input_data)
self.assertEqual({sender: None}, input_data)
self.assertTrue(mock_callback.called)
def test_serialize_input_data(self):
    '''Tuple keys are serialized to a list-of-pairs structure.'''
    # Use plain int literals: the `3L` long-integer suffix is
    # Python 2 only and is a syntax error under Python 3.
    res = sync_point.serialize_input_data({(3, 8): None})
    self.assertEqual({'input_data': [[[3, 8], None]]}, res)

Loading…
Cancel
Save