Migrate stacks from legacy to convergence engine

Run `heat-manage migrate-convergence-1 <stack_id>` to migrate
legacy stack to convergence engine.

Heat engine is used for doing migration i.e. migration can't
be done offline.

Change-Id: Ie7c2498b37937438f16d154b154b3a6ecbf9ff74
Implements-bp: convergence-migrate-stack
This commit is contained in:
Oleksii Chuprykov 2016-02-16 19:18:53 +02:00 committed by Peter Razumovsky
parent 110cf140b1
commit 68944d2230
10 changed files with 188 additions and 8 deletions

View File

@ -22,11 +22,14 @@ from oslo_log import log
from six import moves
from heat.common import context
from heat.common import exception
from heat.common.i18n import _
from heat.common import messaging
from heat.common import service_utils
from heat.db import api as db_api
from heat.db import utils
from heat.objects import service as service_objects
from heat.rpc import client as rpc_client
from heat import version
@ -111,6 +114,21 @@ def do_reset_stack_status():
db_api.reset_stack_status(ctxt, CONF.command.stack_id)
def do_migrate():
messaging.setup()
client = rpc_client.EngineClient()
ctxt = context.get_admin_context()
try:
client.migrate_convergence_1(ctxt, CONF.command.stack_id)
except exception.NotFound:
raise Exception(_("Stack with id %s can not be found.")
% CONF.command.stack_id)
except exception.ActionInProgress:
raise Exception(_("The stack or some of its nested stacks are "
"in progress. Note, that all the stacks should be "
"in COMPLETE state in order to be migrated."))
def purge_deleted():
"""Remove database records that have been previously soft deleted."""
utils.purge_deleted(CONF.command.age,
@ -141,6 +159,11 @@ def add_command_parsers(subparsers):
# positional parameter, can be skipped. default=None
parser.add_argument('version', nargs='?')
# migrate-stacks parser
parser = subparsers.add_parser('migrate-convergence-1')
parser.set_defaults(func=do_migrate)
parser.add_argument('stack_id')
# purge_deleted parser
parser = subparsers.add_parser('purge_deleted')
parser.set_defaults(func=purge_deleted)

View File

@ -188,6 +188,10 @@ def stack_get_all_by_owner_id(context, owner_id):
return IMPL.stack_get_all_by_owner_id(context, owner_id)
def stack_get_all_by_root_owner_id(context, owner_id):
return IMPL.stack_get_all_by_root_owner_id(context, owner_id)
def stack_count_all(context, filters=None,
show_deleted=False, show_nested=False, show_hidden=False,
tags=None, tags_any=None, not_tags=None,

View File

@ -224,7 +224,7 @@ def resource_purge_deleted(context, stack_id):
def resource_update(context, resource_id, values, atomic_key,
expected_engine_id=None):
session = context.session
with session.begin():
with session.begin(subtransactions=True):
if atomic_key is None:
values['atomic_key'] = 1
else:
@ -469,6 +469,13 @@ def stack_get_all_by_owner_id(context, owner_id):
return results
def stack_get_all_by_root_owner_id(context, owner_id):
for stack in stack_get_all_by_owner_id(context, owner_id):
yield stack
for ch_st in stack_get_all_by_root_owner_id(context, stack.id):
yield ch_st
def _get_sort_keys(sort_keys, mapping):
"""Returns an array containing only whitelisted keys
@ -627,7 +634,7 @@ def stack_update(context, stack_id, values, exp_trvsl=None):
session = context.session
with session.begin():
with session.begin(subtransactions=True):
rows_updated = (session.query(models.Stack)
.filter(models.Stack.id == stack.id)
.filter(models.Stack.current_traversal

View File

@ -299,7 +299,7 @@ class EngineService(service.Service):
by the RPC caller.
"""
RPC_API_VERSION = '1.33'
RPC_API_VERSION = '1.34'
def __init__(self, host, topic):
super(EngineService, self).__init__()
@ -2221,6 +2221,41 @@ class EngineService(service.Service):
for srv in service_objects.Service.get_all(cnxt)]
return result
@context.request_context
def migrate_convergence_1(self, ctxt, stack_id):
parent_stack = parser.Stack.load(ctxt,
stack_id=stack_id,
show_deleted=False)
if parent_stack.convergence:
LOG.info(_LI("Convergence was already enabled for stack %s"),
stack_id)
return
db_stacks = stack_object.Stack.get_all_by_root_owner_id(
ctxt, parent_stack.id)
stacks = [parser.Stack.load(ctxt, stack_id=st.id,
stack=st) for st in db_stacks]
stacks.append(parent_stack)
locks = []
try:
for st in stacks:
lock = stack_lock.StackLock(ctxt, st.id, self.engine_id)
lock.acquire()
locks.append(lock)
sess = ctxt.session
sess.begin(subtransactions=True)
try:
for st in stacks:
if not st.convergence:
st.service_check_defer = True
st.migrate_to_convergence()
sess.commit()
except Exception:
sess.rollback()
raise
finally:
for lock in locks:
lock.release()
def service_manage_report(self):
cnxt = context.get_admin_context()

View File

@ -603,7 +603,8 @@ class Stack(collections.Mapping):
return stack
@profiler.trace('Stack.store', hide_args=False)
def store(self, backup=False, exp_trvsl=None):
def store(self, backup=False, exp_trvsl=None,
ignore_traversal_check=False):
"""Store the stack in the database and return its ID.
If self.id is set, we update the existing stack.
@ -619,7 +620,7 @@ class Stack(collections.Mapping):
s['raw_template_id'] = self.t.id
if self.id:
if exp_trvsl is None:
if exp_trvsl is None and not ignore_traversal_check:
exp_trvsl = self.current_traversal
if self.convergence:
@ -1352,6 +1353,16 @@ class Stack(collections.Mapping):
rsrcs[existing_rsrc_db.name] = existing_rsrc_db
return rsrcs
def set_resource_deps(self):
curr_name_translated_dep = self.dependencies.translate(lambda res:
res.id)
ext_rsrcs_db = self.db_active_resources_get()
for r in self.dependencies:
r.needed_by = list(curr_name_translated_dep.required_by(r.id))
r.requires = list(curr_name_translated_dep.requires(r.id))
resource.Resource.set_needed_by(ext_rsrcs_db[r.id], r.needed_by)
resource.Resource.set_requires(ext_rsrcs_db[r.id], r.requires)
def _compute_convg_dependencies(self, existing_resources,
current_template_deps, current_resources):
def make_graph_key(rsrc):
@ -2064,3 +2075,19 @@ class Stack(collections.Mapping):
return self.time_elapsed() > self.timeout_secs()
return False
def migrate_to_convergence(self):
values = {'current_template_id': self.t.id}
db_rsrcs = self.db_active_resources_get()
if db_rsrcs is not None:
for res in db_rsrcs.values():
res.update_and_save(values=values)
self.set_resource_deps()
self.current_traversal = uuidutils.generate_uuid()
self.convergence = True
prev_raw_template_id = self.prev_raw_template_id
self.prev_raw_template_id = None
self.store(ignore_traversal_check=True)
if prev_raw_template_id:
raw_template_object.RawTemplate.delete(self.context,
prev_raw_template_id)

View File

@ -15,10 +15,9 @@
"""Stack object."""
import six
from oslo_versionedobjects import base
from oslo_versionedobjects import fields
import six
from heat.common import exception
from heat.common.i18n import _
@ -145,6 +144,16 @@ class Stack(
except exception.NotFound:
pass
@classmethod
def get_all_by_root_owner_id(cls, context, root_owner_id):
db_stacks = db_api.stack_get_all_by_root_owner_id(context,
root_owner_id)
for db_stack in db_stacks:
try:
yield cls._from_db_object(context, cls(context), db_stack)
except exception.NotFound:
pass
@classmethod
def count_all(cls, context, **kwargs):
return db_api.stack_count_all(context, **kwargs)

View File

@ -56,6 +56,7 @@ class EngineClient(object):
1.32 - Add get_files call
1.33 - Remove tenant_safe from list_stacks, count_stacks
and list_software_configs
1.34 - Add migrate_convergence_1 call
"""
BASE_RPC_API_VERSION = '1.0'
@ -846,3 +847,14 @@ class EngineClient(object):
self.make_msg('export_stack',
stack_identity=stack_identity),
version='1.22')
def migrate_convergence_1(self, ctxt, stack_id):
"""Migrate the stack to convergence engine
:param ctxt: RPC context
:param stack_name: Name of the stack you want to migrate
"""
return self.call(ctxt,
self.make_msg('migrate_convergence_1',
stack_id=stack_id),
version='1.34')

