Reorganize DbTestCase to use provisioning completely

This change is the first part in a series of changes
that will allow for full flexibility in database usage
during tests.  The first step is to add more facility
to the oslo.db.sqlalchemy.provision system, implementing
a dispatch system that allows flexibility in creation
and dropping of databases, as well as moving the awareness
of the "openstack_citest" convention into provisioning.
The OpportunisticFixture and OpportunisticTestCase now
fold into DbFixture and DbTestCase, which defers in a simple
way to provision.ProvisionedDatabase for all connectivity.
ProvisionedDatabase in turn decides based on the given
environment as to how an engine should be provisioned
for a given test.

Control of database connectivity remains via the
OS_TEST_DBAPI_ADMIN_CONNECTION environment variable.  When not
set, connectivity defaults to sqlite://, plus those backends
found to be available using "opportunistic" naming conventions.
When the variable is present, it provides a semicolon-delimited
list of URLs, and only those URLs will be used for initial
connectivity.

Future changes will allow provisioning to hold onto a single
database engine per test run, as well as allow a single
test class to run in multiple backend scenarios (e.g. one test
against many different backends).

Change-Id: Ifc02505c4c8ebd4a1ca56e14f76c0989576875c3
Partial-Bug: #1339206
This commit is contained in:
Mike Bayer
2014-07-28 20:10:06 -04:00
parent 8629ddf424
commit 4e19870fd4
6 changed files with 535 additions and 162 deletions

View File

@@ -163,3 +163,11 @@ class InvalidSortKey(Exception):
class ColumnError(Exception):
"""Error raised when no column or an invalid column is found."""
class BackendNotAvailable(Exception):
"""Error raised when a particular database backend is not available
within a test suite.
"""

View File

