# Copyright 2014 OpenStack Foundation
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import collections
from contextlib import contextmanager
import subprocess

from alembic.ddl import base as alembic_ddl
from alembic import script as alembic_script
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_db.sqlalchemy import test_migrations
from oslo_log import log as logging
from oslotest import base as oslotest_base
import six
import sqlalchemy
from sqlalchemy import event  # noqa
from sqlalchemy.sql import ddl as sqla_ddl

from neutron.db import migration as migration_root
from neutron.db.migration.alembic_migrations import external
from neutron.db.migration import cli as migration
from neutron.db.migration.models import head as head_models
from neutron.tests import base as test_base
from neutron.tests.functional import base as functional_base
from neutron.tests.unit import testlib_api

# The tests below override ``core_plugin``, so the option has to be
# registered before any of them run.
cfg.CONF.import_opt('core_plugin', 'neutron.conf.common')

# DDL constructs that create schema elements, keyed by the library that
# emits them.  Each value is a tuple so it can be handed to isinstance().
CREATION_OPERATIONS = {
    'sqla': (
        sqla_ddl.CreateIndex,
        sqla_ddl.CreateTable,
        sqla_ddl.CreateColumn,
    ),
    'alembic': (
        alembic_ddl.AddColumn,
    ),
}

# DDL constructs that drop schema elements, laid out like
# CREATION_OPERATIONS.
DROP_OPERATIONS = {
    'sqla': (
        sqla_ddl.DropConstraint,
        sqla_ddl.DropIndex,
        sqla_ddl.DropTable,
    ),
    'alembic': (
        alembic_ddl.DropColumn,
    ),
}

LOG = logging.getLogger(__name__)

# NOTE(slaweq): replace the alembic_util logging functions that are normally
# used with the oslo_log logger to limit output on stdout
migration.log_error = LOG.error
migration.log_warning = LOG.warning
migration.log_info = LOG.info


def upgrade(engine, alembic_config, branch_name='heads'):
    """Run ``alembic upgrade`` for ``branch_name`` against ``engine``.

    :param engine: engine whose ``url`` is stored as the
        ``[database] connection`` override before the upgrade runs.
    :param alembic_config: alembic config passed to
        ``migration.do_alembic_command``.
    :param branch_name: upgrade target; defaults to ``'heads'``.
    """
    cfg.CONF.set_override('connection', engine.url, group='database')
    migration.do_alembic_command(alembic_config, 'upgrade',
                                 branch_name)