View File

@ -1881,6 +1881,36 @@ class DBAPIStackTest(common.HeatTestCase):
parent_stack2.id)
self.assertEqual(2, len(stack2_children))
def test_stack_get_all_by_root_owner_id(self):
parent_stack1 = create_stack(self.ctx, self.template, self.user_creds)
parent_stack2 = create_stack(self.ctx, self.template, self.user_creds)
for i in range(3):
lvl1_st = create_stack(self.ctx, self.template, self.user_creds,
owner_id=parent_stack1.id)
for j in range(2):
create_stack(self.ctx, self.template, self.user_creds,
owner_id=lvl1_st.id)
for i in range(2):
lvl1_st = create_stack(self.ctx, self.template, self.user_creds,
owner_id=parent_stack2.id)
for j in range(4):
lvl2_st = create_stack(self.ctx, self.template,
self.user_creds, owner_id=lvl1_st.id)
for k in range(3):
create_stack(self.ctx, self.template,
self.user_creds, owner_id=lvl2_st.id)
stack1_children = db_api.stack_get_all_by_root_owner_id(
self.ctx,
parent_stack1.id)
# 3 stacks on the first level + 6 stack on the second
self.assertEqual(9, len(list(stack1_children)))
stack2_children = db_api.stack_get_all_by_root_owner_id(
self.ctx,
parent_stack2.id)
# 2 + 8 + 24
self.assertEqual(34, len(list(stack2_children)))
def test_stack_get_all_with_regular_tenant(self):
values = [
{'tenant': UUID1},

View File

@ -40,7 +40,7 @@ class ServiceEngineTest(common.HeatTestCase):
def test_make_sure_rpc_version(self):
self.assertEqual(
'1.33',
'1.34',
service.EngineService.RPC_API_VERSION,
('RPC version is changed, please update this test to new version '
'and make sure additional test cases are added for RPC APIs '

View File

@ -873,3 +873,36 @@ class TestConvgComputeDependencies(common.HeatTestCase):
'((4, False), (3, False)), '
'((5, False), (3, False))])',
repr(self.stack._convg_deps))
class TestConvergenceMigration(common.HeatTestCase):
def test_migration_to_convergence_engine(self):
self.ctx = utils.dummy_context()
self.stack = tools.get_stack('test_stack_convg', self.ctx,
template=tools.string_template_five)
self.stack.store()
for r in self.stack.resources.values():
r._store()
self.stack.migrate_to_convergence()
self.stack = self.stack.load(self.ctx, self.stack.id)
self.assertTrue(self.stack.convergence)
self.assertIsNone(self.stack.prev_raw_template_id)
exp_required_by = {'A': ['C'], 'B': ['C'], 'C': ['D', 'E'],
'D': [], 'E': []}
exp_requires = {'A': [], 'B': [], 'C': ['A', 'B'], 'D': ['C'],
'E': ['C']}
exp_tmpl_id = self.stack.t.id
def id_to_name(ids):
names = []
for r in self.stack.resources.values():
if r.id in ids:
names.append(r.name)
return names
for r in self.stack.resources.values():
self.assertEqual(sorted(exp_required_by[r.name]),
sorted(r.required_by()))
self.assertEqual(sorted(exp_requires[r.name]),
sorted(id_to_name(r.requires)))
self.assertEqual(exp_tmpl_id, r.current_template_id)