diff --git a/heat/engine/check_resource.py b/heat/engine/check_resource.py index 94fe73dcd7..c41584507f 100644 --- a/heat/engine/check_resource.py +++ b/heat/engine/check_resource.py @@ -195,7 +195,7 @@ class CheckResource(object): stack): deps = stack.convergence_dependencies graph = deps.graph() - graph_key = (resource_id, is_update) + graph_key = parser.ConvergenceNode(resource_id, is_update) if graph_key not in graph and rsrc.replaces is not None: # If we are a replacement, impersonate the replaced resource for @@ -204,10 +204,10 @@ class CheckResource(object): # graph. Our real resource ID is sent in the input_data, so the # dependencies will get updated to point to this resource in time # for the next traversal. - graph_key = (rsrc.replaces, is_update) + graph_key = parser.ConvergenceNode(rsrc.replaces, is_update) - def _get_input_data(req, fwd, input_forward_data=None): - if fwd: + def _get_input_data(req_node, input_forward_data=None): + if req_node.is_update: if input_forward_data is None: return rsrc.node_data().as_dict() else: @@ -216,7 +216,7 @@ class CheckResource(object): else: # Don't send data if initiating clean-up for self i.e. # initiating delete of a replaced resource - if req not in graph_key: + if req_node.rsrc_id != graph_key.rsrc_id: # send replaced resource as needed_by if it exists return (rsrc.replaced_by if rsrc.replaced_by is not None @@ -225,13 +225,14 @@ class CheckResource(object): try: input_forward_data = None - for req, fwd in deps.required_by(graph_key): - input_data = _get_input_data(req, fwd, input_forward_data) - if fwd: + for req_node in deps.required_by(graph_key): + input_data = _get_input_data(req_node, input_forward_data) + if req_node.is_update: input_forward_data = input_data propagate_check_resource( - cnxt, self._rpc_client, req, current_traversal, - set(graph[(req, fwd)]), graph_key, input_data, fwd, + cnxt, self._rpc_client, req_node.rsrc_id, + current_traversal, set(graph[req_node]), + graph_key, input_data, req_node.is_update, stack.adopt_stack_data) if is_update: if input_forward_data is None: @@ -241,7 +242,7 @@ class CheckResource(object): else: rsrc.store_attributes() check_stack_complete(cnxt, stack, current_traversal, - graph_key[0], deps, graph_key[1]) + graph_key.rsrc_id, deps, graph_key.is_update) except exception.EntityNotFound as e: if e.entity == "Sync Point": # Reload the stack to determine the current traversal, and diff --git a/heat/engine/stack.py b/heat/engine/stack.py index 7512789d30..0c3ecf6669 100644 --- a/heat/engine/stack.py +++ b/heat/engine/stack.py @@ -57,6 +57,10 @@ from heat.rpc import worker_client as rpc_worker_client LOG = logging.getLogger(__name__) +ConvergenceNode = collections.namedtuple('ConvergenceNode', + ['rsrc_id', 'is_update']) + + class ForcedCancel(Exception): """Exception raised to cancel task execution.""" @@ -1442,27 +1446,32 @@ class Stack(collections.Mapping): def _compute_convg_dependencies(self, existing_resources, current_template_deps, current_resources): def make_graph_key(rsrc): - return current_resources[rsrc.name].id, True + return ConvergenceNode(current_resources[rsrc.name].id, True) + dep = current_template_deps.translate(make_graph_key) if existing_resources: for rsrc_id, rsrc in existing_resources.items(): - dep += (rsrc_id, False), None + dep += ConvergenceNode(rsrc_id, False), None for requirement in rsrc.requires: if requirement in existing_resources: - dep += (requirement, False), (rsrc_id, False) + dep += (ConvergenceNode(requirement, False), + ConvergenceNode(rsrc_id, False)) if rsrc.replaces in existing_resources: - dep += (rsrc.replaces, False), (rsrc_id, False) + dep += (ConvergenceNode(rsrc.replaces, False), + ConvergenceNode(rsrc_id, False)) - if (rsrc.id, True) in dep: - dep += (rsrc_id, False), (rsrc_id, True) + if ConvergenceNode(rsrc.id, True) in dep: + dep += (ConvergenceNode(rsrc_id, False), + ConvergenceNode(rsrc_id, True)) self._convg_deps = dep @property def convergence_dependencies(self): if self._convg_deps is None: - current_deps = ([tuple(i), (tuple(j) if j is not None else None)] + current_deps = ((ConvergenceNode(*i), + ConvergenceNode(*j) if j is not None else None) for i, j in self.current_deps['edges']) self._convg_deps = dependencies.Dependencies(edges=current_deps) diff --git a/heat/engine/sync_point.py b/heat/engine/sync_point.py index a8feea4af1..0ea8dd2048 100644 --- a/heat/engine/sync_point.py +++ b/heat/engine/sync_point.py @@ -74,7 +74,7 @@ def update_input_data(context, entity_id, current_traversal, def str_pack_tuple(t): - return u'tuple:' + str(t) + return u'tuple:' + str(tuple(t)) def _str_unpack_tuple(s): diff --git a/heat/engine/worker.py b/heat/engine/worker.py index 4bfcfab443..2e38e7acb5 100644 --- a/heat/engine/worker.py +++ b/heat/engine/worker.py @@ -145,7 +145,7 @@ class WorkerService(object): def _retrigger_replaced(self, is_update, rsrc, stack, check_resource): graph = stack.convergence_dependencies.graph() - key = (rsrc.id, is_update) + key = parser.ConvergenceNode(rsrc.id, is_update) if key not in graph and rsrc.replaces is not None: # This resource replaces old one and is not needed in # current traversal. You need to mark the resource as @@ -154,9 +154,9 @@ class WorkerService(object): db_api.resource_update_and_save(stack.context, rsrc.id, values) # The old resource might be in the graph (a rollback case); # just re-trigger it. - key = (rsrc.replaces, is_update) + key = parser.ConvergenceNode(rsrc.replaces, is_update) check_resource.retrigger_check_resource(stack.context, is_update, - key[0], stack) + key.rsrc_id, stack) @context.request_context @log_exceptions