class _TestModelsMigrations(test_migrations.ModelsMigrationsSync):
    """Test for checking of equality models state and migrations.

    For the opportunistic testing you need to set up a db named
    'openstack_citest' with user 'openstack_citest' and password
    'openstack_citest' on localhost.
    The test will then use that db and user/password combo to run the tests.

    For PostgreSQL on Ubuntu this can be done with the following commands::

        sudo -u postgres psql
        postgres=# create user openstack_citest with createdb login password
                  'openstack_citest';
        postgres=# create database openstack_citest with owner
                   openstack_citest;

    For MySQL on Ubuntu this can be done with the following commands::

        mysql -u root
        >create database openstack_citest;
        >grant all privileges on openstack_citest.* to
         openstack_citest@localhost identified by 'openstack_citest';

    Output is a list that contains information about differences between db
    and models. Output example::

       [('add_table',
         Table('bat', MetaData(bind=None),
               Column('info', String(), table=<bat>), schema=None)),
        ('remove_table',
         Table(u'bar', MetaData(bind=None),
               Column(u'data', VARCHAR(), table=<bar>), schema=None)),
        ('add_column',
         None,
         'foo',
         Column('data', Integer(), table=<foo>)),
        ('remove_column',
         None,
         'foo',
         Column(u'old_data', VARCHAR(), table=None)),
        [('modify_nullable',
          None,
          'foo',
          u'x',
          {'existing_server_default': None,
          'existing_type': INTEGER()},
          True,
          False)]]

    * ``remove_*`` means that there is extra table/column/constraint in db;

    * ``add_*`` means that it is missing in db;

    * ``modify_*`` means that on column in db is set wrong
      type/nullable/server_default. Element contains information:

        - what should be modified,
        - schema,
        - table,
        - column,
        - existing correct column parameters,
        - right value,
        - wrong value.

    This class also contains tests for branches, like that correct operations
    are used in contract and expand branches.

    """

    BUILD_SCHEMA = False
    # Multiplier handed to the Timeout fixture in setUp().
    TIMEOUT_SCALING_FACTOR = 4

    def setUp(self):
        """Prepare the config fixture, the alembic config and the timeout."""
        super(_TestModelsMigrations, self).setUp()
        self.cfg = self.useFixture(config_fixture.Config())
        self.cfg.config(core_plugin='ml2')
        self.alembic_config = migration.get_neutron_config()
        self.alembic_config.neutron_config = cfg.CONF

        # Migration tests can take a long time
        self.useFixture(test_base.Timeout(scaling=self.TIMEOUT_SCALING_FACTOR))

    def db_sync(self, engine):
        """Upgrade the database behind ``engine`` to all heads."""
        upgrade(engine, self.alembic_config)

    def get_engine(self):
        """Return the engine the test case runs against."""
        return self.engine

    def get_metadata(self):
        """Return the metadata of the head models."""
        return head_models.get_metadata()

    def include_object(self, object_, name, type_, reflected, compare_to):
        """Exclude ``alembic_version`` and external tables from comparison.

        Every other object is delegated to the parent implementation.
        """
        if type_ == 'table' and (name == 'alembic_version' or
                                 name in external.TABLES):
            return False

        return super(_TestModelsMigrations, self).include_object(
            object_, name, type_, reflected, compare_to)

    def filter_metadata_diff(self, diff):
        """Return ``diff`` without the entries that are not real errors."""
        return list(filter(self.remove_unrelated_errors, diff))

    # Remove some difference that are not mistakes just specific of
    # dialects, etc
    def remove_unrelated_errors(self, element):
        """Tell whether a diff ``element`` has to be kept.

        :param element: either a single diff tuple or a list of
            ``modify_*`` tuples.
        :returns: ``False`` for the MySQL-only differences handled below
            (a removed index named like a foreign key of the table or
            covering a primary key column, or a modified default on a
            primary key column), ``True`` otherwise.
        """
        engine = self.get_engine()
        insp = sqlalchemy.engine.reflection.Inspector.from_engine(engine)
        dialect = engine.dialect.name
        if isinstance(element, tuple):
            if dialect == 'mysql' and element[0] == 'remove_index':
                index = element[1]
                table_name = index.table.name
                for fk in insp.get_foreign_keys(table_name):
                    if fk['name'] == index.name:
                        return False
                # The primary key does not change while the index columns
                # are scanned, so it is reflected only once.
                pk_columns = insp.get_pk_constraint(
                    table_name)['constrained_columns']
                for col in index.expressions:
                    if col.name in pk_columns:
                        return False
        else:
            for modified, _, table, column, _, _, _new in element:
                if modified == 'modify_default' and dialect == 'mysql':
                    constrained = insp.get_pk_constraint(table)
                    if column in constrained['constrained_columns']:
                        return False
        return True

    def test_upgrade_expand_branch(self):
        # Verify that "command neutron-db-manage upgrade --expand" works
        # without errors. Check this for both MySQL and PostgreSQL.
        upgrade(self.engine, self.alembic_config,
                branch_name='%s@head' % migration.EXPAND_BRANCH)

    def test_upgrade_contract_branch(self):
        # Verify that "command neutron-db-manage upgrade --contract" works
        # without errors. Check this for both MySQL and PostgreSQL.
        upgrade(self.engine, self.alembic_config,
                branch_name='%s@head' % migration.CONTRACT_BRANCH)

    @contextmanager
    def _listener(self, engine, listener_func):
        """Keep ``listener_func`` on ``before_execute`` inside the block."""
        # Register before entering the try block: if listen() itself fails
        # there is nothing to remove, and remove() must not mask that error.
        event.listen(engine, 'before_execute', listener_func)
        try:
            yield
        finally:
            event.remove(engine, 'before_execute', listener_func)

    def test_branches(self):
        """Check that expand/contract branches only run allowed DDL."""

        drop_exceptions = collections.defaultdict(list)
        creation_exceptions = collections.defaultdict(list)

        def find_migration_exceptions():
            # Due to some misunderstandings and some conscious decisions,
            # there may be some expand migrations which drop elements and
            # some contract migrations which create elements. These excepted
            # elements must be returned by a method in the script itself.
            # The names of the method must be 'contract_creation_exceptions'
            # or 'expand_drop_exceptions'. The methods must have a docstring
            # explaining the reason for the exception.
            #
            # Here we build lists of the excepted elements and verify that
            # they are documented.
            script = alembic_script.ScriptDirectory.from_config(
                self.alembic_config)
            for m in script.walk_revisions(base='base', head='heads'):
                branches = m.branch_labels or []
                if migration.CONTRACT_BRANCH in branches:
                    method_name = 'contract_creation_exceptions'
                    exceptions_dict = creation_exceptions
                elif migration.EXPAND_BRANCH in branches:
                    method_name = 'expand_drop_exceptions'
                    exceptions_dict = drop_exceptions
                else:
                    continue
                get_excepted_elements = getattr(m.module, method_name, None)
                if not get_excepted_elements:
                    continue
                # A function without a docstring has __doc__ set to None,
                # so the getattr default alone does not protect len().
                explanation = getattr(get_excepted_elements, '__doc__', None)
                if not explanation:
                    self.fail("%s() requires docstring with explanation" %
                              '.'.join([m.module.__name__,
                                        get_excepted_elements.__name__]))
                for sa_type, elements in get_excepted_elements().items():
                    exceptions_dict[sa_type].extend(elements)

        def is_excepted_sqla(clauseelement, exceptions):
            """Identify excepted operations that are allowed for the branch."""
            element = clauseelement.element
            element_name = element.name
            if isinstance(element, sqlalchemy.Index):
                # Indexes are excepted by the name of their table.
                element_name = element.table.name
            for sa_type_, excepted_names in exceptions.items():
                if (isinstance(element, sa_type_) and
                        element_name in excepted_names):
                    return True
            return False

        def is_excepted_alembic(clauseelement, exceptions):
            """Identify excepted operations that are allowed for the branch."""
            # For alembic the clause is AddColumn or DropColumn
            column = clauseelement.column.name
            table = clauseelement.column.table.name
            element_name = '.'.join([table, column])
            for alembic_type, excepted_names in exceptions.items():
                if (alembic_type == sqlalchemy.Column and
                        element_name in excepted_names):
                    return True
            return False

        def is_allowed(clauseelement, exceptions, disallowed_ops):
            if (isinstance(clauseelement, disallowed_ops['sqla']) and
                    hasattr(clauseelement, 'element')):
                return is_excepted_sqla(clauseelement, exceptions)
            if isinstance(clauseelement, disallowed_ops['alembic']):
                return is_excepted_alembic(clauseelement, exceptions)
            return True

        def check_expand_branch(conn, clauseelement, multiparams, params):
            if not is_allowed(clauseelement, drop_exceptions, DROP_OPERATIONS):
                self.fail("Migration in expand branch contains drop command")

        def check_contract_branch(conn, clauseelement, multiparams, params):
            if not is_allowed(clauseelement, creation_exceptions,
                              CREATION_OPERATIONS):
                self.fail("Migration in contract branch contains create "
                          "command")

        find_migration_exceptions()
        engine = self.engine
        cfg.CONF.set_override('connection', engine.url, group='database')

        with engine.begin() as connection:
            self.alembic_config.attributes['connection'] = connection

            # upgrade to latest release first; --expand users are expected to
            # apply all alembic scripts from previous releases before applying
            # the new ones
            for release in migration_root.NEUTRON_MILESTONES:
                release_revisions = migration._find_milestone_revisions(
                    self.alembic_config, release)
                for rev in release_revisions:
                    migration.do_alembic_command(
                        self.alembic_config, 'upgrade', rev[0])

            with self._listener(engine, check_expand_branch):
                migration.do_alembic_command(
                    self.alembic_config, 'upgrade',
                    '%s@head' % migration.EXPAND_BRANCH)

            with self._listener(engine, check_contract_branch):
                migration.do_alembic_command(
                    self.alembic_config, 'upgrade',
                    '%s@head' % migration.CONTRACT_BRANCH)

    def _test_has_offline_migrations(self, revision, expected):
        """Upgrade to ``revision`` and compare has_offline_migrations()."""
        engine = self.get_engine()
        cfg.CONF.set_override('connection', engine.url, group='database')
        migration.do_alembic_command(self.alembic_config, 'upgrade', revision)
        self.assertEqual(expected,
                         migration.has_offline_migrations(self.alembic_config,
                                                          'unused'))

    def test_has_offline_migrations_pending_contract_scripts(self):
        self._test_has_offline_migrations('kilo', True)

    def test_has_offline_migrations_all_heads_upgraded(self):
        self._test_has_offline_migrations('heads', False)

    # NOTE(ihrachys): if this test fails for you, it probably means that you
    # attempt to add an unsafe contract migration script, that is in
    # contradiction to blueprint online-upgrades
    # TODO(ihrachys): revisit later in Pike+ where some contract scripts may be
    # safe again
    def test_forbid_offline_migrations_starting_newton(self):
        engine = self.get_engine()
        cfg.CONF.set_override('connection', engine.url, group='database')
        # the following revisions are Newton heads
        for revision in ('5cd92597d11d', '5c85685d616d'):
            migration.do_alembic_command(
                self.alembic_config, 'upgrade', revision)
        self.assertFalse(migration.has_offline_migrations(
            self.alembic_config, 'unused'),
            msg='Offline contract migration scripts are forbidden for Ocata+')


