Merge "Add heat-manage subcommand to migrate legacy prop. data"

This commit is contained in:
Jenkins 2017-02-07 01:16:52 +00:00 committed by Gerrit Code Review
commit 467c542097
4 changed files with 176 additions and 7 deletions

View File

@@ -22,8 +22,9 @@ The standard pattern for executing a heat-manage command is:
Run with -h to see a list of available commands:
``heat-manage -h``
Commands are ``db_version``, ``db_sync``, ``purge_deleted``, ``migrate_convergence_1``
and ``service``. Detailed descriptions are below.
Commands are ``db_version``, ``db_sync``, ``purge_deleted``,
``migrate_convergence_1``, ``migrate_properties_data``, and
``service``. Detailed descriptions are below.
``heat-manage db_version``
@@ -38,6 +39,12 @@ and ``service``. Detailed descriptions are below.
Purge db entries marked as deleted and older than [age]. When project_id
argument is provided, only entries belonging to this project will be purged.
``heat-manage migrate_properties_data``
Migrates properties data from the legacy locations in the db
(resource.properties_data and event.resource_properties) to the
modern location, the resource_properties_data table.
``heat-manage migrate_convergence_1 [stack_id]``
Migrates [stack_id] from non-convergence to convergence. This requires running

View File

@@ -148,6 +148,11 @@ def do_crypt_parameters_and_properties():
ctxt, prev_encryption_key, CONF.command.verbose_update_params)
def do_properties_data_migrate():
    ctxt = context.get_admin_context()
    db_api.db_properties_data_migrate(ctxt)


def add_command_parsers(subparsers):
    # db_version parser
    parser = subparsers.add_parser('db_version')
@@ -215,6 +220,10 @@ def add_command_parsers(subparsers):
    parser.add_argument('stack_id',
                        help=_('Stack id'))

    # migrate properties_data parser
    parser = subparsers.add_parser('migrate_properties_data')
    parser.set_defaults(func=do_properties_data_migrate)

    ServiceManageCommand.add_service_parsers(subparsers)
command_opt = cfg.SubCommandOpt('command',

View File

@@ -37,6 +37,7 @@ from heat.common import exception
from heat.common.i18n import _
from heat.common.i18n import _LE
from heat.common.i18n import _LI
from heat.common.i18n import _LW
from heat.db.sqlalchemy import filters as db_filters
from heat.db.sqlalchemy import migration
from heat.db.sqlalchemy import models
@@ -1692,6 +1693,73 @@ def db_decrypt_parameters_and_properties(ctxt, encryption_key, batch_size=50,
    return excs
def db_properties_data_migrate(ctxt, batch_size=50):
    """Migrate properties data from legacy columns to new location in db.

    :param ctxt: RPC context
    :param batch_size: number of rows requested from the db in each
                       iteration. 50 means that heat requests 50 rows,
                       migrates them and proceeds with the next 50 items.
    """
    session = ctxt.session

    query = session.query(models.Resource).filter(and_(
        models.Resource.properties_data.isnot(None),
        models.Resource.rsrc_prop_data_id.is_(None)))
    resource_batches = _get_batch(
        session=session, ctxt=ctxt, query=query,
        model=models.Resource, batch_size=batch_size)
    next_batch = list(itertools.islice(resource_batches, batch_size))
    while next_batch:
        with session.begin():
            for resource in next_batch:
                try:
                    encrypted = resource.properties_data_encrypted
                    if encrypted is None:
                        LOG.warning(
                            _LW('Unexpected: properties_data_encrypted is '
                                'None for resource id %(id)d with legacy '
                                'properties_data; assuming False.'),
                            {'id': resource.id})
                        encrypted = False
                    rsrc_prop_data = resource_prop_data_create(
                        ctxt, {'encrypted': encrypted,
                               'data': resource.properties_data})
                    resource_update(ctxt, resource.id,
                                    {'properties_data_encrypted': None,
                                     'properties_data': None,
                                     'rsrc_prop_data_id': rsrc_prop_data.id},
                                    resource.atomic_key)
                except Exception:
                    LOG.exception(_LE('Failed to migrate properties_data for '
                                      'resource %(id)d'), {'id': resource.id})
                    continue
        next_batch = list(itertools.islice(resource_batches, batch_size))

    query = session.query(models.Event).filter(and_(
        models.Event.resource_properties.isnot(None),
        models.Event.rsrc_prop_data_id.is_(None)))
    event_batches = _get_batch(
        session=session, ctxt=ctxt, query=query,
        model=models.Event, batch_size=batch_size)
    next_batch = list(itertools.islice(event_batches, batch_size))
    while next_batch:
        with session.begin():
            for event in next_batch:
                try:
                    prop_data = event.resource_properties
                    rsrc_prop_data = resource_prop_data_create(
                        ctxt, {'encrypted': False,
                               'data': prop_data})
                    event.update({'resource_properties': None,
                                  'rsrc_prop_data_id': rsrc_prop_data.id})
                except Exception:
                    LOG.exception(_LE('Failed to migrate resource_properties '
                                      'for event %(id)d'), {'id': event.id})
                    continue
        next_batch = list(itertools.islice(event_batches, batch_size))
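Both loops above consume the _get_batch generator in fixed-size chunks: itertools.islice pulls at most batch_size rows per pass, each chunk is processed inside its own session.begin() transaction, and the loop ends when the slice comes back empty. A minimal, heat-independent sketch of that consumption pattern (generic names and in-memory data instead of a DB query):

import itertools


def fetch_rows():
    # stands in for _get_batch(); in heat this yields ORM rows from a query
    for i in range(7):
        yield {'id': i}


def migrate_in_batches(batch_size=3):
    rows = fetch_rows()
    next_batch = list(itertools.islice(rows, batch_size))
    while next_batch:
        # in the real code this block runs inside `with session.begin():`
        for row in next_batch:
            print('migrating row', row['id'])
        # pull the next chunk; an empty list ends the loop
        next_batch = list(itertools.islice(rows, batch_size))


migrate_in_batches()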
def _get_batch(session, ctxt, query, model, batch_size=50):
    last_batch_marker = None
    while True:

View File

@@ -1439,9 +1439,11 @@ def create_resource_prop_data(ctx, **kwargs):
return db_api.resource_prop_data_create(ctx, **values)
def create_event(ctx, **kwargs):
rpd = db_api.resource_prop_data_create(ctx, {'data': {'name': 'foo'},
'encrypted': False})
def create_event(ctx, legacy_prop_data=False, **kwargs):
if not legacy_prop_data:
rpd = db_api.resource_prop_data_create(ctx,
{'data': {'foo2': 'ev_bar'},
'encrypted': False})
values = {
'stack_id': 'test_stack_id',
'resource_action': 'create',
@@ -1449,8 +1451,11 @@ def create_event(ctx, **kwargs):
'resource_name': 'res',
'physical_resource_id': UUID1,
'resource_status_reason': "create_complete",
'rsrc_prop_data': rpd,
}
if not legacy_prop_data:
values['rsrc_prop_data'] = rpd
else:
values['resource_properties'] = {'foo2': 'ev_bar'}
values.update(kwargs)
return db_api.event_create(ctx, values)
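The reworked helper can now seed either storage form; roughly how the two call styles behave (ctx being a test context such as utils.dummy_context(), as used elsewhere in this file):

# Default: properties are stored through a new resource_properties_data row.
create_event(ctx)                         # sets values['rsrc_prop_data']

# Legacy form: properties are written inline on the event row, which is the
# shape db_properties_data_migrate() is meant to clean up.
create_event(ctx, legacy_prop_data=True)  # sets values['resource_properties']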
@@ -2721,7 +2726,7 @@ class DBAPIEventTest(common.HeatTestCase):
self.assertEqual('res', ret_event.resource_name)
self.assertEqual(UUID1, ret_event.physical_resource_id)
self.assertEqual('create_complete', ret_event.resource_status_reason)
self.assertEqual({'name': 'foo'}, ret_event.rsrc_prop_data.data)
self.assertEqual({'foo2': 'ev_bar'}, ret_event.rsrc_prop_data.data)
def test_event_get_all(self):
self.stack1 = create_stack(self.ctx, self.template, self.user_creds,
@@ -3294,6 +3299,86 @@ class DBAPISyncPointTest(common.HeatTestCase):
self.assertEqual(len(self.resources) * 4, add.call_count)
class DBAPIMigratePropertiesDataTest(common.HeatTestCase):
    def setUp(self):
        super(DBAPIMigratePropertiesDataTest, self).setUp()
        self.ctx = utils.dummy_context()
        templ = create_raw_template(self.ctx)
        user_creds = create_user_creds(self.ctx)
        stack = create_stack(self.ctx, templ, user_creds)
        stack2 = create_stack(self.ctx, templ, user_creds)
        create_resource(self.ctx, stack, True, name='res1')
        create_resource(self.ctx, stack2, True, name='res2')
        create_event(self.ctx, True)
        create_event(self.ctx, True)

    def _test_migrate_resource(self, batch_size=50):
        resources = self.ctx.session.query(models.Resource).all()
        self.assertEqual(2, len(resources))
        for resource in resources:
            self.assertEqual('bar1', resource.properties_data['foo1'])

        db_api.db_properties_data_migrate(self.ctx, batch_size=batch_size)

        for resource in resources:
            self.assertEqual('bar1', resource.rsrc_prop_data.data['foo1'])
            self.assertFalse(resource.rsrc_prop_data.encrypted)
            self.assertIsNone(resource.properties_data)
            self.assertIsNone(resource.properties_data_encrypted)

    def _test_migrate_event(self, batch_size=50):
        events = self.ctx.session.query(models.Event).all()
        self.assertEqual(2, len(events))
        for event in events:
            self.assertEqual('ev_bar', event.resource_properties['foo2'])

        db_api.db_properties_data_migrate(self.ctx, batch_size=batch_size)
        self.ctx.session.expire_all()
        events = self.ctx.session.query(models.Event).all()

        for event in events:
            self.assertEqual('ev_bar', event.rsrc_prop_data.data['foo2'])
            self.assertFalse(event.rsrc_prop_data.encrypted)
            self.assertIsNone(event.resource_properties)

    def test_migrate_event(self):
        self._test_migrate_event()

    def test_migrate_event_in_batches(self):
        self._test_migrate_event(batch_size=1)

    def test_migrate_resource(self):
        self._test_migrate_resource()

    def test_migrate_resource_in_batches(self):
        self._test_migrate_resource(batch_size=1)

    def test_migrate_encrypted_resource(self):
        resources = self.ctx.session.query(models.Resource).all()
        db_api.db_encrypt_parameters_and_properties(
            self.ctx, 'i have a key for you if you want')
        encrypted_data_pre_migration = resources[0].properties_data['foo1'][1]
        db_api.db_properties_data_migrate(self.ctx)
        resources = self.ctx.session.query(models.Resource).all()

        self.assertTrue(resources[0].rsrc_prop_data.encrypted)
        self.assertIsNone(resources[0].properties_data)
        self.assertIsNone(resources[0].properties_data_encrypted)
        self.assertEqual('cryptography_decrypt_v1',
                         resources[0].rsrc_prop_data.data['foo1'][0])
        self.assertEqual(encrypted_data_pre_migration,
                         resources[0].rsrc_prop_data.data['foo1'][1])

        db_api.db_decrypt_parameters_and_properties(
            self.ctx, 'i have a key for you if you want')
        self.ctx.session.expire_all()
        resources = self.ctx.session.query(models.Resource).all()
        self.assertEqual('bar1', resources[0].rsrc_prop_data.data['foo1'])
        self.assertFalse(resources[0].rsrc_prop_data.encrypted)
        self.assertIsNone(resources[0].properties_data)
        self.assertIsNone(resources[0].properties_data_encrypted)
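test_migrate_encrypted_resource pins down an important operator property: rows encrypted in the legacy columns are moved verbatim, with the encrypted flag carried over, so decryption still works afterwards against the new table. The sequence it exercises, in outline (ctx and key as in the test above; not a standalone script):

# legacy columns get encrypted in place
db_api.db_encrypt_parameters_and_properties(ctx, key)
# ciphertext moves to resource_properties_data untouched, with encrypted=True
db_api.db_properties_data_migrate(ctx)
# decryption now operates on the new rsrc_prop_data rows
db_api.db_decrypt_parameters_and_properties(ctx, key)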
class DBAPICryptParamsPropsTest(common.HeatTestCase):
    def setUp(self):
        super(DBAPICryptParamsPropsTest, self).setUp()