diff --git a/heat/engine/cloud_watch.py b/heat/engine/cloud_watch.py index 2d8c7a0b5d..e0e9015df2 100644 --- a/heat/engine/cloud_watch.py +++ b/heat/engine/cloud_watch.py @@ -52,6 +52,8 @@ class CloudWatchAlarm(Resource): 'Megabits/Second', 'Gigabits/Second', 'Terabits/Second', 'Count/Second', None]}} + strict_dependency = False + def __init__(self, name, json_snippet, stack): super(CloudWatchAlarm, self).__init__(name, json_snippet, stack) self.instance_id = '' @@ -96,6 +98,3 @@ class CloudWatchAlarm(Resource): def FnGetRefId(self): return unicode(self.name) - - def strict_dependency(self): - return False diff --git a/heat/engine/dependencies.py b/heat/engine/dependencies.py new file mode 100644 index 0000000000..1f900d4f89 --- /dev/null +++ b/heat/engine/dependencies.py @@ -0,0 +1,201 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import itertools + +from heat.common import exception + + +class CircularDependencyException(exception.OpenstackException): + message = _("Circular Dependency Found: %(cycle)s") + + +class Dependencies(object): + '''Helper class for calculating a dependency graph''' + + class Node(object): + def __init__(self, requires=None, required_by=None): + ''' + Initialise the node, optionally with a set of keys this node + requires and/or a set of keys that this node is required by. + ''' + self.require = requires and requires.copy() or set() + self.satisfy = required_by and required_by.copy() or set() + + def copy(self): + '''Make a copy of the node''' + return Dependencies.Node(self.require, self.satisfy) + + def reverse_copy(self): + '''Make a copy of the node with the edge directions reversed''' + return Dependencies.Node(self.satisfy, self.require) + + def required_by(self, source=None): + ''' + List the keys that require this node, and optionally add a + new one + ''' + if source is not None: + self.satisfy.add(source) + return iter(self.satisfy) + + def requires(self, target): + '''Add a key that this node requires''' + self.require.add(target) + + def __isub__(self, target): + '''Remove a key that this node requires''' + self.require.remove(target) + return self + + def __nonzero__(self): + '''Test if this node is a leaf (requires nothing)''' + return bool(self.require) + + def stem(self): + '''Test if this node is a stem (required by nothing)''' + return not bool(self.satisfy) + + def disjoint(self): + '''Test if this node is both a lead and a stem''' + return self and self.stem() + + def __len__(self): + '''Count the number of keys required by this node''' + return len(self.require) + + def __iter__(self): + '''Iterate over the keys required by this node''' + return iter(self.require) + + def __str__(self): + '''Return a human-readable string representation of the node''' + return '{%s}' % ', '.join(str(n) for n in self) + + def __repr__(self): + '''Return a string representation of the node''' + return repr(self.require) + + def __init__(self, edges=[]): + ''' + Initialise, optionally with a list of edges, in the form of + (requirer, required) tuples. + ''' + self.deps = collections.defaultdict(self.Node) + for e in edges: + self += e + + def __iadd__(self, edge): + '''Add another edge, in the form of a (requirer, required) tuple''' + requirer, required = edge + + if required is None: + # Just ensure the node is created by accessing the defaultdict + self.deps[requirer] + else: + self.deps[required].required_by(requirer) + self.deps[requirer].requires(required) + + return self + + def __getitem__(self, last): + ''' + Return a partial dependency graph consisting of the specified node and + all those that require it only. + ''' + if last not in self.deps: + raise KeyError + + def get_edges(key): + def requirer_edges(rqr): + # Concatenate the dependency on the current node with the + # recursive generated list + return itertools.chain([(rqr, key)], get_edges(rqr)) + + # Get the edge list for each node that requires the current node + edge_lists = itertools.imap(requirer_edges, + self.deps[key].required_by()) + # Combine the lists into one long list + return itertools.chain.from_iterable(edge_lists) + + if self.deps[last].stem(): + # Nothing requires this, so just add the node itself + edges = [(last, None)] + else: + edges = get_edges(last) + + return Dependencies(edges) + + @staticmethod + def _deps_to_str(deps): + '''Convert the given dependency graph to a human-readable string''' + pairs = ('%s: %s' % (str(k), str(v)) for k, v in deps.items()) + return '{%s}' % ', '.join(pairs) + + def __str__(self): + ''' + Return a human-readable string representation of the dependency graph + ''' + return self._deps_to_str(self.deps) + + def _edges(self): + '''Return an iterator over all of the edges in the graph''' + def outgoing_edges(rqr, node): + if node.disjoint(): + yield (rqr, None) + else: + for rqd in node: + yield (rqr, rqd) + return (outgoing_edges(*item) for item in self.deps.iteritems()) + + def __repr__(self): + '''Return a string representation of the object''' + return 'Dependencies([%s])' % ', '.join(repr(e) for e in edges) + + def _toposort(self, deps): + '''Generate a topological sort of a dependency graph''' + def next_leaf(): + for leaf, node in deps.items(): + if not node: + return leaf, node + + # There are nodes remaining, but no more leaves: a cycle + cycle = self._deps_to_str(deps) + raise CircularDependencyException(cycle=cycle) + + for iteration in xrange(len(deps)): + leaf, node = next_leaf() + yield leaf + + # Remove the node and all edges connected to it before continuing + # to look for more leaves + for src in node.required_by(): + deps[src] -= leaf + del deps[leaf] + + def _mapgraph(self, func): + '''Map the supplied function onto every node in the graph.''' + return dict((k, func(n)) for k, n in self.deps.items()) + + def __iter__(self): + '''Return a topologically sorted iterator''' + deps = self._mapgraph(lambda n: n.copy()) + return self._toposort(deps) + + def __reversed__(self): + '''Return a reverse topologically sorted iterator''' + rev_deps = self._mapgraph(lambda n: n.reverse_copy()) + return self._toposort(rev_deps) diff --git a/heat/engine/manager.py b/heat/engine/manager.py index 69e98dd915..8f7ae77e22 100644 --- a/heat/engine/manager.py +++ b/heat/engine/manager.py @@ -381,7 +381,7 @@ class EngineManager(manager.Manager): """ stack = db_api.stack_get(context, stack_name) if stack: - return [r.name for r in stack.resources] + return [res.name for res in stack] else: return None @@ -464,7 +464,7 @@ class EngineManager(manager.Manager): s.raw_template.parsed_template.template, s.id) for a in wr.rule[action_map[new_state]]: - ps.resources[a].alarm() + ps[a].alarm() wr.last_evaluated = now diff --git a/heat/engine/parser.py b/heat/engine/parser.py index 8494e5c7d0..6001900882 100644 --- a/heat/engine/parser.py +++ b/heat/engine/parser.py @@ -17,8 +17,10 @@ import eventlet import json import itertools import logging + from heat.common import exception from heat.engine import checkeddict +from heat.engine import dependencies from heat.engine.resources import Resource from heat.db import api as db_api @@ -69,45 +71,68 @@ class Stack(object): if parms is not None: self._apply_user_parameters(parms) - self.resources = {} - for rname, rdesc in self.t['Resources'].items(): - res = Resource(rname, rdesc, self) - self.resources[rname] = res + self.resources = dict((name, + Resource(name, data, self)) + for (name, data) in self.t['Resources'].items()) - self.calulate_dependencies(res.t, res) + self.dependencies = dependencies.Dependencies() + for resource in self.resources.values(): + resource.add_dependencies(self.dependencies) + + def __iter__(self): + ''' + Return an iterator over this template's resources in the order that + they should be started. + ''' + return iter(self.dependencies) + + def __reversed__(self): + ''' + Return an iterator over this template's resources in the order that + they should be stopped. + ''' + return reversed(self.dependencies) + + def __len__(self): + '''Return the number of resources''' + return len(self.resources) + + def __getitem__(self, key): + '''Get the resource with the specified name.''' + return self.resources[key] + + def __contains__(self, key): + '''Determine whether the stack contains the specified resource''' + return key in self.resources + + def keys(self): + return self.resources.keys() + + def __str__(self): + return 'Stack "%s"' % self.name def validate(self): ''' - http://docs.amazonwebservices.com/AWSCloudFormation/latest/ \ - APIReference/API_ValidateTemplate.html + http://docs.amazonwebservices.com/AWSCloudFormation/latest/\ + APIReference/API_ValidateTemplate.html ''' # TODO(sdake) Should return line number of invalid reference response = None - try: - order = self.get_create_order() - except KeyError as ex: - res = 'A Ref operation referenced a non-existent key '\ - '[%s]' % str(ex) - response = {'ValidateTemplateResult': { - 'Description': 'Malformed Query Response [%s]' % (res), - 'Parameters': []}} - return response - - for r in order: + for res in self: try: - res = self.resources[r].validate() + result = res.validate() except Exception as ex: logger.exception('validate') - res = str(ex) - finally: - if res: - err_str = 'Malformed Query Response %s' % (res) - response = {'ValidateTemplateResult': { + result = str(ex) + + if result: + err_str = 'Malformed Query Response %s' % result + response = {'ValidateTemplateResult': { 'Description': err_str, 'Parameters': []}} - return response + return response if response is None: response = {'ValidateTemplateResult': { @@ -123,33 +148,6 @@ class Stack(object): response['ValidateTemplateResult']['Parameters'].append(res) return response - def resource_append_deps(self, resource, order_list): - ''' - For the given resource first append it's dependancies then - it's self to order_list. - ''' - for r in resource.depends_on: - self.resource_append_deps(self.resources[r], order_list) - if not resource.name in order_list: - order_list.append(resource.name) - - def get_create_order(self): - ''' - return a list of Resource names in the correct order - for startup. - ''' - order = [] - for r in self.t['Resources']: - if self.t['Resources'][r]['Type'] == 'AWS::EC2::Volume' or \ - self.t['Resources'][r]['Type'] == 'AWS::EC2::EIP': - if len(self.resources[r].depends_on) == 0: - order.append(r) - - for r in self.t['Resources']: - self.resource_append_deps(self.resources[r], order) - - return order - def update_parsed_template(self): ''' Update the parsed template after each resource has been @@ -180,9 +178,8 @@ class Stack(object): def create_blocking(self): ''' - create all the resources in the order specified by get_create_order + Create the stack and all of the resources. ''' - order = self.get_create_order() self.status_set(self.IN_PROGRESS, 'Stack creation started') stack_status = self.CREATE_COMPLETE @@ -197,8 +194,7 @@ class Stack(object): logger.exception('create timeout conversion') tmo = eventlet.Timeout(secs_tmo) try: - for r in order: - res = self.resources[r] + for res in self: if stack_status != self.CREATE_FAILED: try: res.create() @@ -236,31 +232,28 @@ class Stack(object): def delete_blocking(self): ''' - delete all the resources in the reverse order specified by - get_create_order(). + Delete all of the resources, and then the stack itself. ''' - order = self.get_create_order() failed = False self.status_set(self.DELETE_IN_PROGRESS) - for r in reversed(order): - res = self.resources[r] + for res in reversed(self): try: res.delete() except Exception as ex: failed = True res.state_set(res.DELETE_FAILED) logger.error('delete: %s' % str(ex)) - try: - re = db_api.resource_get(self.context, self.resources[r].id) - re.delete() - except Exception as ex: - # don't fail the delete if the db entry has - # not been created yet. - if 'not found' not in str(ex): - failed = True - res.state_set(res.DELETE_FAILED) - logger.error('delete: %s' % str(ex)) + else: + try: + db_api.resource_get(self.context, res.id).delete() + except Exception as ex: + # don't fail the delete if the db entry has + # not been created yet. + if 'not found' not in str(ex): + failed = True + res.state_set(res.DELETE_FAILED) + logger.error('delete: %s' % str(ex)) self.status_set(failed and self.DELETE_FAILED or self.DELETE_COMPLETE) if not failed: @@ -292,23 +285,20 @@ class Stack(object): if stack: self.parsed_template_id = stack.raw_template.parsed_template.id - order = [] - self.resource_append_deps(self.resources[resource_name], order) + deps = self.dependencies[self[resource_name]] failed = False - for r in reversed(order): - res = self.resources[r] + for res in reversed(deps): try: res.delete() - re = db_api.resource_get(self.context, self.resources[r].id) + re = db_api.resource_get(self.context, res.id) re.delete() except Exception as ex: failed = True res.state_set(res.DELETE_FAILED) logger.error('delete: %s' % str(ex)) - for r in order: - res = self.resources[r] + for res in deps: if not failed: try: res.create() @@ -331,26 +321,6 @@ class Stack(object): pool = eventlet.GreenPool() pool.spawn_n(self.restart_resource_blocking, resource_name) - def calulate_dependencies(self, s, r): - if isinstance(s, dict): - for i in s: - if i == 'Fn::GetAtt': - #print '%s seems to depend on %s' % (r.name, s[i][0]) - #r.depends_on.append(s[i][0]) - pass - elif i == 'Ref': - #print '%s Refences %s' % (r.name, s[i]) - if r.strict_dependency(): - r.depends_on.append(s[i]) - elif i == 'DependsOn': - #print '%s DependsOn on %s' % (r.name, s[i]) - r.depends_on.append(s[i]) - else: - self.calulate_dependencies(s[i], r) - elif isinstance(s, list): - for index, item in enumerate(s): - self.calulate_dependencies(item, r) - def _apply_user_parameters(self, parms): for p in parms: if 'Parameters.member.' in p and 'ParameterKey' in p: @@ -358,8 +328,8 @@ class Stack(object): try: key_name = 'Parameters.member.%s.ParameterKey' % s[2] value_name = 'Parameters.member.%s.ParameterValue' % s[2] - logger.debug('appling user parameter %s=%s' % - (key_name, value_name)) + logger.debug('applying user parameter %s=%s' % + (key_name, value_name)) self.parms[parms[key_name]] = parms[value_name] except Exception: logger.error('Could not apply parameter %s' % p) @@ -402,15 +372,15 @@ class Stack(object): { "Fn::GetAtt" : [ "DBInstance", "Endpoint.Address" ] } ''' def match_ref(key, value): - return key == 'Ref' and value in self.resources + return key == 'Ref' and value in self def handle_ref(arg): - return self.resources[arg].FnGetRefId() + return self[arg].FnGetRefId() def handle_getatt(args): resource, att = args try: - return self.resources[resource].FnGetAtt(att) + return self[resource].FnGetAtt(att) except KeyError: raise exception.InvalidTemplateAttribute(resource=resource, key=att) diff --git a/heat/engine/resources.py b/heat/engine/resources.py index 82b791540a..5a0f0d0117 100644 --- a/heat/engine/resources.py +++ b/heat/engine/resources.py @@ -42,6 +42,9 @@ class Resource(object): UPDATE_FAILED = 'UPDATE_FAILED' UPDATE_COMPLETE = 'UPDATE_COMPLETE' + # If True, this resource must be created before it can be referenced. + strict_dependency = True + def __new__(cls, name, json, stack): '''Create a new Resource of the appropriate class for its type.''' @@ -55,7 +58,6 @@ class Resource(object): return ResourceClass(name, json, stack) def __init__(self, name, json_snippet, stack): - self.depends_on = [] self.references = [] self.stack = stack self.name = name @@ -82,6 +84,26 @@ class Resource(object): def parsed_template(self): return self.stack.resolve_runtime_data(self.t) + def __str__(self): + return '%s "%s"' % (self.__class__.__name__, self.name) + + def _add_dependencies(self, deps, fragment): + if isinstance(fragment, dict): + for key, value in fragment.items(): + if key in ('DependsOn', 'Ref'): + target = self.stack.resources[value] + if key == 'DependsOn' or target.strict_dependency: + deps += (self, target) + elif key != 'Fn::GetAtt': + self._add_dependencies(deps, value) + elif isinstance(fragment, list): + for item in fragment: + self._add_dependencies(deps, item) + + def add_dependencies(self, deps): + self._add_dependencies(deps, self.t) + deps += (self, None) + def keystone(self): if self._keystone: return self._keystone @@ -187,8 +209,8 @@ class Resource(object): def FnGetRefId(self): ''' - http://docs.amazonwebservices.com/AWSCloudFormation/latest/UserGuide/ \ - intrinsic-function-reference-ref.html + http://docs.amazonwebservices.com/AWSCloudFormation/latest/UserGuide/\ + intrinsic-function-reference-ref.html ''' if self.instance_id is not None: return unicode(self.instance_id) @@ -197,24 +219,18 @@ class Resource(object): def FnGetAtt(self, key): ''' - http://docs.amazonwebservices.com/AWSCloudFormation/latest/UserGuide/ \ + http://docs.amazonwebservices.com/AWSCloudFormation/latest/UserGuide/\ intrinsic-function-reference-getatt.html ''' return unicode(self.name) def FnBase64(self, data): ''' - http://docs.amazonwebservices.com/AWSCloudFormation/latest/UserGuide/ \ + http://docs.amazonwebservices.com/AWSCloudFormation/latest/UserGuide/\ intrinsic-function-reference-base64.html ''' return base64.b64encode(data) - def strict_dependency(self): - ''' - If True, this resource must be created before it can be referenced. - ''' - return True - class GenericResource(Resource): properties_schema = {} diff --git a/heat/tests/test_dependencies.py b/heat/tests/test_dependencies.py new file mode 100644 index 0000000000..ddfd996687 --- /dev/null +++ b/heat/tests/test_dependencies.py @@ -0,0 +1,186 @@ +import nose +import unittest +from nose.plugins.attrib import attr + +from heat.engine.dependencies import Dependencies +from heat.engine.dependencies import CircularDependencyException + + +@attr(tag=['unit', 'dependencies']) +@attr(speed='fast') +class dependenciesTest(unittest.TestCase): + + def _dep_test(self, func, checkorder, deps): + nodes = set.union(*[set(e) for e in deps]) + + d = Dependencies(deps) + order = list(func(d)) + + for n in nodes: + self.assertTrue(n in order, '"%s" is not in the sequence' % n) + self.assertEqual(order.count(n), 1) + + self.assertEqual(len(order), len(nodes)) + + for l, f in deps: + checkorder(order.index(f), order.index(l)) + + def _dep_test_fwd(self, *deps): + def assertLess(a, b): + self.assertTrue(a < b, + '"%s" is not less than "%s"' % (str(a), str(b))) + self._dep_test(iter, assertLess, deps) + + def _dep_test_rev(self, *deps): + def assertGreater(a, b): + self.assertTrue(a > b, + '"%s" is not greater than "%s"' % (str(a), str(b))) + self._dep_test(reversed, assertGreater, deps) + + def test_single_node(self): + d = Dependencies([('only', None)]) + l = list(iter(d)) + self.assertEqual(len(l), 1) + self.assertEqual(l[0], 'only') + + def test_disjoint(self): + d = Dependencies([('1', None), ('2', None)]) + l = list(iter(d)) + self.assertEqual(len(l), 2) + self.assertTrue('1' in l) + self.assertTrue('2' in l) + + def test_single_fwd(self): + self._dep_test_fwd(('second', 'first')) + + def test_single_rev(self): + self._dep_test_rev(('second', 'first')) + + def test_chain_fwd(self): + self._dep_test_fwd(('third', 'second'), ('second', 'first')) + + def test_chain_rev(self): + self._dep_test_rev(('third', 'second'), ('second', 'first')) + + def test_diamond_fwd(self): + self._dep_test_fwd(('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'first'), ('mid2', 'first')) + + def test_diamond_rev(self): + self._dep_test_rev(('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'first'), ('mid2', 'first')) + + def test_complex_fwd(self): + self._dep_test_fwd(('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'mid3'), ('mid1', 'first'), + ('mid3', 'first'), ('mid2', 'first')) + + def test_complex_rev(self): + self._dep_test_rev(('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'mid3'), ('mid1', 'first'), + ('mid3', 'first'), ('mid2', 'first')) + + def test_many_edges_fwd(self): + self._dep_test_fwd(('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'e2'), ('mid1', 'mid3'), + ('mid2', 'mid3'), + ('mid3', 'e3')) + + def test_many_edges_rev(self): + self._dep_test_rev(('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'e2'), ('mid1', 'mid3'), + ('mid2', 'mid3'), + ('mid3', 'e3')) + + def test_dbldiamond_fwd(self): + self._dep_test_fwd(('last', 'a1'), ('last', 'a2'), + ('a1', 'b1'), ('a2', 'b1'), ('a2', 'b2'), + ('b1', 'first'), ('b2', 'first')) + + def test_dbldiamond_rev(self): + self._dep_test_rev(('last', 'a1'), ('last', 'a2'), + ('a1', 'b1'), ('a2', 'b1'), ('a2', 'b2'), + ('b1', 'first'), ('b2', 'first')) + + def test_circular_fwd(self): + d = Dependencies([('first', 'second'), + ('second', 'third'), + ('third', 'first')]) + self.assertRaises(CircularDependencyException, list, iter(d)) + + def test_circular_rev(self): + d = Dependencies([('first', 'second'), + ('second', 'third'), + ('third', 'first')]) + self.assertRaises(CircularDependencyException, list, reversed(d)) + + def test_self_ref(self): + d = Dependencies([('node', 'node')]) + self.assertRaises(CircularDependencyException, list, iter(d)) + + def test_complex_circular_fwd(self): + d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'e2'), ('mid1', 'mid3'), + ('mid2', 'mid3'), + ('mid3', 'e3'), + ('e3', 'mid1')]) + self.assertRaises(CircularDependencyException, list, iter(d)) + + def test_complex_circular_rev(self): + d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'e2'), ('mid1', 'mid3'), + ('mid2', 'mid3'), + ('mid3', 'e3'), + ('e3', 'mid1')]) + self.assertRaises(CircularDependencyException, list, reversed(d)) + + def test_noexist_partial(self): + d = Dependencies([('foo', 'bar')]) + get = lambda i: d[i] + self.assertRaises(KeyError, get, 'baz') + + def test_single_partial(self): + d = Dependencies([('last', 'first')]) + p = d['last'] + l = list(iter(p)) + self.assertEqual(len(l), 1) + self.assertEqual(l[0], 'last') + + def test_simple_partial(self): + d = Dependencies([('last', 'middle'), ('middle', 'first')]) + p = d['middle'] + order = list(iter(p)) + self.assertEqual(len(order), 2) + for n in ('last', 'middle'): + self.assertTrue(n in order, + "'%s' not found in dependency order" % n) + self.assertTrue(order.index('last') > order.index('middle')) + + def test_simple_multilevel_partial(self): + d = Dependencies([('last', 'middle'), + ('middle', 'target'), + ('target', 'first')]) + p = d['target'] + order = list(iter(p)) + self.assertEqual(len(order), 3) + for n in ('last', 'middle', 'target'): + self.assertTrue(n in order, + "'%s' not found in dependency order" % n) + + def test_complex_partial(self): + d = Dependencies([('last', 'e1'), ('last', 'mid1'), ('last', 'mid2'), + ('mid1', 'e2'), ('mid1', 'mid3'), + ('mid2', 'mid3'), + ('mid3', 'e3')]) + p = d['mid3'] + order = list(iter(p)) + self.assertEqual(len(order), 4) + for n in ('last', 'mid1', 'mid2', 'mid3'): + self.assertTrue(n in order, + "'%s' not found in dependency order" % n) + + +# allows testing of the test directly, shown below +if __name__ == '__main__': + sys.argv.append(__file__) + nose.main()