Sync latest oslo.db code

Common db code was updated in oslo. The most important thing is that
engine instances don't stored anymore in oslo.db - ce69e7f.
This patch moves methods `get_engine` and `get_session` to module
`heat.db.sqalchemy.api`.

Latest commit in oslo related to db module:
2fd457bf2ccbeb2b84ffb204778b6417cd5405ba

Change-Id: Iaa2e9ba26e824c678c698914170e3dffbf1c5c95
changes/39/76539/7
Andrey Kurilin 9 years ago
parent 155314d236
commit cfcbc994d3

@ -153,14 +153,6 @@
#max_json_body_size=1048576
#
# Options defined in heat.db.api
#
# The backend to use for db. (string value)
#db_backend=sqlalchemy
#
# Options defined in heat.engine.clients
#
@ -179,17 +171,6 @@
#loadbalancer_template=<None>
#
# Options defined in heat.openstack.common.db.sqlalchemy.session
#
# the filename to use with sqlite (string value)
#sqlite_db=heat.sqlite
# If true, use synchronous mode for sqlite (boolean value)
#sqlite_synchronous=true
#
# Options defined in heat.openstack.common.eventlet_backdoor
#
@ -801,30 +782,34 @@
[database]
#
# Options defined in heat.openstack.common.db.api
# Options defined in heat.openstack.common.db.options
#
# The file name to use with SQLite (string value)
#sqlite_db=heat.sqlite
# If True, SQLite uses synchronous mode (boolean value)
#sqlite_synchronous=true
# The backend to use for db (string value)
# Deprecated group/name - [DEFAULT]/db_backend
#backend=sqlalchemy
#
# Options defined in heat.openstack.common.db.sqlalchemy.session
#
# The SQLAlchemy connection string used to connect to the
# database (string value)
# Deprecated group/name - [DEFAULT]/sql_connection
# Deprecated group/name - [DATABASE]/sql_connection
# Deprecated group/name - [sql]/connection
#connection=sqlite:////heat/openstack/common/db/$sqlite_db
#connection=<None>
# The SQLAlchemy connection string used to connect to the
# slave database (string value)
#slave_connection=
# The SQL mode to be used for MySQL sessions. This option,
# including the default, overrides any server-set SQL mode. To
# use whatever SQL mode is set by the server configuration,
# set this to no value. Example: mysql_sql_mode= (string
# value)
#mysql_sql_mode=TRADITIONAL
# timeout before idle sql connections are reaped (integer
# Timeout before idle sql connections are reaped (integer
# value)
# Deprecated group/name - [DEFAULT]/sql_idle_timeout
# Deprecated group/name - [DATABASE]/sql_idle_timeout
@ -843,13 +828,13 @@
# Deprecated group/name - [DATABASE]/sql_max_pool_size
#max_pool_size=<None>
# maximum db connection retries during startup. (setting -1
# Maximum db connection retries during startup. (setting -1
# implies an infinite retry count) (integer value)
# Deprecated group/name - [DEFAULT]/sql_max_retries
# Deprecated group/name - [DATABASE]/sql_max_retries
#max_retries=10
# interval between retries of opening a sql connection
# Interval between retries of opening a sql connection
# (integer value)
# Deprecated group/name - [DEFAULT]/sql_retry_interval
# Deprecated group/name - [DATABASE]/reconnect_interval
@ -876,6 +861,25 @@
# Deprecated group/name - [DATABASE]/sqlalchemy_pool_timeout
#pool_timeout=<None>
# Enable the experimental use of database reconnect on
# connection lost (boolean value)
#use_db_reconnect=false
# seconds between db connection retries (integer value)
#db_retry_interval=1
# Whether to increase interval between db connection retries,
# up to db_max_retry_interval (boolean value)
#db_inc_retry_interval=true
# max seconds between db connection retries, if
# db_inc_retry_interval is enabled (integer value)
#db_max_retry_interval=10
# maximum db connection retries before error is raised.
# (setting -1 implies an infinite retry count) (integer value)
#db_max_retries=20
[ec2authtoken]

