Use a namedtuple for convergence graph nodes

The node key in the convergence graph is a (resource id, update/!cleanup)
tuple. Sometimes it would be convenient to access the members by name, so
convert to a namedtuple.

Change-Id: Id8c159b0137df091e96f1f8d2312395d4a5664ee
This commit is contained in:
Zane Bitter 2017-09-20 13:34:53 -04:00
parent d64de0e4e1
commit 6a176a270c
4 changed files with 32 additions and 22 deletions

View File

@ -195,7 +195,7 @@ class CheckResource(object):
stack):
deps = stack.convergence_dependencies
graph = deps.graph()
graph_key = (resource_id, is_update)
graph_key = parser.ConvergenceNode(resource_id, is_update)
if graph_key not in graph and rsrc.replaces is not None:
# If we are a replacement, impersonate the replaced resource for
@ -204,10 +204,10 @@ class CheckResource(object):
# graph. Our real resource ID is sent in the input_data, so the
# dependencies will get updated to point to this resource in time
# for the next traversal.
graph_key = (rsrc.replaces, is_update)
graph_key = parser.ConvergenceNode(rsrc.replaces, is_update)
def _get_input_data(req, fwd, input_forward_data=None):
if fwd:
def _get_input_data(req_node, input_forward_data=None):
if req_node.is_update:
if input_forward_data is None:
return rsrc.node_data().as_dict()
else:
@ -216,7 +216,7 @@ class CheckResource(object):
else:
# Don't send data if initiating clean-up for self i.e.
# initiating delete of a replaced resource
if req not in graph_key:
if req_node.rsrc_id != graph_key.rsrc_id:
# send replaced resource as needed_by if it exists
return (rsrc.replaced_by
if rsrc.replaced_by is not None
@ -225,13 +225,14 @@ class CheckResource(object):
try:
input_forward_data = None
for req, fwd in deps.required_by(graph_key):
input_data = _get_input_data(req, fwd, input_forward_data)
if fwd:
for req_node in deps.required_by(graph_key):
input_data = _get_input_data(req_node, input_forward_data)
if req_node.is_update:
input_forward_data = input_data
propagate_check_resource(
cnxt, self._rpc_client, req, current_traversal,
set(graph[(req, fwd)]), graph_key, input_data, fwd,
cnxt, self._rpc_client, req_node.rsrc_id,
current_traversal, set(graph[req_node]),
graph_key, input_data, req_node.is_update,
stack.adopt_stack_data)
if is_update:
if input_forward_data is None:
@ -241,7 +242,7 @@ class CheckResource(object):
else:
rsrc.store_attributes()
check_stack_complete(cnxt, stack, current_traversal,
graph_key[0], deps, graph_key[1])
graph_key.rsrc_id, deps, graph_key.is_update)
except exception.EntityNotFound as e:
if e.entity == "Sync Point":
# Reload the stack to determine the current traversal, and

View File

@ -57,6 +57,10 @@ from heat.rpc import worker_client as rpc_worker_client
LOG = logging.getLogger(__name__)
ConvergenceNode = collections.namedtuple('ConvergenceNode',
['rsrc_id', 'is_update'])
class ForcedCancel(Exception):
"""Exception raised to cancel task execution."""
@ -1442,27 +1446,32 @@ class Stack(collections.Mapping):
def _compute_convg_dependencies(self, existing_resources,
current_template_deps, current_resources):
def make_graph_key(rsrc):
return current_resources[rsrc.name].id, True
return ConvergenceNode(current_resources[rsrc.name].id, True)
dep = current_template_deps.translate(make_graph_key)
if existing_resources:
for rsrc_id, rsrc in existing_resources.items():
dep += (rsrc_id, False), None
dep += ConvergenceNode(rsrc_id, False), None
for requirement in rsrc.requires:
if requirement in existing_resources:
dep += (requirement, False), (rsrc_id, False)
dep += (ConvergenceNode(requirement, False),
ConvergenceNode(rsrc_id, False))
if rsrc.replaces in existing_resources:
dep += (rsrc.replaces, False), (rsrc_id, False)
dep += (ConvergenceNode(rsrc.replaces, False),
ConvergenceNode(rsrc_id, False))
if (rsrc.id, True) in dep:
dep += (rsrc_id, False), (rsrc_id, True)
if ConvergenceNode(rsrc.id, True) in dep:
dep += (ConvergenceNode(rsrc_id, False),
ConvergenceNode(rsrc_id, True))
self._convg_deps = dep
@property
def convergence_dependencies(self):
if self._convg_deps is None:
current_deps = ([tuple(i), (tuple(j) if j is not None else None)]
current_deps = ((ConvergenceNode(*i),
ConvergenceNode(*j) if j is not None else None)
for i, j in self.current_deps['edges'])
self._convg_deps = dependencies.Dependencies(edges=current_deps)

View File

@ -74,7 +74,7 @@ def update_input_data(context, entity_id, current_traversal,
def str_pack_tuple(t):
return u'tuple:' + str(t)
return u'tuple:' + str(tuple(t))
def _str_unpack_tuple(s):

View File

@ -145,7 +145,7 @@ class WorkerService(object):
def _retrigger_replaced(self, is_update, rsrc, stack, check_resource):
graph = stack.convergence_dependencies.graph()
key = (rsrc.id, is_update)
key = parser.ConvergenceNode(rsrc.id, is_update)
if key not in graph and rsrc.replaces is not None:
# This resource replaces old one and is not needed in
# current traversal. You need to mark the resource as
@ -154,9 +154,9 @@ class WorkerService(object):
db_api.resource_update_and_save(stack.context, rsrc.id, values)
# The old resource might be in the graph (a rollback case);
# just re-trigger it.
key = (rsrc.replaces, is_update)
key = parser.ConvergenceNode(rsrc.replaces, is_update)
check_resource.retrigger_check_resource(stack.context, is_update,
key[0], stack)
key.rsrc_id, stack)
@context.request_context
@log_exceptions