Provide a way to upgrade metadata definitions

Currently there is no way to upgrade metadata definitions to the
newest set. This change extends the existing command:
glance-manage db load_metadefs.

The extension allows the user to merge metadata definitions that are
stored in files with the data that already exists in the database. By
default the merge prefers existing data over new (--merge), but this
behavior can be changed by combining --merge with one of two other
options: use --prefer_new to prefer new data over the existing data
in the database, or --overwrite to drop every namespace (and its
attached resources) that is found in both the database and the files.

By default glance-manage db load_metadefs works the same way it did
before this extension. To enable the new logic, the user needs to
provide at least the --merge option, optionally combined with one of
the other two options.

Implements: blueprint metadefs-upgrade-by-json-file
Change-Id: I55fa6640142db5110deb88d9ecd8507e7f533c58
Pawel Koniszewski 2015-03-12 00:26:16 -04:00
parent 60a402e243
commit b55ae36fcf
4 changed files with 288 additions and 95 deletions

glance/cmd/manage.py

@@ -41,6 +41,7 @@ if os.path.exists(os.path.join(possible_topdir, 'glance', '__init__.py')):
 from oslo_config import cfg
 from oslo_db.sqlalchemy import migration
 from oslo_utils import encodeutils
+import six

 from glance.common import config
 from glance.common import exception
@@ -114,12 +115,25 @@ class DbCommands(object):
                                  db_migration.MIGRATE_REPO_PATH,
                                  version)

-    @args('--path', metavar='<path>', help='Path to the directory where '
-                                           'json metadata files are stored')
-    def load_metadefs(self, path=None):
+    @args('--path', metavar='<path>', help='Path to the directory or file '
+                                           'where json metadata is stored')
+    @args('--merge', action='store_true',
+          help='Merge files with data that is in the database. By default it '
+               'prefers existing data over new. This logic can be changed by '
+               'combining --merge option with one of these two options: '
+               '--prefer_new or --overwrite.')
+    @args('--prefer_new', action='store_true',
+          help='Prefer new metadata over existing. Existing metadata '
+               'might be overwritten. Needs to be combined with --merge '
+               'option.')
+    @args('--overwrite', action='store_true',
+          help='Drop and rewrite metadata. Needs to be combined with --merge '
+               'option')
+    def load_metadefs(self, path=None, merge=False,
+                      prefer_new=False, overwrite=False):
         """Load metadefinition json files to database"""
-        metadata.db_load_metadefs(db_api.get_engine(),
-                                  path)
+        metadata.db_load_metadefs(db_api.get_engine(), path, merge,
+                                  prefer_new, overwrite)

     def unload_metadefs(self):
         """Unload metadefinitions from database"""
@@ -156,8 +170,12 @@ class DbLegacyCommands(object):
         self.command_object.sync(CONF.command.version,
                                  CONF.command.current_version)

-    def load_metadefs(self, path=None):
-        self.command_object.load_metadefs(CONF.command.path)
+    def load_metadefs(self, path=None, merge=False,
+                      prefer_new=False, overwrite=False):
+        self.command_object.load_metadefs(CONF.command.path,
+                                          CONF.command.merge,
+                                          CONF.command.prefer_new,
+                                          CONF.command.overwrite)

     def unload_metadefs(self):
         self.command_object.unload_metadefs()
@@ -198,6 +216,9 @@ def add_legacy_command_parsers(command_object, subparsers):
     parser = subparsers.add_parser('db_load_metadefs')
     parser.set_defaults(action_fn=legacy_command_object.load_metadefs)
     parser.add_argument('path', nargs='?')
+    parser.add_argument('merge', nargs='?')
+    parser.add_argument('prefer_new', nargs='?')
+    parser.add_argument('overwrite', nargs='?')
     parser.set_defaults(action='db_load_metadefs')

     parser = subparsers.add_parser('db_unload_metadefs')