@ -32,7 +32,7 @@ CONF = cfg.CONF
def do_db_version():
"""Print database's current migration level."""
print(api.db_version())
print(api.db_version(api.get_engine()))
def do_db_sync():
@ -40,7 +40,7 @@ def do_db_sync():
Place a database under migration control and upgrade,
creating first if necessary.
"""
api.db_sync(CONF.command.version)
api.db_sync(api.get_engine(), CONF.command.version)
def purge_deleted():

@ -28,17 +28,19 @@ from oslo.config import cfg
from heat.openstack.common.db import api as db_api
db_opts = [
cfg.StrOpt('db_backend',
default='sqlalchemy',
help='The backend to use for db.')]
CONF = cfg.CONF
CONF.register_opts(db_opts)
CONF.import_opt('backend', 'heat.openstack.common.db.options',
group='database')
_BACKEND_MAPPING = {'sqlalchemy': 'heat.db.sqlalchemy.api'}
IMPL = db_api.DBAPI(backend_mapping=_BACKEND_MAPPING)
IMPL = db_api.DBAPI(CONF.database.backend, backend_mapping=_BACKEND_MAPPING,
lazy=True)
def get_engine():
return IMPL.get_engine()
def get_session():
@ -261,11 +263,11 @@ def software_deployment_delete(context, deployment_id):
return IMPL.software_deployment_delete(context, deployment_id)
def db_sync(version=None):
def db_sync(engine, version=None):
"""Migrate the database to `version` or the most recent version."""
return IMPL.db_sync(version=version)
return IMPL.db_sync(engine, version=version)
def db_version():
def db_version(engine):
"""Display the current database version."""
return IMPL.db_version()
return IMPL.db_version(engine)

@ -31,13 +31,28 @@ from heat.openstack.common.gettextutils import _
cfg.CONF.import_opt('max_events_per_stack', 'heat.common.config')
get_engine = db_session.get_engine
get_session = db_session.get_session
CONF = cfg.CONF
CONF.import_opt('max_events_per_stack', 'heat.common.config')
CONF.import_opt('connection', 'heat.openstack.common.db.options',
group='database')
_facade = None
def get_facade():
global _facade
if not _facade:
_facade = db_session.EngineFacade(
CONF.database.connection, **dict(CONF.database.iteritems()))
return _facade
get_engine = lambda: get_facade().get_engine()
get_session = lambda: get_facade().get_session()
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
@ -743,11 +758,11 @@ def purge_deleted(age, granularity='days'):
engine.execute(user_creds_del)
def db_sync(version=None):
def db_sync(engine, version=None):
"""Migrate the database to `version` or the most recent version."""
return migration.db_sync(version=version)
return migration.db_sync(engine, version=version)
def db_version():
def db_version(engine):
"""Display the current database version."""
return migration.db_version()
return migration.db_version(engine)

@ -19,19 +19,20 @@ from heat.openstack.common.db.sqlalchemy import migration as oslo_migration
INIT_VERSION = 14
def db_sync(version=None):
def db_sync(engine, version=None):
path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
'migrate_repo')
return oslo_migration.db_sync(path, version, init_version=INIT_VERSION)
return oslo_migration.db_sync(engine, path, version,
init_version=INIT_VERSION)
def db_version():
def db_version(engine):
path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
'migrate_repo')
return oslo_migration.db_version(path, INIT_VERSION)
return oslo_migration.db_version(engine, path, INIT_VERSION)
def db_version_control(version=None):
def db_version_control(engine, version=None):
path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
'migrate_repo')
return oslo_migration.db_version_control(path, version)
return oslo_migration.db_version_control(engine, path, version)

@ -24,11 +24,14 @@ from sqlalchemy.orm.session import Session
from heat.db.sqlalchemy.types import Json
from heat.openstack.common.db.sqlalchemy import models
from heat.openstack.common.db.sqlalchemy import session
from heat.openstack.common import timeutils
BASE = declarative_base()
get_session = session.get_session
def get_session():
from heat.db.sqlalchemy import api as db_api
return db_api.get_session()
class HeatBase(models.ModelBase, models.TimestampMixin):

@ -39,7 +39,7 @@ class LazyPluggable(object):
return getattr(backend, key)
IMPL = LazyPluggable('db_backend',
IMPL = LazyPluggable('backend',
sqlalchemy='heat.db.sqlalchemy.api')

@ -16,43 +16,148 @@
"""Multiple DB API backend support.
Supported configuration options:
The following two parameters are in the 'database' group:
`backend`: DB backend name or full module path to DB backend module.
A DB backend module should implement a method named 'get_backend' which
takes no arguments. The method can return any object that implements DB
API methods.
"""
from oslo.config import cfg
import functools
import logging
import threading
import time
from heat.openstack.common.db import exception
from heat.openstack.common.gettextutils import _LE
from heat.openstack.common import importutils
db_opts = [
cfg.StrOpt('backend',
default='sqlalchemy',
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'),
]
LOG = logging.getLogger(__name__)
def safe_for_db_retry(f):
"""Enable db-retry for decorated function, if config option enabled."""
f.__dict__['enable_retry'] = True
return f
class wrap_db_retry(object):
"""Retry db.api methods, if DBConnectionError() raised
Retry decorated db.api methods. If we enabled `use_db_reconnect`
in config, this decorator will be applied to all db.api functions,
marked with @safe_for_db_retry decorator.
Decorator catchs DBConnectionError() and retries function in a
loop until it succeeds, or until maximum retries count will be reached.
"""
CONF = cfg.CONF
CONF.register_opts(db_opts, 'database')
def __init__(self, retry_interval, max_retries, inc_retry_interval,
max_retry_interval):
super(wrap_db_retry, self).__init__()
self.retry_interval = retry_interval
self.max_retries = max_retries
self.inc_retry_interval = inc_retry_interval
self.max_retry_interval = max_retry_interval
def __call__(self, f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
next_interval = self.retry_interval
remaining = self.max_retries
while True:
try:
return f(*args, **kwargs)
except exception.DBConnectionError as e:
if remaining == 0:
LOG.exception(_LE('DB exceeded retry limit.'))
raise exception.DBError(e)
if remaining != -1:
remaining -= 1
LOG.exception(_LE('DB connection error.'))
# NOTE(vsergeyev): We are using patched time module, so
# this effectively yields the execution
# context to another green thread.
time.sleep(next_interval)
if self.inc_retry_interval:
next_interval = min(
next_interval * 2,
self.max_retry_interval
)
return wrapper
class DBAPI(object):
def __init__(self, backend_mapping=None):
if backend_mapping is None:
backend_mapping = {}
backend_name = CONF.database.backend
# Import the untranslated name if we don't have a
# mapping.
backend_path = backend_mapping.get(backend_name, backend_name)
backend_mod = importutils.import_module(backend_path)
self.__backend = backend_mod.get_backend()
def __init__(self, backend_name, backend_mapping=None, lazy=False,
**kwargs):
"""Initialize the chosen DB API backend.
:param backend_name: name of the backend to load
:type backend_name: str
:param backend_mapping: backend name -> module/class to load mapping
:type backend_mapping: dict
:param lazy: load the DB backend lazily on the first DB API method call
:type lazy: bool
Keyword arguments:
:keyword use_db_reconnect: retry DB transactions on disconnect or not
:type use_db_reconnect: bool
:keyword retry_interval: seconds between transaction retries
:type retry_interval: int
:keyword inc_retry_interval: increase retry interval or not
:type inc_retry_interval: bool
:keyword max_retry_interval: max interval value between retries
:type max_retry_interval: int
:keyword max_retries: max number of retries before an error is raised
:type max_retries: int
"""
self._backend = None
self._backend_name = backend_name
self._backend_mapping = backend_mapping or {}
self._lock = threading.Lock()
if not lazy:
self._load_backend()
self.use_db_reconnect = kwargs.get('use_db_reconnect', False)
self.retry_interval = kwargs.get('retry_interval', 1)
self.inc_retry_interval = kwargs.get('inc_retry_interval', True)
self.max_retry_interval = kwargs.get('max_retry_interval', 10)
self.max_retries = kwargs.get('max_retries', 20)
def _load_backend(self):
with self._lock:
if not self._backend:
# Import the untranslated name if we don't have a mapping
backend_path = self._backend_mapping.get(self._backend_name,
self._backend_name)
backend_mod = importutils.import_module(backend_path)
self._backend = backend_mod.get_backend()
def __getattr__(self, key):
return getattr(self.__backend, key)
if not self._backend:
self._load_backend()
attr = getattr(self._backend, key)
if not hasattr(attr, '__call__'):
return attr
# NOTE(vsergeyev): If `use_db_reconnect` option is set to True, retry
# DB API methods, decorated with @safe_for_db_retry
# on disconnect.
if self.use_db_reconnect and hasattr(attr, 'enable_retry'):
attr = wrap_db_retry(
retry_interval=self.retry_interval,
max_retries=self.max_retries,
inc_retry_interval=self.inc_retry_interval,
max_retry_interval=self.max_retry_interval)(attr)
return attr

@ -17,6 +17,8 @@
"""DB related custom exceptions."""
import six
from heat.openstack.common.gettextutils import _
@ -24,7 +26,7 @@ class DBError(Exception):
"""Wraps an implementation specific exception."""
def __init__(self, inner_exception=None):
self.inner_exception = inner_exception
super(DBError, self).__init__(str(inner_exception))
super(DBError, self).__init__(six.text_type(inner_exception))
class DBDuplicateEntry(DBError):
@ -47,7 +49,7 @@ class DBInvalidUnicodeParameter(Exception):
class DbMigrationError(DBError):
"""Wraps migration specific exception."""
def __init__(self, message=None):
super(DbMigrationError, self).__init__(str(message))
super(DbMigrationError, self).__init__(message)
class DBConnectionError(DBError):

@ -0,0 +1,171 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from oslo.config import cfg
database_opts = [
cfg.StrOpt('sqlite_db',
deprecated_group='DEFAULT',
default='heat.sqlite',
help='The file name to use with SQLite'),
cfg.BoolOpt('sqlite_synchronous',
deprecated_group='DEFAULT',
default=True,
help='If True, SQLite uses synchronous mode'),
cfg.StrOpt('backend',
default='sqlalchemy',
deprecated_name='db_backend',
deprecated_group='DEFAULT',
help='The backend to use for db'),
cfg.StrOpt('connection',
help='The SQLAlchemy connection string used to connect to the '
'database',
secret=True,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('mysql_sql_mode',
default='TRADITIONAL',
help='The SQL mode to be used for MySQL sessions. '
'This option, including the default, overrides any '
'server-set SQL mode. To use whatever SQL mode '
'is set by the server configuration, '
'set this to no value. Example: mysql_sql_mode='),
cfg.IntOpt('idle_timeout',
default=3600,
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_idle_timeout',
group='DATABASE'),
cfg.DeprecatedOpt('idle_timeout',
group='sql')],
help='Timeout before idle sql connections are reaped'),
cfg.IntOpt('min_pool_size',
default=1,
deprecated_opts=[cfg.DeprecatedOpt('sql_min_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_min_pool_size',
group='DATABASE')],
help='Minimum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_pool_size',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_pool_size',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_pool_size',
group='DATABASE')],
help='Maximum number of SQL connections to keep open in a '
'pool'),
cfg.IntOpt('max_retries',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_retries',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_max_retries',
group='DATABASE')],
help='Maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('retry_interval',
default=10,
deprecated_opts=[cfg.DeprecatedOpt('sql_retry_interval',
group='DEFAULT'),
cfg.DeprecatedOpt('reconnect_interval',
group='DATABASE')],
help='Interval between retries of opening a sql connection'),
cfg.IntOpt('max_overflow',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sql_max_overflow',
group='DEFAULT'),
cfg.DeprecatedOpt('sqlalchemy_max_overflow',
group='DATABASE')],
help='If set, use this value for max_overflow with sqlalchemy'),
cfg.IntOpt('connection_debug',
default=0,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_debug',
group='DEFAULT')],
help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'),
cfg.BoolOpt('connection_trace',
default=False,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection_trace',
group='DEFAULT')],
help='Add python stack traces to SQL as comment strings'),
cfg.IntOpt('pool_timeout',
default=None,
deprecated_opts=[cfg.DeprecatedOpt('sqlalchemy_pool_timeout',
group='DATABASE')],
help='If set, use this value for pool_timeout with sqlalchemy'),
cfg.BoolOpt('use_db_reconnect',
default=False,
help='Enable the experimental use of database reconnect '
'on connection lost'),
cfg.IntOpt('db_retry_interval',
default=1,
help='seconds between db connection retries'),
cfg.BoolOpt('db_inc_retry_interval',
default=True,
help='Whether to increase interval between db connection '
'retries, up to db_max_retry_interval'),
cfg.IntOpt('db_max_retry_interval',
default=10,
help='max seconds between db connection retries, if '
'db_inc_retry_interval is enabled'),
cfg.IntOpt('db_max_retries',
default=20,
help='maximum db connection retries before error is raised. '
'(setting -1 implies an infinite retry count)'),
]
CONF = cfg.CONF
CONF.register_opts(database_opts, 'database')
def set_defaults(sql_connection, sqlite_db, max_pool_size=None,
max_overflow=None, pool_timeout=None):
"""Set defaults for configuration variables."""
cfg.set_defaults(database_opts,
connection=sql_connection,
sqlite_db=sqlite_db)
# Update the QueuePool defaults
if max_pool_size is not None:
cfg.set_defaults(database_opts,
max_pool_size=max_pool_size)
if max_overflow is not None:
cfg.set_defaults(database_opts,
max_overflow=max_overflow)
if pool_timeout is not None:
cfg.set_defaults(database_opts,
pool_timeout=pool_timeout)
def list_opts():
"""Returns a list of oslo.config options available in the library.
The returned list includes all oslo.config options which may be registered
at runtime by the library.
Each element of the list is a tuple. The first element is the name of the
group under which the list of elements in the second element will be
registered. A group name of None corresponds to the [DEFAULT] group in
config files.
The purpose of this is to allow tools like the Oslo sample config file
generator to discover the options exposed to users by this library.
:returns: a list of (group_name, opts) tuples
"""
return [('database', copy.deepcopy(database_opts))]

@ -39,51 +39,19 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import distutils.version as dist_version
import os
import re
import migrate
from migrate.changeset import ansisql
from migrate.changeset.databases import sqlite
from migrate.versioning import util as migrate_util
import sqlalchemy
from sqlalchemy.schema import UniqueConstraint
from heat.openstack.common.db import exception
from heat.openstack.common.db.sqlalchemy import session as db_session
from heat.openstack.common.gettextutils import _ # noqa
@migrate_util.decorator
def patched_with_engine(f, *a, **kw):
url = a[0]
engine = migrate_util.construct_engine(url, **kw)
try:
kw['engine'] = engine
return f(*a, **kw)
finally:
if isinstance(engine, migrate_util.Engine) and engine is not url:
migrate_util.log.debug('Disposing SQLAlchemy engine %s', engine)
engine.dispose()
# TODO(jkoelker) When migrate 0.7.3 is released and nova depends
# on that version or higher, this can be removed
MIN_PKG_VERSION = dist_version.StrictVersion('0.7.3')
if (not hasattr(migrate, '__version__') or
dist_version.StrictVersion(migrate.__version__) < MIN_PKG_VERSION):
migrate_util.with_engine = patched_with_engine
# NOTE(jkoelker) Delay importing migrate until we are patched
from migrate import exceptions as versioning_exceptions
from migrate.versioning import api as versioning_api
from migrate.versioning.repository import Repository
import sqlalchemy
from sqlalchemy.schema import UniqueConstraint
get_engine = db_session.get_engine
from heat.openstack.common.db import exception
from heat.openstack.common.gettextutils import _
def _get_unique_constraints(self, table):
@ -200,11 +168,12 @@ def patch_migrate():
sqlite.SQLiteConstraintGenerator)
def db_sync(abs_path, version=None, init_version=0, sanity_check=True):
def db_sync(engine, abs_path, version=None, init_version=0, sanity_check=True):
"""Upgrade or downgrade a database.
Function runs the upgrade() or downgrade() functions in change scripts.
:param engine: SQLAlchemy engine instance for a given database
:param abs_path: Absolute path to migrate repository.
:param version: Database will upgrade/downgrade until this version.
If None - database will update to the latest
@ -220,19 +189,24 @@ def db_sync(abs_path, version=None, init_version=0, sanity_check=True):
raise exception.DbMigrationError(
message=_("version should be an integer"))
current_version = db_version(abs_path, init_version)
current_version = db_version(engine, abs_path, init_version)
repository = _find_migrate_repo(abs_path)
if sanity_check:
_db_schema_sanity_check()
_db_schema_sanity_check(engine)
if version is None or version > current_version:
return versioning_api.upgrade(get_engine(), repository, version)
return versioning_api.upgrade(engine, repository, version)
else:
return versioning_api.downgrade(get_engine(), repository,
return versioning_api.downgrade(engine, repository,
version)
def _db_schema_sanity_check():
engine = get_engine()
def _db_schema_sanity_check(engine):
"""Ensure all database tables were created with required parameters.
:param engine: SQLAlchemy engine instance for a given database
"""
if engine.name == 'mysql':
onlyutf8_sql = ('SELECT TABLE_NAME,TABLE_COLLATION '
'from information_schema.TABLES '
@ -254,23 +228,23 @@ def _db_schema_sanity_check():
) % ','.join(table_names))
def db_version(abs_path, init_version):
def db_version(engine, abs_path, init_version):
"""Show the current version of the repository.
:param engine: SQLAlchemy engine instance for a given database
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
try:
return versioning_api.db_version(get_engine(), repository)
return versioning_api.db_version(engine, repository)
except versioning_exceptions.DatabaseNotControlledError:
meta = sqlalchemy.MetaData()
engine = get_engine()
meta.reflect(bind=engine)
tables = meta.tables
if len(tables) == 0:
db_version_control(abs_path, init_version)
return versioning_api.db_version(get_engine(), repository)
if len(tables) == 0 or 'alembic_version' in tables:
db_version_control(engine, abs_path, version=init_version)
return versioning_api.db_version(engine, repository)
else:
raise exception.DbMigrationError(
message=_(
@ -279,17 +253,18 @@ def db_version(abs_path, init_version):
"manually."))
def db_version_control(abs_path, version=None):
def db_version_control(engine, abs_path, version=None):
"""Mark a database as under this repository's version control.
Once a database is under version control, schema changes should
only be done via change scripts in this repository.
:param engine: SQLAlchemy engine instance for a given database
:param abs_path: Absolute path to migrate repository
:param version: Initial database version
"""
repository = _find_migrate_repo(abs_path)
versioning_api.version_control(get_engine(), repository, version)
versioning_api.version_control(engine, repository, version)
return version

@ -27,18 +27,16 @@ from sqlalchemy import Column, Integer
from sqlalchemy import DateTime
from sqlalchemy.orm import object_mapper
from heat.openstack.common.db.sqlalchemy import session as sa
from heat.openstack.common import timeutils
class ModelBase(object):
class ModelBase(six.Iterator):
"""Base class for models."""
__table_initialized__ = False
def save(self, session=None):
def save(self, session):
"""Save this object."""
if not session:
session = sa.get_session()
# NOTE(boris-42): This part of code should be look like:
# session.add(self)
# session.flush()
@ -81,10 +79,14 @@ class ModelBase(object):
self._i = iter(columns)
return self
def next(self):
# In Python 3, __next__() has replaced next().
def __next__(self):
n = six.advance_iterator(self._i)
return n, getattr(self, n)
def next(self):
return self.__next__()
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in six.iteritems(values):
@ -111,7 +113,7 @@ class SoftDeleteMixin(object):
deleted_at = Column(DateTime)
deleted = Column(Integer, default=0)
def soft_delete(self, session=None):
def soft_delete(self, session):
"""Mark this object as deleted."""
self.deleted = self.id
self.deleted_at = timeutils.utcnow()

@ -17,6 +17,7 @@
"""Provision test environment for specific DB backends"""
import argparse
import logging
import os
import random
import string
@ -27,23 +28,12 @@ import sqlalchemy
from heat.openstack.common.db import exception as exc
SQL_CONNECTION = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', 'sqlite://')
LOG = logging.getLogger(__name__)
def _gen_credentials(*names):
"""Generate credentials."""
auth_dict = {}
for name in names:
val = ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10))
auth_dict[name] = val
return auth_dict
def _get_engine(uri=SQL_CONNECTION):
def get_engine(uri):
"""Engine creation
By default the uri is SQL_CONNECTION which is admin credentials.
Call the function without arguments to get admin connection. Admin
connection required to create temporary user and database for each
particular test. Otherwise use existing connection to recreate connection
@ -63,50 +53,43 @@ def _execute_sql(engine, sql, driver):
except sqlalchemy.exc.OperationalError:
msg = ('%s does not match database admin '
'credentials or database does not exist.')
raise exc.DBConnectionError(msg % SQL_CONNECTION)
LOG.exception(msg % engine.url)
raise exc.DBConnectionError(msg % engine.url)
def create_database(engine):
"""Provide temporary user and database for each particular test."""
driver = engine.name
auth = _gen_credentials('database', 'user', 'passwd')
sqls = {
'mysql': [
"drop database if exists %(database)s;",
"grant all on %(database)s.* to '%(user)s'@'localhost'"
" identified by '%(passwd)s';",
"create database %(database)s;",
],
'postgresql': [
"drop database if exists %(database)s;",
"drop user if exists %(user)s;",
"create user %(user)s with password '%(passwd)s';",
"create database %(database)s owner %(user)s;",
]
auth = {
'database': ''.join(random.choice(string.ascii_lowercase)
for i in moves.range(10)),
'user': engine.url.username,
'passwd': engine.url.password,
}
sqls = [
"drop database if exists %(database)s;",
"create database %(database)s;"
]
if driver == 'sqlite':
return 'sqlite:////tmp/%s' % auth['database']
try:
sql_rows = sqls[driver]
except KeyError:
elif driver in ['mysql', 'postgresql']:
sql_query = map(lambda x: x % auth, sqls)
_execute_sql(engine, sql_query, driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
sql_query = map(lambda x: x % auth, sql_rows)
_execute_sql(engine, sql_query, driver)
params = auth.copy()
params['backend'] = driver
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
def drop_database(engine, current_uri):
def drop_database(admin_engine, current_uri):
"""Drop temporary database and user after each particular test."""
engine = _get_engine(current_uri)
admin_engine = _get_engine()
engine = get_engine(current_uri)
driver = engine.name
auth = {'database': engine.url.database, 'user': engine.url.username}
@ -115,26 +98,11 @@ def drop_database(engine, current_uri):
os.remove(auth['database'])
except OSError:
pass
return
sqls = {
'mysql': [
"drop database if exists %(database)s;",
"drop user '%(user)s'@'localhost';",
],
'postgresql': [
"drop database if exists %(database)s;",
"drop user if exists %(user)s;",
]
}
try:
sql_rows = sqls[driver]
except KeyError:
elif driver in ['mysql', 'postgresql']:
sql = "drop database if exists %(database)s;"
_execute_sql(admin_engine, [sql % auth], driver)
else:
raise ValueError('Unsupported RDBMS %s' % driver)
sql_query = map(lambda x: x % auth, sql_rows)
_execute_sql(admin_engine, sql_query, driver)
def main():
@ -173,7 +141,9 @@ def main():
args = parser.parse_args()
engine = _get_engine()
connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
'sqlite://')
engine = get_engine(connection_string)
which = args.which
if which == "create":

@ -17,33 +17,24 @@
"""Session Handling for SQLAlchemy backend.
Initializing:
* Call set_defaults with the minimal of the following kwargs:
sql_connection, sqlite_db
Example::
session.set_defaults(
sql_connection="sqlite:///var/lib/heat/sqlite.db",
sqlite_db="/var/lib/heat/sqlite.db")
Recommended ways to use sessions within this framework:
* Don't use them explicitly; this is like running with AUTOCOMMIT=1.
model_query() will implicitly use a session when called without one
* Don't use them explicitly; this is like running with ``AUTOCOMMIT=1``.
`model_query()` will implicitly use a session when called without one
supplied. This is the ideal situation because it will allow queries
to be automatically retried if the database connection is interrupted.
Note: Automatic retry will be enabled in a future patch.
.. note:: Automatic retry will be enabled in a future patch.
It is generally fine to issue several queries in a row like this. Even though
they may be run in separate transactions and/or separate sessions, each one
will see the data from the prior calls. If needed, undo- or rollback-like
functionality should be handled at a logical level. For an example, look at
the code around quotas and reservation_rollback().
the code around quotas and `reservation_rollback()`.
Examples:
Examples::
.. code:: python
def get_foo(context, foo):
return (model_query(context, models.Foo).
@ -62,28 +53,29 @@ Recommended ways to use sessions within this framework:
return foo_ref
* Within the scope of a single method, keeping all the reads and writes within
the context managed by a single session. In this way, the session's __exit__
handler will take care of calling flush() and commit() for you.
If using this approach, you should not explicitly call flush() or commit().
Any error within the context of the session will cause the session to emit
a ROLLBACK. Database Errors like IntegrityError will be raised in
session's __exit__ handler, and any try/except within the context managed
by session will not be triggered. And catching other non-database errors in
the session will not trigger the ROLLBACK, so exception handlers should
always be outside the session, unless the developer wants to do a partial
commit on purpose. If the connection is dropped before this is possible,
the database will implicitly roll back the transaction.
* Within the scope of a single method, keep all the reads and writes within
the context managed by a single session. In this way, the session's
`__exit__` handler will take care of calling `flush()` and `commit()` for
you. If using this approach, you should not explicitly call `flush()` or
`commit()`. Any error within the context of the session will cause the
session to emit a `ROLLBACK`. Database errors like `IntegrityError` will be
raised in `session`'s `__exit__` handler, and any try/except within the
context managed by `session` will not be triggered. And catching other
non-database errors in the session will not trigger the ROLLBACK, so
exception handlers should always be outside the session, unless the
developer wants to do a partial commit on purpose. If the connection is
dropped before this is possible, the database will implicitly roll back the
transaction.
Note: statements in the session scope will not be automatically retried.
.. note:: Statements in the session scope will not be automatically retried.
If you create models within the session, they need to be added, but you
do not need to call model.save()
do not need to call `model.save()`:
::
.. code:: python
def create_many_foo(context, foos):
session = get_session()
session = sessionmaker()
with session.begin():
for foo in foos:
foo_ref = models.Foo()
@ -91,7 +83,7 @@ Recommended ways to use sessions within this framework:
session.add(foo_ref)
def update_bar(context, foo_id, newbar):
session = get_session()
session = sessionmaker()
with session.begin():
foo_ref = (model_query(context, models.Foo, session).
filter_by(id=foo_id).
@ -100,11 +92,16 @@ Recommended ways to use sessions within this framework:
filter_by(id=foo_ref['bar_id']).
update({'bar': newbar}))
Note: update_bar is a trivially simple example of using "with session.begin".
Whereas create_many_foo is a good example of when a transaction is needed,
it is always best to use as few queries as possible. The two queries in
update_bar can be better expressed using a single query which avoids
the need for an explicit transaction. It can be expressed like so::
.. note:: `update_bar` is a trivially simple example of using
``with session.begin``. Whereas `create_many_foo` is a good example of
when a transaction is needed, it is always best to use as few queries as
possible.
The two queries in `update_bar` can be better expressed using a single query
which avoids the need for an explicit transaction. It can be expressed like
so:
.. code:: python
def update_bar(context, foo_id, newbar):
subq = (model_query(context, models.Foo.id).
@ -115,21 +112,25 @@ Recommended ways to use sessions within this framework:
filter_by(id=subq.as_scalar()).
update({'bar': newbar}))
For reference, this emits approximately the following SQL statement::
For reference, this emits approximately the following SQL statement:
.. code:: sql
UPDATE bar SET bar = ${newbar}
WHERE id=(SELECT bar_id FROM foo WHERE id = ${foo_id} LIMIT 1);
Note: create_duplicate_foo is a trivially simple example of catching an
exception while using "with session.begin". Here create two duplicate
instances with same primary key, must catch the exception out of context
managed by a single session:
.. note:: `create_duplicate_foo` is a trivially simple example of catching an
exception while using ``with session.begin``. Here create two duplicate
instances with same primary key, must catch the exception out of context
managed by a single session:
.. code:: python
def create_duplicate_foo(context):
foo1 = models.Foo()
foo2 = models.Foo()
foo1.id = foo2.id = 1
session = get_session()
session = sessionmaker()
try:
with session.begin():
session.add(foo1)
@ -139,7 +140,7 @@ Recommended ways to use sessions within this framework:
* Passing an active session between methods. Sessions should only be passed
to private methods. The private method must use a subtransaction; otherwise
SQLAlchemy will throw an error when you call session.begin() on an existing
SQLAlchemy will throw an error when you call `session.begin()` on an existing
transaction. Public methods should not accept a session parameter and should
not be involved in sessions within the caller's scope.
@ -152,10 +153,10 @@ Recommended ways to use sessions within this framework:
becomes less clear in this situation. When this is needed for code clarity,
it should be clearly documented.
::
.. code:: python
def myfunc(foo):
session = get_session()
session = sessionmaker()
with session.begin():
# do some database things
bar = _private_func(foo, session)
@ -163,7 +164,7 @@ Recommended ways to use sessions within this framework:
def _private_func(foo, session=None):
if not session:
session = get_session()
session = sessionmaker()
with session.begin(subtransaction=True):
# do some other database things
return bar
@ -173,13 +174,13 @@ There are some things which it is best to avoid:
* Don't keep a transaction open any longer than necessary.
This means that your "with session.begin()" block should be as short
This means that your ``with session.begin()`` block should be as short
as possible, while still containing all the related calls for that
transaction.
* Avoid "with_lockmode('UPDATE')" when possible.
* Avoid ``with_lockmode('UPDATE')`` when possible.
In MySQL/InnoDB, when a "SELECT ... FOR UPDATE" query does not match
In MySQL/InnoDB, when a ``SELECT ... FOR UPDATE`` query does not match
any rows, it will take a gap-lock. This is a form of write-lock on the
"gap" where no rows exist, and prevents any other writes to that space.
This can effectively prevent any INSERT into a table by locking the gap
@ -190,15 +191,18 @@ There are some things which it is best to avoid:
number of rows matching a query, and if only one row is returned,
then issue the SELECT FOR UPDATE.
The better long-term solution is to use INSERT .. ON DUPLICATE KEY UPDATE.
The better long-term solution is to use
``INSERT .. ON DUPLICATE KEY UPDATE``.
However, this can not be done until the "deleted" columns are removed and
proper UNIQUE constraints are added to the tables.
Enabling soft deletes:
* To use/enable soft-deletes, the SoftDeleteMixin must be added
to your model class. For example::
* To use/enable soft-deletes, the `SoftDeleteMixin` must be added
to your model class. For example:
.. code:: python
class NovaBase(models.SoftDeleteMixin, models.ModelBase):
pass
@ -206,15 +210,16 @@ Enabling soft deletes:
Efficient use of soft deletes:
* There are two possible ways to mark a record as deleted::
* There are two possible ways to mark a record as deleted:
`model.soft_delete()` and `query.soft_delete()`.
model.soft_delete() and query.soft_delete().
The `model.soft_delete()` method works with a single already-fetched entry.
`query.soft_delete()` makes only one db request for all entries that
correspond to the query.
model.soft_delete() method works with single already fetched entry.
query.soft_delete() makes only one db request for all entries that correspond
to query.
* In almost all cases you should use `query.soft_delete()`. Some examples:
* In almost all cases you should use query.soft_delete(). Some examples::
.. code:: python
def soft_delete_bar():
count = model_query(BarModel).find(some_condition).soft_delete()
@ -223,7 +228,7 @@ Efficient use of soft deletes:
def complex_soft_delete_with_synchronization_bar(session=None):
if session is None:
session = get_session()
session = sessionmaker()
with session.begin(subtransactions=True):
count = (model_query(BarModel).
find(some_condition).
@ -233,24 +238,26 @@ Efficient use of soft deletes:
if count == 0:
raise Exception("0 entries were soft deleted")
* There is only one situation where model.soft_delete() is appropriate: when
* There is only one situation where `model.soft_delete()` is appropriate: when
you fetch a single record, work with it, and mark it as deleted in the same
transaction.
::
.. code:: python
def soft_delete_bar_model():
session = get_session()
session = sessionmaker()
with session.begin():
bar_ref = model_query(BarModel).find(some_condition).first()
# Work with bar_ref
bar_ref.soft_delete(session=session)
However, if you need to work with all entries that correspond to query and
then soft delete them you should use query.soft_delete() method::
then soft delete them you should use the `query.soft_delete()` method:
.. code:: python
def soft_delete_multi_models():
session = get_session()
session = sessionmaker()
with session.begin():
query = (model_query(BarModel, session=session).
find(some_condition))
@ -261,22 +268,22 @@ Efficient use of soft deletes:
# session and these entries are not used after this.
When working with many rows, it is very important to use query.soft_delete,
which issues a single query. Using model.soft_delete(), as in the following
which issues a single query. Using `model.soft_delete()`, as in the following
example, is very inefficient.
::
.. code:: python
for bar_ref in bar_refs:
bar_ref.soft_delete(session=session)
# This will produce count(bar_refs) db requests.
"""
import functools
import os.path
import logging
import re
import time
from oslo.config import cfg
import six
from sqlalchemy import exc as sqla_exc
from sqlalchemy.interfaces import PoolListener
@ -285,151 +292,12 @@ from sqlalchemy.pool import NullPool, StaticPool
from sqlalchemy.sql.expression import literal_column
from heat.openstack.common.db import exception
from heat.openstack.common.gettextutils import _
from heat.openstack.common import log as logging
from heat.openstack.common.gettextutils import _LE, _LW
from heat.openstack.common import timeutils
sqlite_db_opts = [
cfg.StrOpt('sqlite_db',
default='heat.sqlite',
help='the filename to use with sqlite'),
cfg.BoolOpt('sqlite_synchronous',
default=True,
help='If true, use synchronous mode for sqlite'),
]
database_opts = [
cfg.StrOpt('connection',
default='sqlite:///' +
os.path.abspath(os.path.join(os.path.dirname(__file__),
'../', '$sqlite_db')),
help='The SQLAlchemy connection string used to connect to the '
'database',
secret=True,
deprecated_opts=[cfg.DeprecatedOpt('sql_connection',
group='DEFAULT'),
cfg.DeprecatedOpt('sql_connection',
group='DATABASE'),
cfg.DeprecatedOpt('connection',
group='sql'), ]),
cfg.StrOpt('slave_connection',
default='',
secret=True,
help='The SQLAlchemy connection string used to connect to the '
'slave database'),
cfg.IntOpt('idle_timeout',
default=3600,