Do proper dependency calculations

Change-Id: Ic8a9bd2105ebfff3604a45b45eac39da3b1bbcf5
Signed-off-by: Zane Bitter <zbitter@redhat.com>
This commit is contained in:
Zane Bitter 2012-06-14 11:08:51 +02:00
parent f74219a52d
commit 067e037064
6 changed files with 491 additions and 119 deletions

View File

@ -52,6 +52,8 @@ class CloudWatchAlarm(Resource):
'Megabits/Second', 'Gigabits/Second', 'Terabits/Second', 'Megabits/Second', 'Gigabits/Second', 'Terabits/Second',
'Count/Second', None]}} 'Count/Second', None]}}
strict_dependency = False
def __init__(self, name, json_snippet, stack): def __init__(self, name, json_snippet, stack):
super(CloudWatchAlarm, self).__init__(name, json_snippet, stack) super(CloudWatchAlarm, self).__init__(name, json_snippet, stack)
self.instance_id = '' self.instance_id = ''
@ -96,6 +98,3 @@ class CloudWatchAlarm(Resource):
def FnGetRefId(self): def FnGetRefId(self):
return unicode(self.name) return unicode(self.name)
def strict_dependency(self):
return False

201
heat/engine/dependencies.py Normal file
View File

@ -0,0 +1,201 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import itertools
from heat.common import exception
class CircularDependencyException(exception.OpenstackException):
message = _("Circular Dependency Found: %(cycle)s")
class Dependencies(object):
'''Helper class for calculating a dependency graph'''
class Node(object):
def __init__(self, requires=None, required_by=None):
'''
Initialise the node, optionally with a set of keys this node
requires and/or a set of keys that this node is required by.
'''
self.require = requires and requires.copy() or set()
self.satisfy = required_by and required_by.copy() or set()
def copy(self):
'''Make a copy of the node'''
return Dependencies.Node(self.require, self.satisfy)
def reverse_copy(self):
'''Make a copy of the node with the edge directions reversed'''
return Dependencies.Node(self.satisfy, self.require)
def required_by(self, source=None):
'''
List the keys that require this node, and optionally add a
new one
'''
if source is not None:
self.satisfy.add(source)
return iter(self.satisfy)
def requires(self, target):
'''Add a key that this node requires'''
self.require.add(target)
def __isub__(self, target):
'''Remove a key that this node requires'''
self.require.remove(target)
return self
def __nonzero__(self):
'''Test if this node is a leaf (requires nothing)'''
return bool(self.require)
def stem(self):
'''Test if this node is a stem (required by nothing)'''
return not bool(self.satisfy)
def disjoint(self):
'''Test if this node is both a lead and a stem'''
return self and self.stem()
def __len__(self):
'''Count the number of keys required by this node'''
return len(self.require)
def __iter__(self):
'''Iterate over the keys required by this node'''
return iter(self.require)
def __str__(self):
'''Return a human-readable string representation of the node'''
return '{%s}' % ', '.join(str(n) for n in self)
def __repr__(self):
'''Return a string representation of the node'''
return repr(self.require)
def __init__(self, edges=[]):
'''
Initialise, optionally with a list of edges, in the form of
(requirer, required) tuples.
'''
self.deps = collections.defaultdict(self.Node)
for e in edges:
self += e
def __iadd__(self, edge):
'''Add another edge, in the form of a (requirer, required) tuple'''
requirer, required = edge
if required is None:
# Just ensure the node is created by accessing the defaultdict
self.deps[requirer]
else:
self.deps[required].required_by(requirer)
self.deps[requirer].requires(required)
return self
def __getitem__(self, last):
'''
Return a partial dependency graph consisting of the specified node and
all those that require it only.
'''
if last not in self.deps:
raise KeyError
def get_edges(key):
def requirer_edges(rqr):
# Concatenate the dependency on the current node with the
# recursive generated list
return itertools.chain([(rqr, key)], get_edges(rqr))
# Get the edge list for each node that requires the current node
edge_lists = itertools.imap(requirer_edges,
self.deps[key].required_by())
# Combine the lists into one long list
return itertools.chain.from_iterable(edge_lists)
if self.deps[last].stem():
# Nothing requires this, so just add the node itself
edges = [(last, None)]
else:
edges = get_edges(last)
return Dependencies(edges)
@staticmethod
def _deps_to_str(deps):
'''Convert the given dependency graph to a human-readable string'''
pairs = ('%s: %s' % (str(k), str(v)) for k, v in deps.items())
return '{%s}' % ', '.join(pairs)
def __str__(self):
'''
Return a human-readable string representation of the dependency graph
'''
return self._deps_to_str(self.deps)
def _edges(self):
'''Return an iterator over all of the edges in the graph'''
def outgoing_edges(rqr, node):
if node.disjoint():
yield (rqr, None)
else:
for rqd in node:
yield (rqr, rqd)
return (outgoing_edges(*item) for item in self.deps.iteritems())
def __repr__(self):
'''Return a string representation of the object'''
return 'Dependencies([%s])' % ', '.join(repr(e) for e in edges)
def _toposort(self, deps):
'''Generate a topological sort of a dependency graph'''
def next_leaf():
for leaf, node in deps.items():
if not node:
return leaf, node
# There are nodes remaining, but no more leaves: a cycle
cycle = self._deps_to_str(deps)
raise CircularDependencyException(cycle=cycle)
for iteration in xrange(len(deps)):
leaf, node = next_leaf()
yield leaf
# Remove the node and all edges connected to it before continuing
# to look for more leaves
for src in node.required_by():
deps[src] -= leaf
del deps[leaf]
def _mapgraph(self, func):
'''Map the supplied function onto every node in the graph.'''
return dict((k, func(n)) for k, n in self.deps.items())
def __iter__(self):
'''Return a topologically sorted iterator'''
deps = self._mapgraph(lambda n: n.copy())
return self._toposort(deps)
def __reversed__(self):
'''Return a reverse topologically sorted iterator'''
rev_deps = self._mapgraph(lambda n: n.reverse_copy())
return self._toposort(rev_deps)

