a6007a98b1
The final step in cleaning up our many base classes. This will allow us to do things consistently across the test suite in future changes. Change-Id: I0bf663fdfd3c8be93e5658493e221d0a7db78832 Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
267 lines
8.2 KiB
Python
267 lines
8.2 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
from unittest import mock
|
|
|
|
from sqlalchemy import exc as sa_exc
|
|
from sqlalchemy import inspect
|
|
from sqlalchemy import schema
|
|
from sqlalchemy import types
|
|
|
|
from oslo_db import exception
|
|
from oslo_db.sqlalchemy import enginefacade
|
|
from oslo_db.sqlalchemy import provision
|
|
from oslo_db.sqlalchemy import test_fixtures
|
|
from oslo_db.sqlalchemy import utils
|
|
from oslo_db.tests import base as test_base
|
|
from oslo_db.tests.sqlalchemy import base as db_test_base
|
|
|
|
|
|
class DropAllObjectsTest(db_test_base._DbTestCase):
|
|
|
|
def setUp(self):
|
|
super(DropAllObjectsTest, self).setUp()
|
|
|
|
self.metadata = metadata = schema.MetaData()
|
|
schema.Table(
|
|
'a', metadata,
|
|
schema.Column('id', types.Integer, primary_key=True),
|
|
mysql_engine='InnoDB'
|
|
)
|
|
schema.Table(
|
|
'b', metadata,
|
|
schema.Column('id', types.Integer, primary_key=True),
|
|
schema.Column('a_id', types.Integer, schema.ForeignKey('a.id')),
|
|
mysql_engine='InnoDB'
|
|
)
|
|
schema.Table(
|
|
'c', metadata,
|
|
schema.Column('id', types.Integer, primary_key=True),
|
|
schema.Column('b_id', types.Integer, schema.ForeignKey('b.id')),
|
|
schema.Column(
|
|
'd_id', types.Integer,
|
|
schema.ForeignKey('d.id', use_alter=True, name='c_d_fk')),
|
|
mysql_engine='InnoDB'
|
|
)
|
|
schema.Table(
|
|
'd', metadata,
|
|
schema.Column('id', types.Integer, primary_key=True),
|
|
schema.Column('c_id', types.Integer, schema.ForeignKey('c.id')),
|
|
mysql_engine='InnoDB'
|
|
)
|
|
|
|
metadata.create_all(self.engine, checkfirst=False)
|
|
# will drop nothing if the test worked
|
|
self.addCleanup(metadata.drop_all, self.engine, checkfirst=True)
|
|
|
|
def test_drop_all(self):
|
|
insp = inspect(self.engine)
|
|
self.assertEqual(
|
|
set(['a', 'b', 'c', 'd']),
|
|
set(insp.get_table_names())
|
|
)
|
|
|
|
self._get_default_provisioned_db().\
|
|
backend.drop_all_objects(self.engine)
|
|
|
|
insp = inspect(self.engine)
|
|
self.assertEqual(
|
|
[],
|
|
insp.get_table_names()
|
|
)
|
|
|
|
|
|
class BackendNotAvailableTest(test_base.BaseTestCase):
|
|
def test_no_dbapi(self):
|
|
backend = provision.Backend(
|
|
"postgresql", "postgresql+nosuchdbapi://hostname/dsn")
|
|
|
|
with mock.patch(
|
|
"sqlalchemy.create_engine",
|
|
mock.Mock(side_effect=ImportError("nosuchdbapi"))):
|
|
|
|
# NOTE(zzzeek): Call and test the _verify function twice, as it
|
|
# exercises a different code path on subsequent runs vs.
|
|
# the first run
|
|
ex = self.assertRaises(
|
|
exception.BackendNotAvailable,
|
|
backend._verify)
|
|
self.assertEqual(
|
|
"Backend 'postgresql+nosuchdbapi' is unavailable: "
|
|
"No DBAPI installed", str(ex))
|
|
|
|
ex = self.assertRaises(
|
|
exception.BackendNotAvailable,
|
|
backend._verify)
|
|
self.assertEqual(
|
|
"Backend 'postgresql+nosuchdbapi' is unavailable: "
|
|
"No DBAPI installed", str(ex))
|
|
|
|
def test_cant_connect(self):
|
|
backend = provision.Backend(
|
|
"postgresql", "postgresql+nosuchdbapi://hostname/dsn")
|
|
|
|
with mock.patch(
|
|
"sqlalchemy.create_engine",
|
|
mock.Mock(return_value=mock.Mock(connect=mock.Mock(
|
|
side_effect=sa_exc.OperationalError(
|
|
"can't connect", None, None))
|
|
))
|
|
):
|
|
|
|
# NOTE(zzzeek): Call and test the _verify function twice, as it
|
|
# exercises a different code path on subsequent runs vs.
|
|
# the first run
|
|
ex = self.assertRaises(
|
|
exception.BackendNotAvailable,
|
|
backend._verify)
|
|
self.assertEqual(
|
|
"Backend 'postgresql+nosuchdbapi' is unavailable: "
|
|
"Could not connect", str(ex))
|
|
|
|
ex = self.assertRaises(
|
|
exception.BackendNotAvailable,
|
|
backend._verify)
|
|
self.assertEqual(
|
|
"Backend 'postgresql+nosuchdbapi' is unavailable: "
|
|
"Could not connect", str(ex))
|
|
|
|
|
|
class MySQLDropAllObjectsTest(
|
|
DropAllObjectsTest, db_test_base._MySQLOpportunisticTestCase,
|
|
):
|
|
pass
|
|
|
|
|
|
class PostgreSQLDropAllObjectsTest(
|
|
DropAllObjectsTest, db_test_base._PostgreSQLOpportunisticTestCase,
|
|
):
|
|
pass
|
|
|
|
|
|
class RetainSchemaTest(test_base.BaseTestCase):
|
|
DRIVER = "sqlite"
|
|
|
|
def setUp(self):
|
|
super(RetainSchemaTest, self).setUp()
|
|
|
|
metadata = schema.MetaData()
|
|
self.test_table = schema.Table(
|
|
'test_table', metadata,
|
|
schema.Column('x', types.Integer),
|
|
schema.Column('y', types.Integer),
|
|
mysql_engine='InnoDB'
|
|
)
|
|
|
|
def gen_schema(engine):
|
|
metadata.create_all(engine, checkfirst=False)
|
|
self._gen_schema = gen_schema
|
|
|
|
def test_once(self):
|
|
self._run_test()
|
|
|
|
def test_twice(self):
|
|
self._run_test()
|
|
|
|
def _run_test(self):
|
|
try:
|
|
database_resource = provision.DatabaseResource(
|
|
self.DRIVER, provision_new_database=True)
|
|
except exception.BackendNotAvailable:
|
|
self.skipTest("database not available")
|
|
|
|
schema_resource = provision.SchemaResource(
|
|
database_resource, self._gen_schema)
|
|
|
|
schema = schema_resource.getResource()
|
|
|
|
conn = schema.database.engine.connect()
|
|
engine = utils.NonCommittingEngine(conn)
|
|
|
|
with engine.connect() as conn:
|
|
rows = conn.execute(self.test_table.select())
|
|
self.assertEqual([], rows.fetchall())
|
|
|
|
trans = conn.begin()
|
|
conn.execute(
|
|
self.test_table.insert(),
|
|
{"x": 1, "y": 2}
|
|
)
|
|
trans.rollback()
|
|
|
|
rows = conn.execute(self.test_table.select())
|
|
self.assertEqual([], rows.fetchall())
|
|
|
|
trans = conn.begin()
|
|
conn.execute(
|
|
self.test_table.insert(),
|
|
{"x": 2, "y": 3}
|
|
)
|
|
trans.commit()
|
|
|
|
rows = conn.execute(self.test_table.select())
|
|
self.assertEqual([(2, 3)], rows.fetchall())
|
|
|
|
engine._dispose()
|
|
schema_resource.finishedWith(schema)
|
|
|
|
|
|
class MySQLRetainSchemaTest(RetainSchemaTest):
|
|
DRIVER = "mysql"
|
|
|
|
|
|
class PostgresqlRetainSchemaTest(RetainSchemaTest):
|
|
DRIVER = "postgresql"
|
|
|
|
|
|
class AdHocURLTest(test_base.BaseTestCase):
|
|
def test_sqlite_setup_teardown(self):
|
|
|
|
fixture = test_fixtures.AdHocDbFixture("sqlite:///foo.db")
|
|
|
|
fixture.setUp()
|
|
|
|
self.assertEqual(
|
|
str(enginefacade._context_manager._factory._writer_engine.url),
|
|
"sqlite:///foo.db"
|
|
)
|
|
|
|
self.assertTrue(os.path.exists("foo.db"))
|
|
fixture.cleanUp()
|
|
|
|
self.assertFalse(os.path.exists("foo.db"))
|
|
|
|
def test_mysql_setup_teardown(self):
|
|
try:
|
|
mysql_backend = provision.Backend.backend_for_database_type(
|
|
"mysql")
|
|
except exception.BackendNotAvailable:
|
|
self.skipTest("mysql backend not available")
|
|
|
|
mysql_backend.create_named_database("adhoc_test")
|
|
self.addCleanup(
|
|
mysql_backend.drop_named_database, "adhoc_test"
|
|
)
|
|
url = str(mysql_backend.provisioned_database_url("adhoc_test"))
|
|
|
|
fixture = test_fixtures.AdHocDbFixture(url)
|
|
|
|
fixture.setUp()
|
|
|
|
self.assertEqual(
|
|
str(enginefacade._context_manager._factory._writer_engine.url),
|
|
url
|
|
)
|
|
|
|
fixture.cleanUp()
|