class TestModelsMigrationsMysql(testlib_api.MySQLTestCaseMixin,
                                _TestModelsMigrations,
                                testlib_api.SqlTestCaseLight,
                                functional_base.BaseLoggingTestCase):
    """MySQL run of the models/migrations tests.

    Every test is wrapped with ``skip_if_timeout`` (bug 1687027); the
    overrides below exist only to attach that decorator.
    """

    @test_base.skip_if_timeout("bug 1687027")
    def test_forbid_offline_migrations_starting_newton(self):
        super(TestModelsMigrationsMysql,
              self).test_forbid_offline_migrations_starting_newton()

    @test_base.skip_if_timeout("bug 1687027")
    def test_check_mysql_engine(self):
        engine = self.get_engine()
        cfg.CONF.set_override('connection', engine.url, group='database')
        with engine.begin() as connection:
            self.alembic_config.attributes['connection'] = connection
            migration.do_alembic_command(self.alembic_config, 'upgrade',
                                         'heads')
            insp = sqlalchemy.engine.reflection.Inspector.from_engine(engine)
            # Test that table creation on MySQL only builds InnoDB tables
            tables = insp.get_table_names()
            self.assertGreater(len(tables), 0,
                               "No tables found. Wrong schema?")
            res = [table for table in tables if
                   insp.get_table_options(table)['mysql_engine'] != 'InnoDB'
                   and table != 'alembic_version']
            self.assertEqual(0, len(res), "%s non InnoDB tables created" % res)

    @test_base.skip_if_timeout("bug 1687027")
    def test_upgrade_expand_branch(self):
        super(TestModelsMigrationsMysql, self).test_upgrade_expand_branch()

    @test_base.skip_if_timeout("bug 1687027")
    def test_upgrade_contract_branch(self):
        super(TestModelsMigrationsMysql, self).test_upgrade_contract_branch()

    @test_base.skip_if_timeout("bug 1687027")
    def test_branches(self):
        super(TestModelsMigrationsMysql, self).test_branches()

    @test_base.skip_if_timeout("bug 1687027")
    def test_has_offline_migrations_pending_contract_scripts(self):
        super(TestModelsMigrationsMysql,
              self).test_has_offline_migrations_pending_contract_scripts()

    @test_base.skip_if_timeout("bug 1687027")
    def test_has_offline_migrations_all_heads_upgraded(self):
        super(TestModelsMigrationsMysql,
              self).test_has_offline_migrations_all_heads_upgraded()

    @test_base.skip_if_timeout("bug 1687027")
    def test_models_sync(self):
        super(TestModelsMigrationsMysql, self).test_models_sync()