@@ -285,8 +306,9 @@ def main():
         v = getattr(CONF.command, 'action_kwarg_' + k)
         if v is None:
             continue
-        func_kwargs[k] = encodeutils.safe_decode(v)
+        if isinstance(v, six.string_types):
+            v = encodeutils.safe_decode(v)
+        func_kwargs[k] = v
     func_args = [encodeutils.safe_decode(arg)
                  for arg in CONF.command.action_args]
     return CONF.command.action_fn(*func_args, **func_kwargs)

glance/db/metadata.py

@@ -48,7 +48,10 @@ def get_backend():
 def load_metadefs():
     """Read metadefinition files and insert data into the database"""
     return get_backend().db_load_metadefs(engine=db_api.get_engine(),
-                                          metadata_path=None)
+                                          metadata_path=None,
+                                          merge=False,
+                                          prefer_new=False,
+                                          overwrite=False)


 def unload_metadefs():

glance/db/sqlalchemy/metadata.py

@@ -27,6 +27,7 @@ from oslo_config import cfg
 from oslo_utils import timeutils
 import six
 import sqlalchemy
+from sqlalchemy import and_
 from sqlalchemy.schema import MetaData
 from sqlalchemy.sql import select
@@ -76,16 +77,24 @@ def get_metadef_tags_table(meta):
 def _get_resource_type_id(meta, name):
-    resource_types_table = get_metadef_resource_types_table(meta)
-    return resource_types_table.select().\
-        where(resource_types_table.c.name == name).execute().fetchone().id
+    rt_table = get_metadef_resource_types_table(meta)
+    resource_type = (
+        select([rt_table.c.id]).
+        where(rt_table.c.name == name).
+        select_from(rt_table).
+        execute().fetchone())
+    if resource_type:
+        return resource_type[0]
+    return None


 def _get_resource_type(meta, resource_type_id):
-    resource_types_table = get_metadef_resource_types_table(meta)
-    return resource_types_table.select().\
-        where(resource_types_table.c.id == resource_type_id).\
-        execute().fetchone()
+    rt_table = get_metadef_resource_types_table(meta)
+    return (
+        select([rt_table.c.id]).
+        where(rt_table.c.id == resource_type_id).
+        select_from(rt_table).
+        execute().fetchone())


 def _get_namespace_resource_types(meta, namespace_id):
@@ -118,31 +127,72 @@ def _get_tags(meta, namespace_id):
         execute().fetchall())


-def _populate_metadata(meta, metadata_path=None):
+def _get_resource_id(table, namespace_id, resource_name):
+    resource = (
+        select([table.c.id]).
+        where(and_(table.c.namespace_id == namespace_id,
+                   table.c.name == resource_name)).
+        select_from(table).
+        execute().fetchone())
+    if resource:
+        return resource[0]
+    return None
+
+
+def _clear_metadata(meta):
+    metadef_tables = [get_metadef_properties_table(meta),
+                      get_metadef_objects_table(meta),
+                      get_metadef_tags_table(meta),
+                      get_metadef_namespace_resource_types_table(meta),
+                      get_metadef_namespaces_table(meta),
+                      get_metadef_resource_types_table(meta)]
+
+    for table in metadef_tables:
+        table.delete().execute()
+        LOG.info(_LI("Table %s has been cleared"), table)
+
+
+def _clear_namespace_metadata(meta, namespace_id):
+    metadef_tables = [get_metadef_properties_table(meta),
+                      get_metadef_objects_table(meta),
+                      get_metadef_tags_table(meta),
+                      get_metadef_namespace_resource_types_table(meta)]
+    namespaces_table = get_metadef_namespaces_table(meta)
+
+    for table in metadef_tables:
+        table.delete().where(table.c.namespace_id == namespace_id).execute()
+    namespaces_table.delete().where(
+        namespaces_table.c.id == namespace_id).execute()
+
+
+def _populate_metadata(meta, metadata_path=None, merge=False,
+                       prefer_new=False, overwrite=False):
     if not metadata_path:
         metadata_path = CONF.metadata_source_path

     try:
-        json_schema_files = [f for f in os.listdir(metadata_path)
-                             if isfile(join(metadata_path, f))
-                             and f.endswith('.json')]
+        if isfile(metadata_path):
+            json_schema_files = [metadata_path]
+        else:
+            json_schema_files = [f for f in os.listdir(metadata_path)
+                                 if isfile(join(metadata_path, f))
+                                 and f.endswith('.json')]
     except OSError as e:
         LOG.error(utils.exception_to_str(e))
         return

-    metadef_namespaces_table = get_metadef_namespaces_table(meta)
-    metadef_namespace_resource_types_tables =\
-        get_metadef_namespace_resource_types_table(meta)
-    metadef_objects_table = get_metadef_objects_table(meta)
-    metadef_tags_table = get_metadef_tags_table(meta)
-    metadef_properties_table = get_metadef_properties_table(meta)
-    metadef_resource_types_table = get_metadef_resource_types_table(meta)
-
     if not json_schema_files:
         LOG.error(_LE("Json schema files not found in %s. Aborting."),
                   metadata_path)
         return

+    namespaces_table = get_metadef_namespaces_table(meta)
+    namespace_rt_table = get_metadef_namespace_resource_types_table(meta)
+    objects_table = get_metadef_objects_table(meta)
+    tags_table = get_metadef_tags_table(meta)
+    properties_table = get_metadef_properties_table(meta)
+    resource_types_table = get_metadef_resource_types_table(meta)
+
     for json_schema_file in json_schema_files:
         try:
             file = join(metadata_path, json_schema_file)
@@ -158,104 +208,130 @@ def _populate_metadata(meta, metadata_path=None):
                 'description': metadata.get('description', None),
                 'visibility': metadata.get('visibility', None),
                 'protected': metadata.get('protected', None),
-                'owner': metadata.get('owner', 'admin'),
-                'created_at': timeutils.utcnow()
+                'owner': metadata.get('owner', 'admin')
             }

-            temp = metadef_namespaces_table.select(
-                whereclause='namespace = \'%s\'' % values['namespace'])\
-                .execute().fetchone()
-            if temp is not None:
-                LOG.info(_LI("Skipping namespace %s. It already exists in the "
-                             "database."), values['namespace'])
-                continue
-
-            _insert_data_to_db(metadef_namespaces_table, values)
-
-            db_namespace = select(
-                [metadef_namespaces_table.c.id]
-            ).where(
-                metadef_namespaces_table.c.namespace == values['namespace']
-            ).select_from(
-                metadef_namespaces_table
-            ).execute().fetchone()
-            namespace_id = db_namespace['id']
+            db_namespace = select(
+                [namespaces_table.c.id]
+            ).where(
+                namespaces_table.c.namespace == values['namespace']
+            ).select_from(
+                namespaces_table
+            ).execute().fetchone()
+
+            if db_namespace and overwrite:
+                LOG.info(_LI("Overwriting namespace %s"), values['namespace'])
+                _clear_namespace_metadata(meta, db_namespace[0])
+                db_namespace = None
+
+            if not db_namespace:
+                values.update({'created_at': timeutils.utcnow()})
+                _insert_data_to_db(namespaces_table, values)
+                db_namespace = select(
+                    [namespaces_table.c.id]
+                ).where(
+                    namespaces_table.c.namespace == values['namespace']
+                ).select_from(
+                    namespaces_table
+                ).execute().fetchone()
+            elif not merge:
+                LOG.info(_LI("Skipping namespace %s. It already exists in the "
+                             "database."), values['namespace'])
+                continue
+            elif prefer_new:
+                values.update({'updated_at': timeutils.utcnow()})
+                _update_data_in_db(namespaces_table, values,
+                                   namespaces_table.c.id, db_namespace[0])
+
+            namespace_id = db_namespace[0]

