heat/heat/db/sqlalchemy/utils.py

99 lines
2.9 KiB
Python

#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# SQLAlchemy helper functions
import sqlalchemy
from sqlalchemy.orm import exc
import tenacity
def clone_table(name, parent, meta, newcols=None, ignorecols=None,
                swapcols=None, ignorecons=None):
    """Create a new table whose schema is derived from an existing one.

    Columns of ``parent`` are copied unless they are listed in
    ``ignorecols`` or replaced through ``swapcols``; constraints are copied
    unless they are named in ``ignorecons`` or involve an ignored column.
    The resulting table is created before it is returned.

    :param name: new table name
    :param parent: parent table to copy schema from
    :param meta: metadata object handed to ``sqlalchemy.Table`` for the
                 new table
    :param newcols: extra columns appended after the cloned ones (they are
                    passed to ``sqlalchemy.Table`` as-is)
    :param ignorecols: names of parent columns that must not be cloned
    :param swapcols: mapping of parent column name to the column that
                     replaces it in the new table
    :param ignorecons: names of constraints to be ignored
    :return: sqlalchemy.Table instance
    """
    newcols = newcols or []
    ignorecols = ignorecols or []
    swapcols = swapcols or {}
    ignorecons = ignorecons or []

    # Copy every parent column that is neither dropped nor replaced.
    cloned_columns = []
    for column in parent.columns:
        if column.name in ignorecols or column.name in swapcols:
            continue
        cloned_columns.append(column.copy())

    # Replacement columns come next, then the brand new ones.
    all_columns = cloned_columns + list(swapcols.values()) + list(newcols)
    new_table = sqlalchemy.Table(name, meta, *all_columns)

    def _touches_ignored_column(constraint):
        # Only constraints that expose a ``columns`` collection can refer
        # to an ignored column.
        if not hasattr(constraint, 'columns'):
            return False
        return any(col in constraint.columns for col in ignorecols)

    # Copy all surviving constraints first, then attach them.
    cloned_constraints = []
    for constraint in parent.constraints:
        if constraint.name in ignorecons:
            continue
        if _touches_ignored_column(constraint):
            continue
        cloned_constraints.append(constraint.copy())

    for constraint in cloned_constraints:
        new_table.append_constraint(constraint)

    # NOTE(review): create() is called without an explicit bind, so
    # presumably ``meta`` has to be bound to an engine -- confirm.
    new_table.create()
    return new_table
def migrate_data(migrate_engine,
                 table,
                 new_table,
                 skip_columns=None):
    """Copy all rows of ``table`` into ``new_table`` and swap the tables.

    Every row currently in ``table`` is inserted into ``new_table``, then
    ``table`` is dropped and ``new_table`` is renamed to the name ``table``
    had.

    :param migrate_engine: object whose ``execute()`` runs the insert
                           statements
    :param table: source table; it is dropped once the rows are copied
    :param new_table: destination table; it takes over the source name
    :param skip_columns: optional iterable of source column names that are
                         left out of the copied rows. Names that occur
                         more than once, or that are not columns of
                         ``table``, are simply ignored.
    """
    table_name = table.name
    # Read the complete source content before the first insert is issued.
    list_of_rows = list(table.select().execute())

    # Work out the columns to copy once instead of deleting the skipped
    # keys from every row; the set lookup also makes duplicated or unknown
    # names in skip_columns harmless instead of raising KeyError.
    skipped = frozenset(skip_columns or ())
    colnames = [c.name for c in table.columns if c.name not in skipped]

    for row in list_of_rows:
        values = {colname: getattr(row, colname) for colname in colnames}
        migrate_engine.execute(new_table.insert(values))

    # NOTE(review): drop()/rename() are called without an explicit bind,
    # so presumably both tables are bound to an engine -- confirm.
    table.drop()
    new_table.rename(table_name)
def retry_on_stale_data_error(func):
    """Decorate ``func`` so it is retried on ``StaleDataError``.

    The call is attempted at most three times and only retried when it
    raised ``sqlalchemy.orm.exc.StaleDataError``. ``reraise=True`` asks
    tenacity to raise the last original exception once the attempts are
    used up.

    :param func: callable to wrap
    :return: the wrapped callable
    """
    retry_decorator = tenacity.retry(
        retry=tenacity.retry_if_exception_type(exc.StaleDataError),
        stop=tenacity.stop_after_attempt(3),
        reraise=True,
    )
    wrapped = retry_decorator(func)
    return wrapped