View File

@ -381,7 +381,7 @@ class EngineManager(manager.Manager):
""" """
stack = db_api.stack_get(context, stack_name) stack = db_api.stack_get(context, stack_name)
if stack: if stack:
return [r.name for r in stack.resources] return [res.name for res in stack]
else: else:
return None return None
@ -464,7 +464,7 @@ class EngineManager(manager.Manager):
s.raw_template.parsed_template.template, s.raw_template.parsed_template.template,
s.id) s.id)
for a in wr.rule[action_map[new_state]]: for a in wr.rule[action_map[new_state]]:
ps.resources[a].alarm() ps[a].alarm()
wr.last_evaluated = now wr.last_evaluated = now

View File

@ -17,8 +17,10 @@ import eventlet
import json import json
import itertools import itertools
import logging import logging
from heat.common import exception from heat.common import exception
from heat.engine import checkeddict from heat.engine import checkeddict
from heat.engine import dependencies
from heat.engine.resources import Resource from heat.engine.resources import Resource
from heat.db import api as db_api from heat.db import api as db_api
@ -69,12 +71,45 @@ class Stack(object):
if parms is not None: if parms is not None:
self._apply_user_parameters(parms) self._apply_user_parameters(parms)
self.resources = {} self.resources = dict((name,
for rname, rdesc in self.t['Resources'].items(): Resource(name, data, self))
res = Resource(rname, rdesc, self) for (name, data) in self.t['Resources'].items())
self.resources[rname] = res
self.calulate_dependencies(res.t, res) self.dependencies = dependencies.Dependencies()
for resource in self.resources.values():
resource.add_dependencies(self.dependencies)
def __iter__(self):
'''
Return an iterator over this template's resources in the order that
they should be started.
'''
return iter(self.dependencies)
def __reversed__(self):
'''
Return an iterator over this template's resources in the order that
they should be stopped.
'''
return reversed(self.dependencies)
def __len__(self):
'''Return the number of resources'''
return len(self.resources)
def __getitem__(self, key):
'''Get the resource with the specified name.'''
return self.resources[key]
def __contains__(self, key):
'''Determine whether the stack contains the specified resource'''
return key in self.resources
def keys(self):
return self.resources.keys()
def __str__(self):
return 'Stack "%s"' % self.name
def validate(self): def validate(self):
''' '''
@ -84,26 +119,16 @@ class Stack(object):
# TODO(sdake) Should return line number of invalid reference # TODO(sdake) Should return line number of invalid reference
response = None response = None
try:
order = self.get_create_order()
except KeyError as ex:
res = 'A Ref operation referenced a non-existent key '\
'[%s]' % str(ex)
response = {'ValidateTemplateResult': { for res in self:
'Description': 'Malformed Query Response [%s]' % (res),
'Parameters': []}}
return response
for r in order:
try: try:
res = self.resources[r].validate() result = res.validate()
except Exception as ex: except Exception as ex:
logger.exception('validate') logger.exception('validate')
res = str(ex) result = str(ex)
finally:
if res: if result:
err_str = 'Malformed Query Response %s' % (res) err_str = 'Malformed Query Response %s' % result
response = {'ValidateTemplateResult': { response = {'ValidateTemplateResult': {
'Description': err_str, 'Description': err_str,
'Parameters': []}} 'Parameters': []}}
@ -123,33 +148,6 @@ class Stack(object):
response['ValidateTemplateResult']['Parameters'].append(res) response['ValidateTemplateResult']['Parameters'].append(res)
return response return response
def resource_append_deps(self, resource, order_list):
'''
For the given resource first append it's dependancies then
it's self to order_list.
'''
for r in resource.depends_on:
self.resource_append_deps(self.resources[r], order_list)
if not resource.name in order_list:
order_list.append(resource.name)
def get_create_order(self):
'''
return a list of Resource names in the correct order
for startup.
'''
order = []
for r in self.t['Resources']:
if self.t['Resources'][r]['Type'] == 'AWS::EC2::Volume' or \
self.t['Resources'][r]['Type'] == 'AWS::EC2::EIP':
if len(self.resources[r].depends_on) == 0:
order.append(r)
for r in self.t['Resources']:
self.resource_append_deps(self.resources[r], order)
return order
def update_parsed_template(self): def update_parsed_template(self):
''' '''
Update the parsed template after each resource has been Update the parsed template after each resource has been
@ -180,9 +178,8 @@ class Stack(object):
def create_blocking(self): def create_blocking(self):
''' '''
create all the resources in the order specified by get_create_order Create the stack and all of the resources.
''' '''
order = self.get_create_order()
self.status_set(self.IN_PROGRESS, 'Stack creation started') self.status_set(self.IN_PROGRESS, 'Stack creation started')
stack_status = self.CREATE_COMPLETE stack_status = self.CREATE_COMPLETE
@ -197,8 +194,7 @@ class Stack(object):
logger.exception('create timeout conversion') logger.exception('create timeout conversion')
tmo = eventlet.Timeout(secs_tmo) tmo = eventlet.Timeout(secs_tmo)
try: try:
for r in order: for res in self:
res = self.resources[r]
if stack_status != self.CREATE_FAILED: if stack_status != self.CREATE_FAILED:
try: try:
res.create() res.create()
@ -236,24 +232,21 @@ class Stack(object):
def delete_blocking(self): def delete_blocking(self):
''' '''
delete all the resources in the reverse order specified by Delete all of the resources, and then the stack itself.
get_create_order().
''' '''
order = self.get_create_order()
failed = False failed = False
self.status_set(self.DELETE_IN_PROGRESS) self.status_set(self.DELETE_IN_PROGRESS)
for r in reversed(order): for res in reversed(self):
res = self.resources[r]
try: try:
res.delete() res.delete()
except Exception as ex: except Exception as ex:
failed = True failed = True
res.state_set(res.DELETE_FAILED) res.state_set(res.DELETE_FAILED)
logger.error('delete: %s' % str(ex)) logger.error('delete: %s' % str(ex))
else:
try: try:
re = db_api.resource_get(self.context, self.resources[r].id) db_api.resource_get(self.context, res.id).delete()
re.delete()
except Exception as ex: except Exception as ex:
# don't fail the delete if the db entry has # don't fail the delete if the db entry has
# not been created yet. # not been created yet.
@ -292,23 +285,20 @@ class Stack(object):
if stack: if stack:
self.parsed_template_id = stack.raw_template.parsed_template.id self.parsed_template_id = stack.raw_template.parsed_template.id
order = [] deps = self.dependencies[self[resource_name]]
self.resource_append_deps(self.resources[resource_name], order)
failed = False failed = False
for r in reversed(order): for res in reversed(deps):
res = self.resources[r]
try: try:
res.delete() res.delete()
re = db_api.resource_get(self.context, self.resources[r].id) re = db_api.resource_get(self.context, res.id)
re.delete() re.delete()
except Exception as ex: except Exception as ex:
failed = True failed = True
res.state_set(res.DELETE_FAILED) res.state_set(res.DELETE_FAILED)
logger.error('delete: %s' % str(ex)) logger.error('delete: %s' % str(ex))
for r in order: for res in deps:
res = self.resources[r]
if not failed: if not failed:
try: try:
res.create() res.create()
@ -331,26 +321,6 @@ class Stack(object):
pool = eventlet.GreenPool() pool = eventlet.GreenPool()
pool.spawn_n(self.restart_resource_blocking, resource_name) pool.spawn_n(self.restart_resource_blocking, resource_name)
def calulate_dependencies(self, s, r):
if isinstance(s, dict):
for i in s:
if i == 'Fn::GetAtt':
#print '%s seems to depend on %s' % (r.name, s[i][0])
#r.depends_on.append(s[i][0])
pass
elif i == 'Ref':
#print '%s Refences %s' % (r.name, s[i])
if r.strict_dependency():
r.depends_on.append(s[i])
elif i == 'DependsOn':
#print '%s DependsOn on %s' % (r.name, s[i])
r.depends_on.append(s[i])
else:
self.calulate_dependencies(s[i], r)
elif isinstance(s, list):
for index, item in enumerate(s):
self.calulate_dependencies(item, r)
def _apply_user_parameters(self, parms): def _apply_user_parameters(self, parms):
for p in parms: for p in parms:
if 'Parameters.member.' in p and 'ParameterKey' in p: if 'Parameters.member.' in p and 'ParameterKey' in p:
@ -358,7 +328,7 @@ class Stack(object):
try: try:
key_name = 'Parameters.member.%s.ParameterKey' % s[2] key_name = 'Parameters.member.%s.ParameterKey' % s[2]
value_name = 'Parameters.member.%s.ParameterValue' % s[2] value_name = 'Parameters.member.%s.ParameterValue' % s[2]
logger.debug('appling user parameter %s=%s' % logger.debug('applying user parameter %s=%s' %
(key_name, value_name)) (key_name, value_name))
self.parms[parms[key_name]] = parms[value_name] self.parms[parms[key_name]] = parms[value_name]
except Exception: except Exception:
@ -402,15 +372,15 @@ class Stack(object):
{ "Fn::GetAtt" : [ "DBInstance", "Endpoint.Address" ] } { "Fn::GetAtt" : [ "DBInstance", "Endpoint.Address" ] }
''' '''
def match_ref(key, value): def match_ref(key, value):
return key == 'Ref' and value in self.resources return key == 'Ref' and value in self
def handle_ref(arg): def handle_ref(arg):
return self.resources[arg].FnGetRefId() return self[arg].FnGetRefId()
def handle_getatt(args): def handle_getatt(args):
resource, att = args resource, att = args
try: try:
return self.resources[resource].FnGetAtt(att) return self[resource].FnGetAtt(att)
except KeyError: except KeyError:
raise exception.InvalidTemplateAttribute(resource=resource, raise exception.InvalidTemplateAttribute(resource=resource,
key=att) key=att)

