b903d4e1ee
Now that we no longer support py27, we can use the standard library unittest.mock module instead of the third party mock lib. Change-Id: I3e1e5ed4e72837d45b78e5ccb9ce8cca416e5c11 Signed-off-by: Sean McGinnis <sean.mcginnis@gmail.com>
288 lines
13 KiB
Python
288 lines
13 KiB
Python
# Copyright 2013 Mirantis Inc.
|
|
# All Rights Reserved
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import os
|
|
import tempfile
|
|
from unittest import mock
|
|
|
|
from migrate import exceptions as migrate_exception
|
|
from migrate.versioning import api as versioning_api
|
|
import sqlalchemy
|
|
|
|
from oslo_db import exception as db_exception
|
|
from oslo_db.sqlalchemy import migration
|
|
from oslo_db.tests.sqlalchemy import base as test_base
|
|
from oslo_db.tests import utils as test_utils
|
|
|
|
|
|
class TestMigrationCommon(test_base._DbTestCase):
|
|
def setUp(self):
|
|
super(TestMigrationCommon, self).setUp()
|
|
|
|
migration._REPOSITORY = None
|
|
self.path = tempfile.mkdtemp('test_migration')
|
|
self.path1 = tempfile.mkdtemp('test_migration')
|
|
self.return_value = '/home/openstack/migrations'
|
|
self.return_value1 = '/home/extension/migrations'
|
|
self.init_version = 1
|
|
self.test_version = 123
|
|
|
|
self.patcher_repo = mock.patch.object(migration, 'Repository')
|
|
self.repository = self.patcher_repo.start()
|
|
self.repository.side_effect = [self.return_value, self.return_value1]
|
|
|
|
self.mock_api_db = mock.patch.object(versioning_api, 'db_version')
|
|
self.mock_api_db_version = self.mock_api_db.start()
|
|
self.mock_api_db_version.return_value = self.test_version
|
|
|
|
def tearDown(self):
|
|
os.rmdir(self.path)
|
|
self.mock_api_db.stop()
|
|
self.patcher_repo.stop()
|
|
super(TestMigrationCommon, self).tearDown()
|
|
|
|
def test_find_migrate_repo_path_not_found(self):
|
|
self.assertRaises(
|
|
db_exception.DBMigrationError,
|
|
migration._find_migrate_repo,
|
|
"/foo/bar/",
|
|
)
|
|
self.assertIsNone(migration._REPOSITORY)
|
|
|
|
def test_find_migrate_repo_called_once(self):
|
|
my_repository = migration._find_migrate_repo(self.path)
|
|
self.repository.assert_called_once_with(self.path)
|
|
self.assertEqual(self.return_value, my_repository)
|
|
|
|
def test_find_migrate_repo_called_few_times(self):
|
|
repo1 = migration._find_migrate_repo(self.path)
|
|
repo2 = migration._find_migrate_repo(self.path1)
|
|
self.assertNotEqual(repo1, repo2)
|
|
|
|
def test_db_version_control(self):
|
|
with test_utils.nested(
|
|
mock.patch.object(migration, '_find_migrate_repo'),
|
|
mock.patch.object(versioning_api, 'version_control'),
|
|
) as (mock_find_repo, mock_version_control):
|
|
mock_find_repo.return_value = self.return_value
|
|
|
|
version = migration.db_version_control(
|
|
self.engine, self.path, self.test_version)
|
|
|
|
self.assertEqual(self.test_version, version)
|
|
mock_version_control.assert_called_once_with(
|
|
self.engine, self.return_value, self.test_version)
|
|
|
|
@mock.patch.object(migration, '_find_migrate_repo')
|
|
@mock.patch.object(versioning_api, 'version_control')
|
|
def test_db_version_control_version_less_than_actual_version(
|
|
self, mock_version_control, mock_find_repo):
|
|
mock_find_repo.return_value = self.return_value
|
|
mock_version_control.side_effect = (migrate_exception.
|
|
DatabaseAlreadyControlledError)
|
|
self.assertRaises(db_exception.DBMigrationError,
|
|
migration.db_version_control, self.engine,
|
|
self.path, self.test_version - 1)
|
|
|
|
@mock.patch.object(migration, '_find_migrate_repo')
|
|
@mock.patch.object(versioning_api, 'version_control')
|
|
def test_db_version_control_version_greater_than_actual_version(
|
|
self, mock_version_control, mock_find_repo):
|
|
mock_find_repo.return_value = self.return_value
|
|
mock_version_control.side_effect = (migrate_exception.
|
|
InvalidVersionError)
|
|
self.assertRaises(db_exception.DBMigrationError,
|
|
migration.db_version_control, self.engine,
|
|
self.path, self.test_version + 1)
|
|
|
|
def test_db_version_return(self):
|
|
ret_val = migration.db_version(self.engine, self.path,
|
|
self.init_version)
|
|
self.assertEqual(self.test_version, ret_val)
|
|
|
|
def test_db_version_raise_not_controlled_error_first(self):
|
|
with mock.patch.object(migration, 'db_version_control') as mock_ver:
|
|
|
|
self.mock_api_db_version.side_effect = [
|
|
migrate_exception.DatabaseNotControlledError('oups'),
|
|
self.test_version]
|
|
|
|
ret_val = migration.db_version(self.engine, self.path,
|
|
self.init_version)
|
|
self.assertEqual(self.test_version, ret_val)
|
|
mock_ver.assert_called_once_with(self.engine, self.path,
|
|
version=self.init_version)
|
|
|
|
def test_db_version_raise_not_controlled_error_tables(self):
|
|
with mock.patch.object(sqlalchemy, 'MetaData') as mock_meta:
|
|
self.mock_api_db_version.side_effect = \
|
|
migrate_exception.DatabaseNotControlledError('oups')
|
|
my_meta = mock.MagicMock()
|
|
my_meta.tables = {'a': 1, 'b': 2}
|
|
mock_meta.return_value = my_meta
|
|
|
|
self.assertRaises(
|
|
db_exception.DBMigrationError, migration.db_version,
|
|
self.engine, self.path, self.init_version)
|
|
|
|
@mock.patch.object(versioning_api, 'version_control')
|
|
def test_db_version_raise_not_controlled_error_no_tables(self, mock_vc):
|
|
with mock.patch.object(sqlalchemy, 'MetaData') as mock_meta:
|
|
self.mock_api_db_version.side_effect = (
|
|
migrate_exception.DatabaseNotControlledError('oups'),
|
|
self.init_version)
|
|
my_meta = mock.MagicMock()
|
|
my_meta.tables = {}
|
|
mock_meta.return_value = my_meta
|
|
migration.db_version(self.engine, self.path, self.init_version)
|
|
|
|
mock_vc.assert_called_once_with(self.engine, self.return_value1,
|
|
self.init_version)
|
|
|
|
@mock.patch.object(versioning_api, 'version_control')
|
|
def test_db_version_raise_not_controlled_alembic_tables(self, mock_vc):
|
|
# When there are tables but the alembic control table
|
|
# (alembic_version) is present, attempt to version the db.
|
|
# This simulates the case where there is are multiple repos (different
|
|
# abs_paths) and a different path has been versioned already.
|
|
with mock.patch.object(sqlalchemy, 'MetaData') as mock_meta:
|
|
self.mock_api_db_version.side_effect = [
|
|
migrate_exception.DatabaseNotControlledError('oups'), None]
|
|
my_meta = mock.MagicMock()
|
|
my_meta.tables = {'alembic_version': 1, 'b': 2}
|
|
mock_meta.return_value = my_meta
|
|
|
|
migration.db_version(self.engine, self.path, self.init_version)
|
|
|
|
mock_vc.assert_called_once_with(self.engine, self.return_value1,
|
|
self.init_version)
|
|
|
|
@mock.patch.object(versioning_api, 'version_control')
|
|
def test_db_version_raise_not_controlled_migrate_tables(self, mock_vc):
|
|
# When there are tables but the sqlalchemy-migrate control table
|
|
# (migrate_version) is present, attempt to version the db.
|
|
# This simulates the case where there is are multiple repos (different
|
|
# abs_paths) and a different path has been versioned already.
|
|
with mock.patch.object(sqlalchemy, 'MetaData') as mock_meta:
|
|
self.mock_api_db_version.side_effect = [
|
|
migrate_exception.DatabaseNotControlledError('oups'), None]
|
|
my_meta = mock.MagicMock()
|
|
my_meta.tables = {'migrate_version': 1, 'b': 2}
|
|
mock_meta.return_value = my_meta
|
|
|
|
migration.db_version(self.engine, self.path, self.init_version)
|
|
|
|
mock_vc.assert_called_once_with(self.engine, self.return_value1,
|
|
self.init_version)
|
|
|
|
def test_db_sync_wrong_version(self):
|
|
self.assertRaises(db_exception.DBMigrationError,
|
|
migration.db_sync, self.engine, self.path, 'foo')
|
|
|
|
@mock.patch.object(versioning_api, 'upgrade')
|
|
def test_db_sync_script_not_present(self, upgrade):
|
|
# For non existent migration script file sqlalchemy-migrate will raise
|
|
# VersionNotFoundError which will be wrapped in DBMigrationError.
|
|
upgrade.side_effect = migrate_exception.VersionNotFoundError
|
|
self.assertRaises(db_exception.DBMigrationError,
|
|
migration.db_sync, self.engine, self.path,
|
|
self.test_version + 1)
|
|
|
|
@mock.patch.object(versioning_api, 'upgrade')
|
|
def test_db_sync_known_error_raised(self, upgrade):
|
|
upgrade.side_effect = migrate_exception.KnownError
|
|
self.assertRaises(db_exception.DBMigrationError,
|
|
migration.db_sync, self.engine, self.path,
|
|
self.test_version + 1)
|
|
|
|
def test_db_sync_upgrade(self):
|
|
init_ver = 55
|
|
with test_utils.nested(
|
|
mock.patch.object(migration, '_find_migrate_repo'),
|
|
mock.patch.object(versioning_api, 'upgrade')
|
|
) as (mock_find_repo, mock_upgrade):
|
|
|
|
mock_find_repo.return_value = self.return_value
|
|
self.mock_api_db_version.return_value = self.test_version - 1
|
|
|
|
migration.db_sync(self.engine, self.path, self.test_version,
|
|
init_ver)
|
|
|
|
mock_upgrade.assert_called_once_with(
|
|
self.engine, self.return_value, self.test_version)
|
|
|
|
def test_db_sync_downgrade(self):
|
|
with test_utils.nested(
|
|
mock.patch.object(migration, '_find_migrate_repo'),
|
|
mock.patch.object(versioning_api, 'downgrade')
|
|
) as (mock_find_repo, mock_downgrade):
|
|
|
|
mock_find_repo.return_value = self.return_value
|
|
self.mock_api_db_version.return_value = self.test_version + 1
|
|
|
|
migration.db_sync(self.engine, self.path, self.test_version)
|
|
|
|
mock_downgrade.assert_called_once_with(
|
|
self.engine, self.return_value, self.test_version)
|
|
|
|
def test_db_sync_sanity_called(self):
|
|
with test_utils.nested(
|
|
mock.patch.object(migration, '_find_migrate_repo'),
|
|
mock.patch.object(migration, '_db_schema_sanity_check'),
|
|
mock.patch.object(versioning_api, 'downgrade')
|
|
) as (mock_find_repo, mock_sanity, mock_downgrade):
|
|
|
|
mock_find_repo.return_value = self.return_value
|
|
migration.db_sync(self.engine, self.path, self.test_version)
|
|
|
|
self.assertEqual([mock.call(self.engine), mock.call(self.engine)],
|
|
mock_sanity.call_args_list)
|
|
|
|
def test_db_sync_sanity_skipped(self):
|
|
with test_utils.nested(
|
|
mock.patch.object(migration, '_find_migrate_repo'),
|
|
mock.patch.object(migration, '_db_schema_sanity_check'),
|
|
mock.patch.object(versioning_api, 'downgrade')
|
|
) as (mock_find_repo, mock_sanity, mock_downgrade):
|
|
|
|
mock_find_repo.return_value = self.return_value
|
|
migration.db_sync(self.engine, self.path, self.test_version,
|
|
sanity_check=False)
|
|
|
|
self.assertFalse(mock_sanity.called)
|
|
|
|
def test_db_sanity_table_not_utf8(self):
|
|
with mock.patch.object(self, 'engine') as mock_eng:
|
|
type(mock_eng).name = mock.PropertyMock(return_value='mysql')
|
|
mock_eng.execute.return_value = [['table_A', 'latin1'],
|
|
['table_B', 'latin1']]
|
|
|
|
self.assertRaises(ValueError, migration._db_schema_sanity_check,
|
|
mock_eng)
|
|
|
|
def test_db_sanity_table_not_utf8_exclude_migrate_tables(self):
|
|
with mock.patch.object(self, 'engine') as mock_eng:
|
|
type(mock_eng).name = mock.PropertyMock(return_value='mysql')
|
|
# NOTE(morganfainberg): Check both lower and upper case versions
|
|
# of the migration table names (validate case insensitivity in
|
|
# the sanity check.
|
|
mock_eng.execute.return_value = [['migrate_version', 'latin1'],
|
|
['alembic_version', 'latin1'],
|
|
['MIGRATE_VERSION', 'latin1'],
|
|
['ALEMBIC_VERSION', 'latin1']]
|
|
|
|
migration._db_schema_sanity_check(mock_eng)
|