class TestModelsMigrationsPsql(testlib_api.PostgreSQLTestCaseMixin,
                               _TestModelsMigrations,
                               testlib_api.SqlTestCaseLight):
    """PostgreSQL run of the models/migrations tests."""


class TestSanityCheck(testlib_api.SqlTestCaseLight):
    """Exercise ``check_sanity`` of individual migration scripts.

    Each test creates a minimal table, inserts rows and calls the
    ``check_sanity`` function of the named revision on the same connection.
    """

    BUILD_SCHEMA = False

    def setUp(self):
        super(TestSanityCheck, self).setUp()
        self.alembic_config = migration.get_neutron_config()
        self.alembic_config.neutron_config = cfg.CONF

    def _drop_table(self, table):
        """Drop ``table`` inside its own transaction (used as cleanup)."""
        with self.engine.begin() as conn:
            table.drop(conn)

    def test_check_sanity_1df244e556f5(self):
        ha_router_agent_port_bindings = sqlalchemy.Table(
            'ha_router_agent_port_bindings', sqlalchemy.MetaData(),
            sqlalchemy.Column('port_id', sqlalchemy.String(36)),
            sqlalchemy.Column('router_id', sqlalchemy.String(36)),
            sqlalchemy.Column('l3_agent_id', sqlalchemy.String(36)))

        with self.engine.connect() as conn:
            ha_router_agent_port_bindings.create(conn)
            self.addCleanup(self._drop_table, ha_router_agent_port_bindings)
            # NOTE(haleyb): without this disabled, pylint complains
            # about a missing 'dml' argument.
            # pylint: disable=no-value-for-parameter
            conn.execute(ha_router_agent_port_bindings.insert(), [
                {'port_id': '1234', 'router_id': '12345',
                 'l3_agent_id': '123'},
                {'port_id': '12343', 'router_id': '12345',
                 'l3_agent_id': '123'}
            ])
            script_dir = alembic_script.ScriptDirectory.from_config(
                self.alembic_config)
            script = script_dir.get_revision("1df244e556f5").module
            self.assertRaises(script.DuplicateL3HARouterAgentPortBinding,
                              script.check_sanity, conn)

    def test_check_sanity_030a959ceafa(self):
        routerports = sqlalchemy.Table(
            'routerports', sqlalchemy.MetaData(),
            sqlalchemy.Column('router_id', sqlalchemy.String(36)),
            sqlalchemy.Column('port_id', sqlalchemy.String(36)),
            sqlalchemy.Column('port_type', sqlalchemy.String(255)))

        with self.engine.connect() as conn:
            routerports.create(conn)
            self.addCleanup(self._drop_table, routerports)
            # NOTE(haleyb): without this disabled, pylint complains
            # about a missing 'dml' argument.
            # pylint: disable=no-value-for-parameter
            conn.execute(routerports.insert(), [
                {'router_id': '1234', 'port_id': '12345',
                 'port_type': '123'},
                {'router_id': '12343', 'port_id': '12345',
                 'port_type': '1232'}
            ])
            script_dir = alembic_script.ScriptDirectory.from_config(
                self.alembic_config)
            script = script_dir.get_revision("030a959ceafa").module
            self.assertRaises(script.DuplicatePortRecordinRouterPortdatabase,
                              script.check_sanity, conn)

    def test_check_sanity_6b461a21bcfc_dup_on_fixed_ip(self):
        floatingips = sqlalchemy.Table(
            'floatingips', sqlalchemy.MetaData(),
            sqlalchemy.Column('floating_network_id', sqlalchemy.String(36)),
            sqlalchemy.Column('fixed_port_id', sqlalchemy.String(36)),
            sqlalchemy.Column('fixed_ip_address', sqlalchemy.String(64)))

        with self.engine.connect() as conn:
            floatingips.create(conn)
            self.addCleanup(self._drop_table, floatingips)
            # NOTE(haleyb): without this disabled, pylint complains
            # about a missing 'dml' argument.
            # pylint: disable=no-value-for-parameter
            conn.execute(floatingips.insert(), [
                {'floating_network_id': '12345',
                 'fixed_port_id': '1234567',
                 'fixed_ip_address': '12345678'},
                {'floating_network_id': '12345',
                 'fixed_port_id': '1234567',
                 'fixed_ip_address': '12345678'}
            ])
            script_dir = alembic_script.ScriptDirectory.from_config(
                self.alembic_config)
            script = script_dir.get_revision("6b461a21bcfc").module
            self.assertRaises(script.DuplicateFloatingIPforOneFixedIP,
                              script.check_sanity, conn)

    def test_check_sanity_6b461a21bcfc_dup_on_no_fixed_ip(self):
        floatingips = sqlalchemy.Table(
            'floatingips', sqlalchemy.MetaData(),
            sqlalchemy.Column('floating_network_id', sqlalchemy.String(36)),
            sqlalchemy.Column('fixed_port_id', sqlalchemy.String(36)),
            sqlalchemy.Column('fixed_ip_address', sqlalchemy.String(64)))

        with self.engine.connect() as conn:
            floatingips.create(conn)
            self.addCleanup(self._drop_table, floatingips)
            # NOTE(haleyb): without this disabled, pylint complains
            # about a missing 'dml' argument.
            # pylint: disable=no-value-for-parameter
            conn.execute(floatingips.insert(), [
                {'floating_network_id': '12345',
                 'fixed_port_id': '1234567',
                 'fixed_ip_address': None},
                {'floating_network_id': '12345',
                 'fixed_port_id': '1234567',
                 'fixed_ip_address': None}
            ])
            script_dir = alembic_script.ScriptDirectory.from_config(
                self.alembic_config)
            script = script_dir.get_revision("6b461a21bcfc").module
            self.assertIsNone(script.check_sanity(conn))