-            for resource_type in metadata.get('resource_type_associations',
-                                              []):
-                try:
-                    resource_type_id = \
-                        _get_resource_type_id(meta, resource_type['name'])
-                except AttributeError:
-                    values = {
-                        'name': resource_type['name'],
-                        'protected': True,
-                        'created_at': timeutils.utcnow()
-                    }
-                    _insert_data_to_db(metadef_resource_types_table,
-                                       values)
-                    resource_type_id =\
-                        _get_resource_type_id(meta, resource_type['name'])
-                values = {
-                    'resource_type_id': resource_type_id,
-                    'namespace_id': namespace_id,
-                    'created_at': timeutils.utcnow(),
-                    'properties_target': resource_type.get(
-                        'properties_target'),
-                    'prefix': resource_type.get('prefix', None)
-                }
-                _insert_data_to_db(metadef_namespace_resource_types_tables,
-                                   values)
+            for resource_type in metadata.get('resource_type_associations', []):
+                values = {
+                    'namespace_id': namespace_id,
+                    'properties_target': resource_type.get(
+                        'properties_target'),
+                    'prefix': resource_type.get('prefix', None)
+                }
+                rt_id = _get_resource_type_id(meta, resource_type['name'])
+                if not rt_id:
+                    rt_name = resource_type['name']
+                    resource_type = {
+                        'name': rt_name,
+                        'protected': True,
+                        'created_at': timeutils.utcnow()
+                    }
+                    _insert_data_to_db(resource_types_table,
+                                       resource_type)
+                    rt_id = _get_resource_type_id(meta, rt_name)
+                    values.update({
+                        'resource_type_id': rt_id,
+                        'created_at': timeutils.utcnow(),
+                    })
+                    _insert_data_to_db(namespace_rt_table, values)
+                elif prefer_new:
+                    values.update({
+                        'resource_type_id': rt_id,
+                        'updated_at': timeutils.utcnow(),
+                    })
+                    _update_rt_association(namespace_rt_table, values,
+                                           rt_id, namespace_id)
             for property, schema in six.iteritems(metadata.get('properties',
                                                                 {})):
                 values = {
                     'name': property,
                     'namespace_id': namespace_id,
-                    'json_schema': json.dumps(schema),
-                    'created_at': timeutils.utcnow()
+                    'json_schema': json.dumps(schema)
                 }
-                _insert_data_to_db(metadef_properties_table, values)
+                property_id = _get_resource_id(properties_table,
+                                               namespace_id, property)
+                if not property_id:
+                    values.update({'created_at': timeutils.utcnow()})
+                    _insert_data_to_db(properties_table, values)
+                elif prefer_new:
+                    values.update({'updated_at': timeutils.utcnow()})
+                    _update_data_in_db(properties_table, values,
+                                       properties_table.c.id, property_id)
             for object in metadata.get('objects', []):
                 values = {
-                    'name': object.get('name'),
+                    'name': object['name'],
                     'description': object.get('description', None),
                     'namespace_id': namespace_id,
-                    'json_schema': json.dumps(object.get('properties', None)),
-                    'created_at': timeutils.utcnow()
+                    'json_schema': json.dumps(
+                        object.get('properties', None))
                 }
-                _insert_data_to_db(metadef_objects_table, values)
+                object_id = _get_resource_id(objects_table, namespace_id,
+                                             object['name'])
+                if not object_id:
+                    values.update({'created_at': timeutils.utcnow()})
+                    _insert_data_to_db(objects_table, values)
+                elif prefer_new:
+                    values.update({'updated_at': timeutils.utcnow()})
+                    _update_data_in_db(objects_table, values,
+                                       objects_table.c.id, object_id)
             for tag in metadata.get('tags', []):
-                timeutils_utcnow = timeutils.utcnow()
                 values = {
                     'name': tag.get('name'),
                     'namespace_id': namespace_id,
-                    'created_at': timeutils_utcnow,
-                    'updated_at': timeutils_utcnow
                 }