View File

@ -42,6 +42,9 @@ class Resource(object):
UPDATE_FAILED = 'UPDATE_FAILED' UPDATE_FAILED = 'UPDATE_FAILED'
UPDATE_COMPLETE = 'UPDATE_COMPLETE' UPDATE_COMPLETE = 'UPDATE_COMPLETE'
# If True, this resource must be created before it can be referenced.
strict_dependency = True
def __new__(cls, name, json, stack): def __new__(cls, name, json, stack):
'''Create a new Resource of the appropriate class for its type.''' '''Create a new Resource of the appropriate class for its type.'''
@ -55,7 +58,6 @@ class Resource(object):
return ResourceClass(name, json, stack) return ResourceClass(name, json, stack)
def __init__(self, name, json_snippet, stack): def __init__(self, name, json_snippet, stack):
self.depends_on = []
self.references = [] self.references = []
self.stack = stack self.stack = stack
self.name = name self.name = name
@ -82,6 +84,26 @@ class Resource(object):
def parsed_template(self): def parsed_template(self):
return self.stack.resolve_runtime_data(self.t) return self.stack.resolve_runtime_data(self.t)
def __str__(self):
return '%s "%s"' % (self.__class__.__name__, self.name)
def _add_dependencies(self, deps, fragment):
if isinstance(fragment, dict):
for key, value in fragment.items():
if key in ('DependsOn', 'Ref'):
target = self.stack.resources[value]
if key == 'DependsOn' or target.strict_dependency:
deps += (self, target)
elif key != 'Fn::GetAtt':
self._add_dependencies(deps, value)
elif isinstance(fragment, list):
for item in fragment:
self._add_dependencies(deps, item)
def add_dependencies(self, deps):
self._add_dependencies(deps, self.t)
deps += (self, None)
def keystone(self): def keystone(self):
if self._keystone: if self._keystone:
return self._keystone return self._keystone
@ -209,12 +231,6 @@ class Resource(object):
''' '''
return base64.b64encode(data) return base64.b64encode(data)
def strict_dependency(self):
'''
If True, this resource must be created before it can be referenced.
'''
return True
class GenericResource(Resource): class GenericResource(Resource):
properties_schema = {} properties_schema = {}

