diff --git a/heat/cmd/manage.py b/heat/cmd/manage.py index 01b816c0f8..e7669b5834 100644 --- a/heat/cmd/manage.py +++ b/heat/cmd/manage.py @@ -22,11 +22,14 @@ from oslo_log import log from six import moves from heat.common import context +from heat.common import exception from heat.common.i18n import _ +from heat.common import messaging from heat.common import service_utils from heat.db import api as db_api from heat.db import utils from heat.objects import service as service_objects +from heat.rpc import client as rpc_client from heat import version @@ -111,6 +114,21 @@ def do_reset_stack_status(): db_api.reset_stack_status(ctxt, CONF.command.stack_id) +def do_migrate(): + messaging.setup() + client = rpc_client.EngineClient() + ctxt = context.get_admin_context() + try: + client.migrate_convergence_1(ctxt, CONF.command.stack_id) + except exception.NotFound: + raise Exception(_("Stack with id %s can not be found.") + % CONF.command.stack_id) + except exception.ActionInProgress: + raise Exception(_("The stack or some of its nested stacks are " + "in progress. Note, that all the stacks should be " + "in COMPLETE state in order to be migrated.")) + + def purge_deleted(): """Remove database records that have been previously soft deleted.""" utils.purge_deleted(CONF.command.age, @@ -141,6 +159,11 @@ def add_command_parsers(subparsers): # positional parameter, can be skipped. default=None parser.add_argument('version', nargs='?') + # migrate-stacks parser + parser = subparsers.add_parser('migrate-convergence-1') + parser.set_defaults(func=do_migrate) + parser.add_argument('stack_id') + # purge_deleted parser parser = subparsers.add_parser('purge_deleted') parser.set_defaults(func=purge_deleted) diff --git a/heat/db/api.py b/heat/db/api.py index 247214eb4c..dd5d0eedf3 100644 --- a/heat/db/api.py +++ b/heat/db/api.py @@ -188,6 +188,10 @@ def stack_get_all_by_owner_id(context, owner_id): return IMPL.stack_get_all_by_owner_id(context, owner_id) +def stack_get_all_by_root_owner_id(context, owner_id): + return IMPL.stack_get_all_by_root_owner_id(context, owner_id) + + def stack_count_all(context, filters=None, show_deleted=False, show_nested=False, show_hidden=False, tags=None, tags_any=None, not_tags=None, diff --git a/heat/db/sqlalchemy/api.py b/heat/db/sqlalchemy/api.py index 8c35c3a7e9..f170b464df 100644 --- a/heat/db/sqlalchemy/api.py +++ b/heat/db/sqlalchemy/api.py @@ -224,7 +224,7 @@ def resource_purge_deleted(context, stack_id): def resource_update(context, resource_id, values, atomic_key, expected_engine_id=None): session = context.session - with session.begin(): + with session.begin(subtransactions=True): if atomic_key is None: values['atomic_key'] = 1 else: @@ -469,6 +469,13 @@ def stack_get_all_by_owner_id(context, owner_id): return results +def stack_get_all_by_root_owner_id(context, owner_id): + for stack in stack_get_all_by_owner_id(context, owner_id): + yield stack + for ch_st in stack_get_all_by_root_owner_id(context, stack.id): + yield ch_st + + def _get_sort_keys(sort_keys, mapping): """Returns an array containing only whitelisted keys @@ -627,7 +634,7 @@ def stack_update(context, stack_id, values, exp_trvsl=None): session = context.session - with session.begin(): + with session.begin(subtransactions=True): rows_updated = (session.query(models.Stack) .filter(models.Stack.id == stack.id) .filter(models.Stack.current_traversal diff --git a/heat/engine/service.py b/heat/engine/service.py index 49788d4da5..d4d36f1c7c 100644 --- a/heat/engine/service.py +++ b/heat/engine/service.py @@ -299,7 +299,7 @@ class EngineService(service.Service): by the RPC caller. """ - RPC_API_VERSION = '1.33' + RPC_API_VERSION = '1.34' def __init__(self, host, topic): super(EngineService, self).__init__() @@ -2221,6 +2221,41 @@ class EngineService(service.Service): for srv in service_objects.Service.get_all(cnxt)] return result + @context.request_context + def migrate_convergence_1(self, ctxt, stack_id): + parent_stack = parser.Stack.load(ctxt, + stack_id=stack_id, + show_deleted=False) + if parent_stack.convergence: + LOG.info(_LI("Convergence was already enabled for stack %s"), + stack_id) + return + db_stacks = stack_object.Stack.get_all_by_root_owner_id( + ctxt, parent_stack.id) + stacks = [parser.Stack.load(ctxt, stack_id=st.id, + stack=st) for st in db_stacks] + stacks.append(parent_stack) + locks = [] + try: + for st in stacks: + lock = stack_lock.StackLock(ctxt, st.id, self.engine_id) + lock.acquire() + locks.append(lock) + sess = ctxt.session + sess.begin(subtransactions=True) + try: + for st in stacks: + if not st.convergence: + st.service_check_defer = True + st.migrate_to_convergence() + sess.commit() + except Exception: + sess.rollback() + raise + finally: + for lock in locks: + lock.release() + def service_manage_report(self): cnxt = context.get_admin_context() diff --git a/heat/engine/stack.py b/heat/engine/stack.py index b09465ad8f..a50f977138 100644 --- a/heat/engine/stack.py +++ b/heat/engine/stack.py @@ -603,7 +603,8 @@ class Stack(collections.Mapping): return stack @profiler.trace('Stack.store', hide_args=False) - def store(self, backup=False, exp_trvsl=None): + def store(self, backup=False, exp_trvsl=None, + ignore_traversal_check=False): """Store the stack in the database and return its ID. If self.id is set, we update the existing stack. @@ -619,7 +620,7 @@ class Stack(collections.Mapping): s['raw_template_id'] = self.t.id if self.id: - if exp_trvsl is None: + if exp_trvsl is None and not ignore_traversal_check: exp_trvsl = self.current_traversal if self.convergence: @@ -1352,6 +1353,16 @@ class Stack(collections.Mapping): rsrcs[existing_rsrc_db.name] = existing_rsrc_db return rsrcs + def set_resource_deps(self): + curr_name_translated_dep = self.dependencies.translate(lambda res: + res.id) + ext_rsrcs_db = self.db_active_resources_get() + for r in self.dependencies: + r.needed_by = list(curr_name_translated_dep.required_by(r.id)) + r.requires = list(curr_name_translated_dep.requires(r.id)) + resource.Resource.set_needed_by(ext_rsrcs_db[r.id], r.needed_by) + resource.Resource.set_requires(ext_rsrcs_db[r.id], r.requires) + def _compute_convg_dependencies(self, existing_resources, current_template_deps, current_resources): def make_graph_key(rsrc): @@ -2064,3 +2075,19 @@ class Stack(collections.Mapping): return self.time_elapsed() > self.timeout_secs() return False + + def migrate_to_convergence(self): + values = {'current_template_id': self.t.id} + db_rsrcs = self.db_active_resources_get() + if db_rsrcs is not None: + for res in db_rsrcs.values(): + res.update_and_save(values=values) + self.set_resource_deps() + self.current_traversal = uuidutils.generate_uuid() + self.convergence = True + prev_raw_template_id = self.prev_raw_template_id + self.prev_raw_template_id = None + self.store(ignore_traversal_check=True) + if prev_raw_template_id: + raw_template_object.RawTemplate.delete(self.context, + prev_raw_template_id) diff --git a/heat/objects/stack.py b/heat/objects/stack.py index ee19cf2b2e..65280fca88 100644 --- a/heat/objects/stack.py +++ b/heat/objects/stack.py @@ -15,10 +15,9 @@ """Stack object.""" -import six - from oslo_versionedobjects import base from oslo_versionedobjects import fields +import six from heat.common import exception from heat.common.i18n import _ @@ -145,6 +144,16 @@ class Stack( except exception.NotFound: pass + @classmethod + def get_all_by_root_owner_id(cls, context, root_owner_id): + db_stacks = db_api.stack_get_all_by_root_owner_id(context, + root_owner_id) + for db_stack in db_stacks: + try: + yield cls._from_db_object(context, cls(context), db_stack) + except exception.NotFound: + pass + @classmethod def count_all(cls, context, **kwargs): return db_api.stack_count_all(context, **kwargs) diff --git a/heat/rpc/client.py b/heat/rpc/client.py index 2ce1a6c584..7f539538d8 100644 --- a/heat/rpc/client.py +++ b/heat/rpc/client.py @@ -56,6 +56,7 @@ class EngineClient(object): 1.32 - Add get_files call 1.33 - Remove tenant_safe from list_stacks, count_stacks and list_software_configs + 1.34 - Add migrate_convergence_1 call """ BASE_RPC_API_VERSION = '1.0' @@ -846,3 +847,14 @@ class EngineClient(object): self.make_msg('export_stack', stack_identity=stack_identity), version='1.22') + + def migrate_convergence_1(self, ctxt, stack_id): + """Migrate the stack to convergence engine + + :param ctxt: RPC context + :param stack_name: Name of the stack you want to migrate + """ + return self.call(ctxt, + self.make_msg('migrate_convergence_1', + stack_id=stack_id), + version='1.34') diff --git a/heat/tests/db/test_sqlalchemy_api.py b/heat/tests/db/test_sqlalchemy_api.py index e388c4816b..9b9f7515cd 100644 --- a/heat/tests/db/test_sqlalchemy_api.py +++ b/heat/tests/db/test_sqlalchemy_api.py @@ -1881,6 +1881,36 @@ class DBAPIStackTest(common.HeatTestCase): parent_stack2.id) self.assertEqual(2, len(stack2_children)) + def test_stack_get_all_by_root_owner_id(self): + parent_stack1 = create_stack(self.ctx, self.template, self.user_creds) + parent_stack2 = create_stack(self.ctx, self.template, self.user_creds) + for i in range(3): + lvl1_st = create_stack(self.ctx, self.template, self.user_creds, + owner_id=parent_stack1.id) + for j in range(2): + create_stack(self.ctx, self.template, self.user_creds, + owner_id=lvl1_st.id) + for i in range(2): + lvl1_st = create_stack(self.ctx, self.template, self.user_creds, + owner_id=parent_stack2.id) + for j in range(4): + lvl2_st = create_stack(self.ctx, self.template, + self.user_creds, owner_id=lvl1_st.id) + for k in range(3): + create_stack(self.ctx, self.template, + self.user_creds, owner_id=lvl2_st.id) + + stack1_children = db_api.stack_get_all_by_root_owner_id( + self.ctx, + parent_stack1.id) + # 3 stacks on the first level + 6 stack on the second + self.assertEqual(9, len(list(stack1_children))) + stack2_children = db_api.stack_get_all_by_root_owner_id( + self.ctx, + parent_stack2.id) + # 2 + 8 + 24 + self.assertEqual(34, len(list(stack2_children))) + def test_stack_get_all_with_regular_tenant(self): values = [ {'tenant': UUID1}, diff --git a/heat/tests/engine/service/test_service_engine.py b/heat/tests/engine/service/test_service_engine.py index 0aa2d41b3b..858ab0891c 100644 --- a/heat/tests/engine/service/test_service_engine.py +++ b/heat/tests/engine/service/test_service_engine.py @@ -40,7 +40,7 @@ class ServiceEngineTest(common.HeatTestCase): def test_make_sure_rpc_version(self): self.assertEqual( - '1.33', + '1.34', service.EngineService.RPC_API_VERSION, ('RPC version is changed, please update this test to new version ' 'and make sure additional test cases are added for RPC APIs ' diff --git a/heat/tests/test_convg_stack.py b/heat/tests/test_convg_stack.py index 269acc04fa..3e2353da98 100644 --- a/heat/tests/test_convg_stack.py +++ b/heat/tests/test_convg_stack.py @@ -873,3 +873,36 @@ class TestConvgComputeDependencies(common.HeatTestCase): '((4, False), (3, False)), ' '((5, False), (3, False))])', repr(self.stack._convg_deps)) + + +class TestConvergenceMigration(common.HeatTestCase): + def test_migration_to_convergence_engine(self): + self.ctx = utils.dummy_context() + self.stack = tools.get_stack('test_stack_convg', self.ctx, + template=tools.string_template_five) + self.stack.store() + for r in self.stack.resources.values(): + r._store() + self.stack.migrate_to_convergence() + self.stack = self.stack.load(self.ctx, self.stack.id) + + self.assertTrue(self.stack.convergence) + self.assertIsNone(self.stack.prev_raw_template_id) + exp_required_by = {'A': ['C'], 'B': ['C'], 'C': ['D', 'E'], + 'D': [], 'E': []} + exp_requires = {'A': [], 'B': [], 'C': ['A', 'B'], 'D': ['C'], + 'E': ['C']} + exp_tmpl_id = self.stack.t.id + + def id_to_name(ids): + names = [] + for r in self.stack.resources.values(): + if r.id in ids: + names.append(r.name) + return names + for r in self.stack.resources.values(): + self.assertEqual(sorted(exp_required_by[r.name]), + sorted(r.required_by())) + self.assertEqual(sorted(exp_requires[r.name]), + sorted(id_to_name(r.requires))) + self.assertEqual(exp_tmpl_id, r.current_template_id)