-                _insert_data_to_db(metadef_tags_table, values)
+                tag_id = _get_resource_id(tags_table, namespace_id,
+                                          tag['name'])
+                if not tag_id:
+                    values.update({'created_at': timeutils.utcnow()})
+                    _insert_data_to_db(tags_table, values)
+                elif prefer_new:
+                    values.update({'updated_at': timeutils.utcnow()})
+                    _update_data_in_db(tags_table, values,
+                                       tags_table.c.id, tag_id)

             LOG.info(_LI("File %s loaded to database."), file)

     LOG.info(_LI("Metadata loading finished"))
-def _clear_metadata(meta):
-    metadef_tables = [get_metadef_properties_table(meta),
-                      get_metadef_objects_table(meta),
-                      get_metadef_tags_table(meta),
-                      get_metadef_namespace_resource_types_table(meta),
-                      get_metadef_namespaces_table(meta)]
-
-    for table in metadef_tables:
-        table.delete().execute()
-        LOG.info(_LI("Table %s has been cleared"), table)
-
-
 def _insert_data_to_db(table, values, log_exception=True):
     try:
         table.insert(values=values).execute()
@@ -264,6 +340,23 @@ def _insert_data_to_db(table, values, log_exception=True):
         LOG.warning(_LW("Duplicate entry for values: %s"), values)


+def _update_data_in_db(table, values, column, value):
+    try:
+        (table.update(values=values).
+         where(column == value).execute())
+    except sqlalchemy.exc.IntegrityError:
+        LOG.warning(_LW("Duplicate entry for values: %s"), values)
+
+
+def _update_rt_association(table, values, rt_id, namespace_id):
+    try:
+        (table.update(values=values).
+         where(and_(table.c.resource_type_id == rt_id,
+                    table.c.namespace_id == namespace_id)).execute())
+    except sqlalchemy.exc.IntegrityError:
+        LOG.warning(_LW("Duplicate entry for values: %s"), values)
 def _export_data_to_file(meta, path):
     if not path:
         path = CONF.metadata_source_path
@@ -349,11 +442,22 @@ def _export_data_to_file(meta, path):
                 'namespace': namespace_file_name, 'file': file_name})


-def db_load_metadefs(engine, metadata_path=None):
+def db_load_metadefs(engine, metadata_path=None, merge=False,
+                     prefer_new=False, overwrite=False):
     meta = MetaData()
     meta.bind = engine

-    _populate_metadata(meta, metadata_path)
+    if not merge and (prefer_new or overwrite):
+        LOG.error(_LE("To use --prefer_new or --overwrite you need to "
+                      "combine one of these options with the --merge "
+                      "option."))
+        return
+
+    if prefer_new and overwrite and merge:
+        LOG.error(_LE("Please provide no more than one option from this "
+                      "list: --prefer_new, --overwrite"))
+        return
+
+    _populate_metadata(meta, metadata_path, merge, prefer_new, overwrite)


 def db_unload_metadefs(engine):

glance/tests/unit/test_manage.py

@@ -113,7 +113,7 @@ class TestLegacyManage(TestManageBase):
         self._main_test_helper(['glance.cmd.manage', 'db_load_metadefs'],
                                db_metadata.db_load_metadefs,
                                db_api.get_engine(),
-                               None)
+                               None, None, None, None)

     def test_db_metadefs_load_with_specified_path(self):
         db_metadata.db_load_metadefs = mock.Mock()
@@ -121,7 +121,31 @@ class TestLegacyManage(TestManageBase):
                                 '/mock/'],
                                db_metadata.db_load_metadefs,
                                db_api.get_engine(),