View File

@ -0,0 +1,186 @@
import nose
import unittest
from nose.plugins.attrib import attr
from heat.engine.dependencies import Dependencies
from heat.engine.dependencies import CircularDependencyException
@attr(tag=['unit', 'dependencies'])
@attr(speed='fast')
class dependenciesTest(unittest.TestCase):
def _dep_test(self, func, checkorder, deps):
nodes = set.union(*[set(e) for e in deps])
d = Dependencies(deps)
order = list(func(d))
for n in nodes:
self.assertTrue(n in order, '"%s" is not in the sequence' % n)
self.assertEqual(order.count(n), 1)
self.assertEqual(len(order), len(nodes))
for l, f in deps:
checkorder(order.index(f), order.index(l))
def _dep_test_fwd(self, *deps):
def assertLess(a, b):
self.assertTrue(a < b,
'"%s" is not less than "%s"' % (str(a), str(b)))
self._dep_test(iter, assertLess, deps)
def _dep_test_rev(self, *deps):
def assertGreater(a, b):
self.assertTrue(a > b,
'"%s" is not greater than "%s"' % (str(a), str(b)))
self._dep_test(reversed, assertGreater, deps)
def test_single_node(self):
d = Dependencies([('only', None)])
l = list(iter(d))
self.assertEqual(len(l), 1)
self.assertEqual(l[0], 'only')
def test_disjoint(self):
d = Dependencies([('1', None), ('2', None)])
l = list(iter(d))
self.assertEqual(len(l), 2)
self.assertTrue('1' in l)
self.assertTrue('2' in l)
def test_single_fwd(self):
self._dep_test_fwd(('second', 'first'))
def test_single_rev(self):
self._dep_test_rev(('second', 'first'))
def test_chain_fwd(self):
self._dep_test_fwd(('third', 'second'), ('second', 'first'))
def test_chain_rev(self):
self._dep_test_rev(('third', 'second'), ('second', 'first'))
def test_diamond_fwd(self):
self._dep_test_fwd(('last', 'mid1'), ('last', 'mid2'),
('mid1', 'first'), ('mid2', 'first'))
def test_diamond_rev(self):
self._dep_test_rev(('last', 'mid1'), ('last', 'mid2'),
('mid1', 'first'), ('mid2', 'first'))
def test_complex_fwd(self):
self._dep_test_fwd(('last', 'mid1'), ('last', 'mid2'),
('mid1', 'mid3'), ('mid1', 'first'),
('mid3', 'first'), ('mid2', 'first'))
def test_complex_rev(self):
self._dep_test_rev(('last', 'mid1'), ('last', 'mid2'),
('mid1', 'mid3'), ('mid1', 'first'),
('mid3', 'first'), ('mid2', 'first'))
def test_many_edges_fwd(self):
self._dep_test_fwd(('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
('mid1', 'e2'), ('mid1', 'mid3'),
('mid2', 'mid3'),
('mid3', 'e3'))
def test_many_edges_rev(self):
self._dep_test_rev(('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
('mid1', 'e2'), ('mid1', 'mid3'),
('mid2', 'mid3'),
('mid3', 'e3'))
def test_dbldiamond_fwd(self):
self._dep_test_fwd(('last', 'a1'), ('last', 'a2'),
('a1', 'b1'), ('a2', 'b1'), ('a2', 'b2'),
('b1', 'first'), ('b2', 'first'))
def test_dbldiamond_rev(self):
self._dep_test_rev(('last', 'a1'), ('last', 'a2'),
('a1', 'b1'), ('a2', 'b1'), ('a2', 'b2'),
('b1', 'first'), ('b2', 'first'))
def test_circular_fwd(self):
d = Dependencies([('first', 'second'),
('second', 'third'),
('third', 'first')])
self.assertRaises(CircularDependencyException, list, iter(d))
def test_circular_rev(self):
d = Dependencies([('first', 'second'),
('second', 'third'),
('third', 'first')])
self.assertRaises(CircularDependencyException, list, reversed(d))
def test_self_ref(self):
d = Dependencies([('node', 'node')])
self.assertRaises(CircularDependencyException, list, iter(d))
def test_complex_circular_fwd(self):
d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
('mid1', 'e2'), ('mid1', 'mid3'),
('mid2', 'mid3'),
('mid3', 'e3'),
('e3', 'mid1')])
self.assertRaises(CircularDependencyException, list, iter(d))
def test_complex_circular_rev(self):
d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
('mid1', 'e2'), ('mid1', 'mid3'),
('mid2', 'mid3'),
('mid3', 'e3'),
('e3', 'mid1')])
self.assertRaises(CircularDependencyException, list, reversed(d))
def test_noexist_partial(self):
d = Dependencies([('foo', 'bar')])
get = lambda i: d[i]
self.assertRaises(KeyError, get, 'baz')
def test_single_partial(self):
d = Dependencies([('last', 'first')])
p = d['last']
l = list(iter(p))
self.assertEqual(len(l), 1)
self.assertEqual(l[0], 'last')
def test_simple_partial(self):
d = Dependencies([('last', 'middle'), ('middle', 'first')])
p = d['middle']
order = list(iter(p))
self.assertEqual(len(order), 2)
for n in ('last', 'middle'):
self.assertTrue(n in order,
"'%s' not found in dependency order" % n)
self.assertTrue(order.index('last') > order.index('middle'))
def test_simple_multilevel_partial(self):
d = Dependencies([('last', 'middle'),
('middle', 'target'),
('target', 'first')])
p = d['target']
order = list(iter(p))
self.assertEqual(len(order), 3)
for n in ('last', 'middle', 'target'):
self.assertTrue(n in order,
"'%s' not found in dependency order" % n)
def test_complex_partial(self):
d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'),
('mid1', 'e2'), ('mid1', 'mid3'),
('mid2', 'mid3'),
('mid3', 'e3')])
p = d['mid3']
order = list(iter(p))
self.assertEqual(len(order), 4)
for n in ('last', 'mid1', 'mid2', 'mid3'):
self.assertTrue(n in order,
"'%s' not found in dependency order" % n)
# allows testing of the test directly, shown below
if __name__ == '__main__':
sys.argv.append(__file__)
nose.main()