class TestWalkDowngrade(oslotest_base.BaseTestCase):
    """Check that no migration script defines a ``downgrade``."""

    def setUp(self):
        super(TestWalkDowngrade, self).setUp()
        self.alembic_config = migration.get_neutron_config()
        self.alembic_config.neutron_config = cfg.CONF

    def test_no_downgrade(self):
        """Fail if any revision module exposes a ``downgrade`` attribute."""
        script_dir = alembic_script.ScriptDirectory.from_config(
            self.alembic_config)
        revisions = script_dir.walk_revisions(base='base', head='heads')
        failed_revisions = [version.revision for version in revisions
                            if hasattr(version.module, 'downgrade')]

        if failed_revisions:
            self.fail('Migrations %s have downgrade' % failed_revisions)


class _TestWalkMigrations(object):
    """This will add framework for testing schema migration
       for different backends.

    """

    BUILD_SCHEMA = False

    def execute_cmd(self, cmd=None):
        """Run ``cmd`` through the shell and assert that it exits with 0.

        stderr is merged into stdout and the captured output is included
        in the assertion message.

        NOTE(review): ``cmd`` is executed with ``shell=True``, so it must
        never be built from untrusted input. The ``None`` default does not
        look like a usable value; callers are expected to pass a command.
        """
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT, shell=True)
        output = proc.communicate()[0]
        self.assertEqual(0, proc.returncode, 'Command failed with '
                         'output:\n%s' % output)

    def _get_alembic_config(self, uri):
        """Return the neutron alembic config pointed at ``uri``.

        As a side effect this stores the script directory in
        ``self.script_dir`` and overrides ``[database] connection``.
        """
        db_config = migration.get_neutron_config()
        self.script_dir = alembic_script.ScriptDirectory.from_config(db_config)
        db_config.neutron_config = cfg.CONF
        db_config.neutron_config.set_override('connection',
                                              six.text_type(uri),
                                              group='database')
        return db_config

    def _revisions(self):
        """Provides revisions and its parent revisions.

        :return: List of tuples. Every tuple contains revision and its parent
        revision.
        """
        revisions = list(self.script_dir.walk_revisions("base", "heads"))
        # walk_revisions() output is reversed so that the upgrade order
        # used by test_walk_versions() starts from the other end.
        for rev in reversed(revisions):
            # Destination, current
            yield rev.revision, rev.down_revision

    def _migrate_up(self, config, engine, dest, curr, with_data=False):
        """Upgrade to ``dest``, optionally running per-revision data hooks.

        With ``with_data`` set, ``_pre_upgrade_<dest>(engine)`` is called
        before the upgrade if it exists; when it returns truthy data,
        ``_check_<dest>(engine, data)`` is called after the upgrade if it
        exists. ``curr`` is accepted but not used here.
        """
        data = None
        if with_data:
            pre_upgrade = getattr(
                self, "_pre_upgrade_%s" % dest, None)
            if pre_upgrade:
                data = pre_upgrade(engine)
        migration.do_alembic_command(config, 'upgrade', dest)
        if with_data:
            check = getattr(self, "_check_%s" % dest, None)
            if check and data:
                check(engine, data)

    def test_walk_versions(self):
        """Upgrade the database one revision at a time.

        Only upgrades are exercised; no downgrade command is issued.
        """
        engine = self.engine
        config = self._get_alembic_config(engine.url)
        revisions = self._revisions()
        for dest, curr in revisions:
            self._migrate_up(config, engine, dest, curr, with_data=True)


class TestWalkMigrationsMysql(testlib_api.MySQLTestCaseMixin,
                              _TestWalkMigrations,
                              testlib_api.SqlTestCaseLight):
    """MySQL run of the revision walk, with an extended timeout."""

    # NOTE(slaweq): this workaround is taken from Manila patch:
    # https://review.opendev.org/#/c/291397/
    # Set 5 minutes timeout for case of running it on
    # very slow nodes/VMs. Note, that this test becomes slower with each
    # addition of new DB migration. On fast nodes it can take about 5-10
    # secs having Mitaka set of migrations. 'pymysql' works much slower
    # on slow nodes than 'psycopg2' and because of that this increased
    # timeout is required only when for testing with 'mysql' backend.
    @test_base.set_timeout(600)
    @test_base.skip_if_timeout("bug 1687027")
    def test_walk_versions(self):
        super(TestWalkMigrationsMysql, self).test_walk_versions()


class TestWalkMigrationsPsql(testlib_api.PostgreSQLTestCaseMixin,
                             _TestWalkMigrations,
                             testlib_api.SqlTestCaseLight):
    """PostgreSQL run of the revision walk."""