From 8b732241df6007c3005b14413ee1fe047eb4d108 Mon Sep 17 00:00:00 2001 From: Zane Bitter Date: Fri, 30 Aug 2013 20:50:11 +0200 Subject: [PATCH] Do updates based on a single dependency list This ensures that updates proceed in the correct order with maximum parallelism, for arbitrarily complex dependency graphs. Change-Id: Ia11f4cfef58a3045199e1e5e49050cb1646f5057 --- heat/engine/update.py | 119 +++++++++++++++++++++++++----------------- 1 file changed, 72 insertions(+), 47 deletions(-) diff --git a/heat/engine/update.py b/heat/engine/update.py index 2b1ac7b214..7c8821f3d0 100644 --- a/heat/engine/update.py +++ b/heat/engine/update.py @@ -15,6 +15,7 @@ from heat.db import api as db_api +from heat.engine import dependencies from heat.engine import resource from heat.engine import scheduler @@ -50,32 +51,29 @@ class StackUpdate(object): def __call__(self): """Return a co-routine that updates the stack.""" - existing_deps = self.existing_stack.dependencies - new_deps = self.new_stack.dependencies - cleanup_prev = scheduler.DependencyTaskGroup( self.previous_stack.dependencies, self._remove_backup_resource, reverse=True) - cleanup = scheduler.DependencyTaskGroup(existing_deps, - self._remove_old_resource, - reverse=True) - create_new = scheduler.DependencyTaskGroup(new_deps, - self._create_new_resource) - update = scheduler.DependencyTaskGroup(new_deps, - self._update_resource) + + update = scheduler.DependencyTaskGroup(self.dependencies(), + self._resource_update) if not self.rollback: yield cleanup_prev() - yield create_new() try: yield update() finally: prev_deps = self.previous_stack._get_dependencies( self.previous_stack.resources.itervalues()) self.previous_stack.dependencies = prev_deps - yield cleanup() + + def _resource_update(self, res): + if res.name in self.new_stack and self.new_stack[res.name] is res: + return self._process_new_resource_update(res) + else: + return self._process_existing_resource_update(res) @scheduler.wrappertask def _remove_backup_resource(self, prev_res): @@ -84,27 +82,6 @@ class StackUpdate(object): logger.debug("Deleting backup resource %s" % prev_res.name) yield prev_res.destroy() - @scheduler.wrappertask - def _remove_old_resource(self, existing_res): - res_name = existing_res.name - - if res_name in self.previous_stack: - yield self._remove_backup_resource(self.previous_stack[res_name]) - - if res_name not in self.new_stack: - logger.debug("resource %s not found in updated stack" - % res_name + " definition, deleting") - yield existing_res.destroy() - del self.existing_stack.resources[res_name] - - @scheduler.wrappertask - def _create_new_resource(self, new_res): - res_name = new_res.name - if res_name not in self.existing_stack: - logger.debug("resource %s not found in current stack" - % res_name + " definition, adding") - yield self._create_resource(new_res) - @staticmethod def _exchange_stacks(existing_res, prev_res): db_api.resource_exchange_stacks(existing_res.stack.context, @@ -147,24 +124,72 @@ class StackUpdate(object): yield new_res.create() @scheduler.wrappertask - def _update_resource(self, new_res): + def _process_new_resource_update(self, new_res): res_name = new_res.name - if res_name not in self.existing_snippets: - return - - # Compare resolved pre/post update resource snippets, - # note the new resource snippet is resolved in the context - # of the existing stack (which is the stack being updated) - existing_snippet = self.existing_snippets[res_name] - new_snippet = self.existing_stack.resolve_runtime_data(new_res.t) - - if new_snippet != existing_snippet: + if res_name in self.existing_stack: + existing_res = self.existing_stack[res_name] try: - yield self.existing_stack[res_name].update(new_snippet, - existing_snippet) + yield self._update_in_place(existing_res, + new_res) except resource.UpdateReplace: - yield self._create_resource(new_res) + pass else: logger.info("Resource %s for stack %s updated" % (res_name, self.existing_stack.name)) + return + + yield self._create_resource(new_res) + + @scheduler.wrappertask + def _update_in_place(self, existing_res, new_res): + # Compare resolved pre/post update resource snippets, + # note the new resource snippet is resolved in the context + # of the existing stack (which is the stack being updated) + existing_snippet = self.existing_snippets[existing_res.name] + new_snippet = self.existing_stack.resolve_runtime_data(new_res.t) + + if new_snippet != existing_snippet: + yield existing_res.update(new_snippet, existing_snippet) + + @scheduler.wrappertask + def _process_existing_resource_update(self, existing_res): + res_name = existing_res.name + + if res_name in self.previous_stack: + yield self._remove_backup_resource(self.previous_stack[res_name]) + + if res_name in self.new_stack: + new_res = self.new_stack[res_name] + if new_res.state == (new_res.INIT, new_res.COMPLETE): + # Already updated in-place + return + + if existing_res.stack is not self.previous_stack: + yield existing_res.destroy() + + if res_name not in self.new_stack: + del self.existing_stack.resources[res_name] + + def dependencies(self): + ''' + Return a Dependencies object representing the dependencies between + update operations to move from an existing stack definition to a new + one. + ''' + existing_deps = self.existing_stack.dependencies + new_deps = self.new_stack.dependencies + + def edges(): + # Create/update the new stack's resources in create order + for e in new_deps.graph().edges(): + yield e + # Destroy/cleanup the old stack's resources in delete order + for e in existing_deps.graph(reverse=True).edges(): + yield e + # Don't cleanup old resources until after they have been replaced + for res in self.existing_stack: + if res.name in self.new_stack: + yield (res, self.new_stack[res.name]) + + return dependencies.Dependencies(edges())