-                               '/mock/')
+                               '/mock/', None, None, None)
+
+    def test_db_metadefs_load_from_path_merge(self):
+        db_metadata.db_load_metadefs = mock.Mock()
+        self._main_test_helper(['glance.cmd.manage', 'db_load_metadefs',
+                                '/mock/', 'True'],
+                               db_metadata.db_load_metadefs,
+                               db_api.get_engine(),
+                               '/mock/', 'True', None, None)
+
+    def test_db_metadefs_load_from_merge_and_prefer_new(self):
+        db_metadata.db_load_metadefs = mock.Mock()
+        self._main_test_helper(['glance.cmd.manage', 'db_load_metadefs',
+                                '/mock/', 'True', 'True'],
+                               db_metadata.db_load_metadefs,
+                               db_api.get_engine(),
+                               '/mock/', 'True', 'True', None)
+
+    def test_db_metadefs_load_from_merge_and_prefer_new_and_overwrite(self):
+        db_metadata.db_load_metadefs = mock.Mock()
+        self._main_test_helper(['glance.cmd.manage', 'db_load_metadefs',
+                                '/mock/', 'True', 'True', 'True'],
+                               db_metadata.db_load_metadefs,
+                               db_api.get_engine(),
+                               '/mock/', 'True', 'True', 'True')

     def test_db_metadefs_export(self):
         db_metadata.db_export_metadefs = mock.Mock()
@@ -201,7 +225,7 @@ class TestManage(TestManageBase):
         self._main_test_helper(['glance.cmd.manage', 'db', 'load_metadefs'],
                                db_metadata.db_load_metadefs,
                                db_api.get_engine(),
-                               None)
+                               None, False, False, False)

     def test_db_metadefs_load_with_specified_path(self):
         db_metadata.db_load_metadefs = mock.Mock()
@@ -209,7 +233,47 @@ class TestManage(TestManageBase):
                                 '--path', '/mock/'],
                                db_metadata.db_load_metadefs,
                                db_api.get_engine(),
-                               '/mock/')
+                               '/mock/', False, False, False)
+
+    def test_db_metadefs_load_prefer_new_with_path(self):
+        db_metadata.db_load_metadefs = mock.Mock()
+        self._main_test_helper(['glance.cmd.manage', 'db', 'load_metadefs',
+                                '--path', '/mock/', '--merge', '--prefer_new'],
+                               db_metadata.db_load_metadefs,
+                               db_api.get_engine(),
+                               '/mock/', True, True, False)
+
+    def test_db_metadefs_load_prefer_new(self):
+        db_metadata.db_load_metadefs = mock.Mock()
+        self._main_test_helper(['glance.cmd.manage', 'db', 'load_metadefs',
+                                '--merge', '--prefer_new'],
+                               db_metadata.db_load_metadefs,
+                               db_api.get_engine(),
+                               None, True, True, False)
+
+    def test_db_metadefs_load_overwrite_existing(self):
+        db_metadata.db_load_metadefs = mock.Mock()
+        self._main_test_helper(['glance.cmd.manage', 'db', 'load_metadefs',
+                                '--merge', '--overwrite'],
+                               db_metadata.db_load_metadefs,
+                               db_api.get_engine(),
+                               None, True, False, True)
+
+    def test_db_metadefs_load_prefer_new_and_overwrite_existing(self):
+        db_metadata.db_load_metadefs = mock.Mock()
+        self._main_test_helper(['glance.cmd.manage', 'db', 'load_metadefs',
+                                '--merge', '--prefer_new', '--overwrite'],
+                               db_metadata.db_load_metadefs,
+                               db_api.get_engine(),
+                               None, True, True, True)
+
+    def test_db_metadefs_load_from_path_overwrite_existing(self):
+        db_metadata.db_load_metadefs = mock.Mock()
+        self._main_test_helper(['glance.cmd.manage', 'db', 'load_metadefs',
+                                '--path', '/mock/', '--merge', '--overwrite'],
+                               db_metadata.db_load_metadefs,
+                               db_api.get_engine(),
+                               '/mock/', True, False, True)

     def test_db_metadefs_export(self):
         db_metadata.db_export_metadefs = mock.Mock()