@@ -15,111 +15,459 @@
"""Provision test environment for specific DB backends"""
import abc
import argparse
import copy
import logging
import os
import random
import re
import string
import six
from six import moves
import sqlalchemy
from sqlalchemy.engine import url as sa_url
from oslo.db import exception as exc
from oslo.db._i18n import _LI
from oslo.db import exception
from oslo.db.sqlalchemy import session
from oslo.db.sqlalchemy import utils
LOG = logging.getLogger(__name__)
def get_engine(uri):
"""Engine creation
class ProvisionedDatabase(object):
"""Represent a single database node that can be used for testing in
a serialized fashion.
``ProvisionedDatabase`` includes features for full lifecycle management
of a node, in a way that is context-specific. Depending on how the
test environment runs, ``ProvisionedDatabase`` should know if it needs
to create and drop databases or if it is making use of a database that
is maintained by an external process.
Call the function without arguments to get admin connection. Admin
connection required to create temporary database for each
particular test. Otherwise use existing connection to recreate
connection to the temporary database.
"""
return sqlalchemy.create_engine(uri, poolclass=sqlalchemy.pool.NullPool)
def __init__(self, database_type):
self.backend = Backend.backend_for_database_type(database_type)
self.db_token = _random_ident()
self.backend.create_named_database(self.db_token)
self.engine = self.backend.provisioned_engine(self.db_token)
def dispose(self):
self.engine.dispose()
self.backend.drop_named_database(self.db_token)
def _execute_sql(engine, sql, driver):
"""Initialize connection, execute sql query and close it."""
try:
with engine.connect() as conn:
if driver == 'postgresql':
conn.connection.set_isolation_level(0)
for s in sql:
conn.execute(s)
except sqlalchemy.exc.OperationalError:
msg = ('%s does not match database admin '
'credentials or database does not exist.')
LOG.exception(msg, engine.url)
raise exc.DBConnectionError(msg % engine.url)
class Backend(object):
"""Represent a particular database backend that may be provisionable.
The ``Backend`` object maintains a database type (e.g. database without
specific driver type, such as "sqlite", "postgresql", etc.),
a target URL, a base ``Engine`` for that URL object that can be used
to provision databases and a ``BackendImpl`` which knows how to perform
operations against this type of ``Engine``.
def create_database(engine):
"""Provide temporary database for each particular test."""
driver = engine.name
"""
database = ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10))
backends_by_database_type = {}
if driver == 'sqlite':
database = '/tmp/%s' % database
elif driver in ['mysql', 'postgresql']:
sql = 'create database %s;' % database
_execute_sql(engine, [sql], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
def __init__(self, database_type, url):
self.database_type = database_type
self.url = url
self.verified = False
self.engine = None
self.impl = BackendImpl.impl(database_type)
Backend.backends_by_database_type[database_type] = self
# Both shallow and deep copies may lead to surprising behaviour
# without knowing the implementation of sqlalchemy.engine.url.
# Use a shallow copy here, since we're only overriding a single
# property, invoking __str__ and then discarding our copy. This
# is currently safe and _should_ remain safe into the future.
new_url = copy.copy(engine.url)
@classmethod
def backend_for_database_type(cls, database_type):
"""Return and verify the ``Backend`` for the given database type.
new_url.database = database
return str(new_url)
Creates the engine if it does not already exist and raises
``BackendNotAvailable`` if it cannot be produced.
:return: a base ``Engine`` that allows provisioning of databases.
def drop_database(admin_engine, current_uri):
"""Drop temporary database after each particular test."""
:raises: ``BackendNotAvailable``, if an engine for this backend
cannot be produced.
engine = get_engine(current_uri)
driver = engine.name
if driver == 'sqlite':
"""
try:
os.remove(engine.url.database)
except OSError:
pass
elif driver in ['mysql', 'postgresql']:
sql = 'drop database %s;' % engine.url.database
_execute_sql(admin_engine, [sql], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
backend = cls.backends_by_database_type[database_type]
except KeyError:
raise exception.BackendNotAvailable(database_type)
else:
return backend._verify()
@classmethod
def all_viable_backends(cls):
"""Return an iterator of all ``Backend`` objects that are present
and provisionable.
"""
for backend in cls.backends_by_database_type.values():
try:
yield backend._verify()
except exception.BackendNotAvailable:
pass
def _verify(self):
"""Verify that this ``Backend`` is available and provisionable.
:return: this ``Backend``
:raises: ``BackendNotAvailable`` if the backend is not available.
"""
if not self.verified:
try:
eng = self._ensure_backend_available(self.url)
except exception.BackendNotAvailable:
raise
else:
self.engine = eng
finally:
self.verified = True
if self.engine is None:
raise exception.BackendNotAvailable(self.database_type)
return self
@classmethod
def _ensure_backend_available(cls, url):
url = sa_url.make_url(str(url))
try:
eng = sqlalchemy.create_engine(url)
except ImportError as i_e:
# SQLAlchemy performs an "import" of the DBAPI module
# within create_engine(). So if ibm_db_sa, cx_oracle etc.
# isn't installed, we get an ImportError here.
LOG.info(
_LI("The %(dbapi)s backend is unavailable: %(err)s"),
dict(dbapi=url.drivername, err=i_e))
raise exception.BackendNotAvailable("No DBAPI installed")
else:
try:
conn = eng.connect()
except sqlalchemy.exc.DBAPIError as d_e:
# upon connect, SQLAlchemy calls dbapi.connect(). This
# usually raises OperationalError and should always at
# least raise a SQLAlchemy-wrapped DBAPI Error.
LOG.info(
_LI("The %(dbapi)s backend is unavailable: %(err)s"),
dict(dbapi=url.drivername, err=d_e)
)
raise exception.BackendNotAvailable("Could not connect")
else:
conn.close()
return eng
def create_named_database(self, ident):
"""Create a database with the given name."""
self.impl.create_named_database(self.engine, ident)
def drop_named_database(self, ident, conditional=False):
"""Drop a database with the given name."""
self.impl.drop_named_database(
self.engine, ident,
conditional=conditional)
def database_exists(self, ident):
"""Return True if a database of the given name exists."""
return self.impl.database_exists(self.engine, ident)
def provisioned_engine(self, ident):
"""Given the URL of a particular database backend and the string
name of a particular 'database' within that backend, return
an Engine instance whose connections will refer directly to the
named database.
For hostname-based URLs, this typically involves switching just the
'database' portion of the URL with the given name and creating
an engine.
For URLs that instead deal with DSNs, the rules may be more custom;
for example, the engine may need to connect to the root URL and
then emit a command to switch to the named database.
"""
return self.impl.provisioned_engine(self.url, ident)
@classmethod
def _setup(cls):
"""Initial startup feature will scan the environment for configured
URLs and place them into the list of URLs we will use for provisioning.
This searches through OS_TEST_DBAPI_ADMIN_CONNECTION for URLs. If
not present, we set up URLs based on the "opportunstic" convention,
e.g. username+password = "openstack_citest".
The provisioning system will then use or discard these URLs as they
are requested, based on whether or not the target database is actually
found to be available.
"""
configured_urls = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', None)
if configured_urls:
configured_urls = configured_urls.split(";")
else:
configured_urls = [
impl.create_opportunistic_driver_url()
for impl in BackendImpl.all_impls()
]
for url_str in configured_urls:
url = sa_url.make_url(url_str)
m = re.match(r'([^+]+?)(?:\+(.+))?$', url.drivername)
database_type, drivertype = m.group(1, 2)
Backend(database_type, url)
def main():
"""Controller to handle commands
@six.add_metaclass(abc.ABCMeta)
class BackendImpl(object):
"""Provide database-specific implementations of key provisioning
functions.
``BackendImpl`` is owned by a ``Backend`` instance which delegates
to it for all database-specific features.
"""
@classmethod
def all_impls(cls):
"""Return an iterator of all possible BackendImpl objects.
These are BackendImpls that are implemented, but not
necessarily provisionable.
"""
for database_type in cls.impl.reg:
if database_type == '*':
continue
yield BackendImpl.impl(database_type)
@utils.dispatch_for_dialect("*")
def impl(drivername):
"""Return a ``BackendImpl`` instance corresponding to the
given driver name.
This is a dispatched method which will refer to the constructor
of implementing subclasses.
"""
raise NotImplementedError(
"No provision impl available for driver: %s" % drivername)
def __init__(self, drivername):
self.drivername = drivername
@abc.abstractmethod
def create_opportunistic_driver_url(self):
"""Produce a string url known as the 'opportunistic' URL.
This URL is one that corresponds to an established Openstack
convention for a pre-established database login, which, when
detected as available in the local environment, is automatically
used as a test platform for a specific type of driver.
"""
@abc.abstractmethod
def create_named_database(self, engine, ident):
"""Create a database with the given name."""
@abc.abstractmethod
def drop_named_database(self, engine, ident, conditional=False):
"""Drop a database with the given name."""
def provisioned_engine(self, base_url, ident):
"""Return a provisioned engine.
Given the URL of a particular database backend and the string
name of a particular 'database' within that backend, return
an Engine instance whose connections will refer directly to the
named database.
For hostname-based URLs, this typically involves switching just the
'database' portion of the URL with the given name and creating
an engine.
For URLs that instead deal with DSNs, the rules may be more custom;
for example, the engine may need to connect to the root URL and
then emit a command to switch to the named database.
"""
url = sa_url.make_url(str(base_url))
url.database = ident
return session.create_engine(
url,
logging_name="%s@%s" % (self.drivername, ident))
@BackendImpl.impl.dispatch_for("mysql")
class MySQLBackendImpl(BackendImpl):
def create_opportunistic_driver_url(self):
return "mysql://openstack_citest:openstack_citest@localhost/"
def create_named_database(self, engine, ident):
with engine.connect() as conn:
conn.execute("CREATE DATABASE %s" % ident)
def drop_named_database(self, engine, ident, conditional=False):
with engine.connect() as conn:
if not conditional or self.database_exists(conn, ident):
conn.execute("DROP DATABASE %s" % ident)
def database_exists(self, engine, ident):
return bool(engine.scalar("SHOW DATABASES LIKE '%s'" % ident))
@BackendImpl.impl.dispatch_for("sqlite")
class SQLiteBackendImpl(BackendImpl):
def create_opportunistic_driver_url(self):
return "sqlite://"
def create_named_database(self, engine, ident):
url = self._provisioned_database_url(engine.url, ident)
eng = sqlalchemy.create_engine(url)
eng.connect().close()
def provisioned_engine(self, base_url, ident):
return session.create_engine(
self._provisioned_database_url(base_url, ident))
def drop_named_database(self, engine, ident, conditional=False):
url = self._provisioned_database_url(engine.url, ident)
filename = url.database
if filename and (not conditional or os.access(filename, os.F_OK)):
os.remove(filename)
def database_exists(self, engine, ident):
url = self._provisioned_database_url(engine.url, ident)
filename = url.database
return not filename or os.access(filename, os.F_OK)
def _provisioned_database_url(self, base_url, ident):
if base_url.database:
return sa_url.make_url("sqlite:////tmp/%s.db" % ident)
else:
return base_url
@BackendImpl.impl.dispatch_for("postgresql")
class PostgresqlBackendImpl(BackendImpl):
def create_opportunistic_driver_url(self):
return "postgresql://openstack_citest:openstack_citest"\
"@localhost/postgres"
def create_named_database(self, engine, ident):
with engine.connect().execution_options(
isolation_level="AUTOCOMMIT") as conn:
conn.execute("CREATE DATABASE %s" % ident)
def drop_named_database(self, engine, ident, conditional=False):
with engine.connect().execution_options(
isolation_level="AUTOCOMMIT") as conn:
self._close_out_database_users(conn, ident)
if conditional:
conn.execute("DROP DATABASE IF EXISTS %s" % ident)
else:
conn.execute("DROP DATABASE %s" % ident)
def database_exists(self, engine, ident):
return bool(
engine.scalar(
sqlalchemy.text(
"select datname from pg_database "
"where datname=:name"), name=ident)
)
def _close_out_database_users(self, conn, ident):
"""Attempt to guarantee a database can be dropped.
Optional feature which guarantees no connections with our
username are attached to the DB we're going to drop.
This method has caveats; for one, the 'pid' column was named
'procpid' prior to Postgresql 9.2. But more critically,
prior to 9.2 this operation required superuser permissions,
even if the connections we're closing are under the same username
as us. In more recent versions this restriction has been
lifted for same-user connections.
"""
if conn.dialect.server_version_info >= (9, 2):
conn.execute(
sqlalchemy.text(
"select pg_terminate_backend(pid) "
"from pg_stat_activity "
"where usename=current_user and "
"pid != pg_backend_pid() "
"and datname=:dname"
), dname=ident)
def _random_ident():
return ''.join(
random.choice(string.ascii_lowercase)
for i in moves.range(10))
def _echo_cmd(args):
idents = [_random_ident() for i in moves.range(args.instances_count)]
print("\n".join(idents))
def _create_cmd(args):
idents = [_random_ident() for i in moves.range(args.instances_count)]
for backend in Backend.all_viable_backends():
for ident in idents:
backend.create_named_database(ident)
print("\n".join(idents))
def _drop_cmd(args):
for backend in Backend.all_viable_backends():
for ident in args.instances:
backend.drop_named_database(ident, args.conditional)
Backend._setup()
def main(argv=None):
"""Command line interface to create/drop databases.
::create: Create test database with random names.
::drop: Drop database created by previous command.
::echo: create random names and display them; don't create.
"""
parser = argparse.ArgumentParser(
description='Controller to handle database creation and dropping'
' commands.',
epilog='Under normal circumstances is not used directly.'
' Used in .testr.conf to automate test database creation'
' and dropping processes.')
epilog='Typically called by the test runner, e.g. shell script, '
'testr runner via .testr.conf, or other system.')
subparsers = parser.add_subparsers(
help='Subcommands to manipulate temporary test databases.')
create = subparsers.add_parser(
'create',
help='Create temporary test databases.')
create.set_defaults(which='create')
create.set_defaults(which=_create_cmd)
create.add_argument(
'instances_count',
type=int,
@@ -128,25 +476,31 @@ def main():
drop = subparsers.add_parser(
'drop',
help='Drop temporary test databases.')
drop.set_defaults(which='drop')
drop.set_defaults(which=_drop_cmd)
drop.add_argument(
'instances',
nargs='+',
help='List of databases uri to be dropped.')
drop.add_argument(
'--conditional',
action="store_true",
help="Check if database exists first before dropping"
)
args = parser.parse_args()
echo = subparsers.add_parser(
'echo',
help="Create random database names and display only."
)
echo.set_defaults(which=_echo_cmd)
echo.add_argument(
'instances_count',
type=int,
help='Number of identifiers to create.')
connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
'sqlite://')
engine = get_engine(connection_string)
which = args.which
args = parser.parse_args(argv)
if which == "create":
for i in range(int(args.instances_count)):
print(create_database(engine))
elif which == "drop":
for db in args.instances:
drop_database(engine, db)
cmd = args.which
cmd(args)
if __name__ == "__main__":

View File

@@ -368,7 +368,7 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
connection_debug=0, max_pool_size=None, max_overflow=None,
pool_timeout=None, sqlite_synchronous=True,
connection_trace=False, max_retries=10, retry_interval=10,
thread_checkin=True):
thread_checkin=True, logging_name=None):
"""Return a new SQLAlchemy engine."""
url = sqlalchemy.engine.url.make_url(sql_connection)
@@ -377,6 +377,7 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
"pool_recycle": idle_timeout,
'convert_unicode': True,
'connect_args': {},
'logging_name': logging_name
}
_setup_logging(connection_debug)

View File

@@ -13,9 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import os
import fixtures
try:
@@ -24,9 +21,10 @@ except ImportError:
raise NameError('Oslotest is not installed. Please add oslotest in your'
' test-requirements')
import six
import testtools
import six
from oslo.db import exception
from oslo.db.sqlalchemy import provision
from oslo.db.sqlalchemy import session
from oslo.db.sqlalchemy import utils
@@ -41,8 +39,12 @@ class DbFixture(fixtures.Fixture):
credentials for specific backend.
"""
def _get_uri(self):
return os.getenv('OS_TEST_DBAPI_CONNECTION', 'sqlite://')
DRIVER = "sqlite"
# these names are deprecated, and are not used by DbFixture.
# they are here for backwards compatibility with test suites that
# are referring to them directly.
DBNAME = PASSWORD = USERNAME = 'openstack_citest'
def __init__(self, test):
super(DbFixture, self).__init__()
@@ -52,9 +54,17 @@ class DbFixture(fixtures.Fixture):
def setUp(self):
super(DbFixture, self).setUp()
self.test.engine = session.create_engine(self._get_uri())
self.addCleanup(self.test.engine.dispose)
self.test.sessionmaker = session.get_maker(self.test.engine)
try:
self.provision = provision.ProvisionedDatabase(self.DRIVER)
self.addCleanup(self.provision.dispose)
except exception.BackendNotAvailable:
msg = '%s backend is not available.' % self.DRIVER
return self.test.skip(msg)
else:
self.test.engine = self.provision.engine
self.addCleanup(setattr, self.test, 'engine', None)
self.test.sessionmaker = session.get_maker(self.test.engine)
self.addCleanup(setattr, self.test, 'sessionmaker', None)
class DbTestCase(test_base.BaseTestCase):
@@ -72,6 +82,9 @@ class DbTestCase(test_base.BaseTestCase):
self.useFixture(self.FIXTURE(self))
class OpportunisticTestCase(DbTestCase):
"""Placeholder for backwards compatibility."""
ALLOWED_DIALECTS = ['sqlite', 'mysql', 'postgresql']
@@ -98,64 +111,12 @@ def backend_specific(*dialects):
return wrap
@six.add_metaclass(abc.ABCMeta)
class OpportunisticFixture(DbFixture):
"""Base fixture to use default CI databases.
The databases exist in OpenStack CI infrastructure. But for the
correct functioning in local environment the databases must be
created manually.
"""
DRIVER = abc.abstractproperty(lambda: None)
DBNAME = PASSWORD = USERNAME = 'openstack_citest'
_uri = None
def _get_uri(self):
if self._uri is not None:
return self._uri
credentials = {
'backend': self.DRIVER,
'user': self.USERNAME,
'passwd': self.PASSWORD,
'database': self.DBNAME}
if self.DRIVER and not utils.is_backend_avail(**credentials):
msg = '%s backend is not available.' % self.DRIVER
raise testtools.testcase.TestSkipped(msg)
self._provisioning_engine = provision.get_engine(
utils.get_connect_string(backend=self.DRIVER,
user=self.USERNAME,
passwd=self.PASSWORD,
database=self.DBNAME)
)
self._uri = provision.create_database(self._provisioning_engine)
self.addCleanup(
provision.drop_database, self._provisioning_engine, self._uri)
self.addCleanup(setattr, self, '_uri', None)
return self._uri
@six.add_metaclass(abc.ABCMeta)
class OpportunisticTestCase(DbTestCase):
"""Base test case to use default CI databases.
The subclasses of the test case are running only when openstack_citest
database is available otherwise tests will be skipped.
"""
FIXTURE = abc.abstractproperty(lambda: None)
class MySQLOpportunisticFixture(OpportunisticFixture):
class MySQLOpportunisticFixture(DbFixture):
DRIVER = 'mysql'
DBNAME = '' # connect to MySQL server, but not to the openstack_citest db
class PostgreSQLOpportunisticFixture(OpportunisticFixture):
class PostgreSQLOpportunisticFixture(DbFixture):
DRIVER = 'postgresql'
DBNAME = 'postgres' # PostgreSQL requires the db name here,use service one
class MySQLOpportunisticTestCase(OpportunisticTestCase):

View File

@@ -649,6 +649,11 @@ def get_connect_string(backend, database, user=None, passwd=None,
Try to get a connection with a very specific set of values, if we get
these then we'll run the tests, otherwise they are skipped
DEPRECATED: this function is deprecated and will be removed from oslo.db
in a few releases. Please use the provisioning system for dealing
with URLs and database provisioning.
"""
args = {'backend': backend,
'user': user,
@@ -663,22 +668,25 @@ def get_connect_string(backend, database, user=None, passwd=None,
def is_backend_avail(backend, database, user=None, passwd=None):
"""Return True if the given backend is available.
DEPRECATED: this function is deprecated and will be removed from oslo.db
in a few releases. Please use the provisioning system to access
databases based on backend availability.
"""
from oslo.db.sqlalchemy import provision
connect_uri = get_connect_string(backend=backend,
database=database,
user=user,
passwd=passwd)
try:
connect_uri = get_connect_string(backend=backend,
database=database,
user=user,
passwd=passwd)
engine = sqlalchemy.create_engine(connect_uri)
connection = engine.connect()
except Exception as e:
# intentionally catch all to handle exceptions even if we don't
# have any backend code loaded.
msg = _LI("The %(backend)s backend is unavailable: %(exception)s")
LOG.info(msg, {"backend": backend, "exception": e})
provision.Backend._ensure_backend_available(connect_uri)
except exception.BackendNotAvailable:
return False
else:
connection.close()
engine.dispose()
return True

View File

@@ -26,13 +26,15 @@ from sqlalchemy.dialects import mysql
from sqlalchemy import Boolean, Index, Integer, DateTime, String, SmallInteger
from sqlalchemy import MetaData, Table, Column, ForeignKey
from sqlalchemy.engine import reflection
from sqlalchemy.exc import ResourceClosedError
from sqlalchemy.engine import url as sa_url
from sqlalchemy.exc import OperationalError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import select
from sqlalchemy.types import UserDefinedType, NullType
from oslo.db import exception
from oslo.db.sqlalchemy import models
from oslo.db.sqlalchemy import provision
from oslo.db.sqlalchemy import session
from oslo.db.sqlalchemy import test_base as db_test_base
from oslo.db.sqlalchemy import utils
@@ -537,19 +539,58 @@ class TestConnectionUtils(test_utils.BaseTestCase):
def test_is_backend_unavail(self):
log = self.useFixture(fixtures.FakeLogger())
error_cause = ('This result object does not return rows. It has been'
'closed automatically.')
error_msg = ("The %s backend is unavailable: %s\n" %
('mysql', error_cause))
err = OperationalError("Can't connect to database", None, None)
error_msg = "The mysql backend is unavailable: %s\n" % err
self.mox.StubOutWithMock(sqlalchemy.engine.base.Engine, 'connect')
sqlalchemy.engine.base.Engine.connect().AndRaise(
ResourceClosedError(error_cause))
sqlalchemy.engine.base.Engine.connect().AndRaise(err)
self.mox.ReplayAll()
self.assertFalse(utils.is_backend_avail(**self.full_credentials))
self.assertEqual(error_msg, log.output)
def test_ensure_backend_available(self):
self.mox.StubOutWithMock(sqlalchemy.engine.base.Engine, 'connect')
fake_connection = self.mox.CreateMockAnything()
fake_connection.close()
sqlalchemy.engine.base.Engine.connect().AndReturn(fake_connection)
self.mox.ReplayAll()
eng = provision.Backend._ensure_backend_available(self.connect_string)
self.assertIsInstance(eng, sqlalchemy.engine.base.Engine)
self.assertEqual(self.connect_string, str(eng.url))
def test_ensure_backend_available_no_connection_raises(self):
log = self.useFixture(fixtures.FakeLogger())
err = OperationalError("Can't connect to database", None, None)
self.mox.StubOutWithMock(sqlalchemy.engine.base.Engine, 'connect')
sqlalchemy.engine.base.Engine.connect().AndRaise(err)
self.mox.ReplayAll()
exc = self.assertRaises(
exception.BackendNotAvailable,
provision.Backend._ensure_backend_available, self.connect_string
)
self.assertEqual("Could not connect", str(exc))
self.assertEqual(
"The mysql backend is unavailable: %s" % err,
log.output.strip())
def test_ensure_backend_available_no_dbapi_raises(self):
log = self.useFixture(fixtures.FakeLogger())
self.mox.StubOutWithMock(sqlalchemy, 'create_engine')
sqlalchemy.create_engine(
sa_url.make_url(self.connect_string)).AndRaise(
ImportError("Can't import DBAPI module foobar"))
self.mox.ReplayAll()
exc = self.assertRaises(
exception.BackendNotAvailable,
provision.Backend._ensure_backend_available, self.connect_string
)
self.assertEqual("No DBAPI installed", str(exc))
self.assertEqual(
"The mysql backend is unavailable: Can't import "
"DBAPI module foobar", log.output.strip())
def test_get_db_connection_info(self):
conn_pieces = parse.urlparse(self.connect_string)
self.assertEqual(utils.get_db_connection_info(conn_pieces),