Migrate to Oslo DB code

- engine and session handling code is reused
 - integrity errors caused by duplicating entries
   are handled properly
 - the base class for models is mostly reused
   (the only exception is SoftDeleteMixin, which
    requires changes to existing DB schema; this
    will be implemented in BP db-unique-keys)
 - use common exceptions handling code

Blueprint: db-session-cleanup

Change-Id: I7f5eb53daddc57066af9c73caa633b37a2afd9c2
This commit is contained in:
Roman Podolyaka 2013-05-23 17:55:10 +03:00
parent 77a77456af
commit 7055b78265
11 changed files with 59 additions and 279 deletions

View File

@ -32,11 +32,11 @@ these objects be simple dictionaries.
**Related Flags**
:db_backend: string to lookup in the list of LazyPluggable backends.
`sqlalchemy` is the only supported backend right now.
:backend: string to lookup in the list of LazyPluggable backends.
`sqlalchemy` is the only supported backend right now.
:sql_connection: string specifying the sqlalchemy connection to use, like:
`sqlite:///var/lib/cinder/cinder.sqlite`.
:connection: string specifying the sqlalchemy connection to use, like:
`sqlite:///var/lib/cinder/cinder.sqlite`.
:enable_new_services: when adding a new service to the database, is it in the
pool of available hardware (Default: True)
@ -47,9 +47,15 @@ from oslo.config import cfg
from cinder import exception
from cinder import flags
from cinder import utils
from cinder.openstack.common.db import api as db_api
db_opts = [
# TODO(rpodolyaka): this option is deprecated but still passed to
# LazyPluggable class which doesn't support retrieving
# of options put into groups. Nova's version of this
# class supports this. Perhaps, we should put it to Oslo
# and then reuse here.
cfg.StrOpt('db_backend',
default='sqlalchemy',
help='The backend to use for db'),
@ -69,8 +75,11 @@ db_opts = [
FLAGS = flags.FLAGS
FLAGS.register_opts(db_opts)
IMPL = utils.LazyPluggable('db_backend',
sqlalchemy='cinder.db.sqlalchemy.api')
_BACKEND_MAPPING = {'sqlalchemy': 'cinder.db.sqlalchemy.api'}
IMPL = db_api.DBAPI(backend_mapping=_BACKEND_MAPPING)
class NoMoreTargets(exception.CinderException):

View File

@ -20,6 +20,7 @@
"""Implementation of SQLAlchemy backend."""
import datetime
import sys
import uuid
import warnings
@ -32,9 +33,10 @@ from sqlalchemy.sql import func
from cinder.common import sqlalchemyutils
from cinder import db
from cinder.db.sqlalchemy import models
from cinder.db.sqlalchemy.session import get_session
from cinder import exception
from cinder import flags
from cinder.openstack.common.db import exception as db_exc
from cinder.openstack.common.db.sqlalchemy import session as db_session
from cinder.openstack.common import log as logging
from cinder.openstack.common import timeutils
from cinder.openstack.common import uuidutils
@ -44,6 +46,15 @@ FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
get_engine = db_session.get_engine
get_session = db_session.get_session
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
def is_admin_context(context):
"""Indicates if the request context is an administrator."""
@ -1219,6 +1230,7 @@ def snapshot_destroy(context, snapshot_id):
def snapshot_get(context, snapshot_id, session=None):
result = model_query(context, models.Snapshot, session=session,
project_only=True).\
options(joinedload('volume')).\
filter_by(id=snapshot_id).\
first()
@ -1230,14 +1242,18 @@ def snapshot_get(context, snapshot_id, session=None):
@require_admin_context
def snapshot_get_all(context):
return model_query(context, models.Snapshot).all()
return model_query(context, models.Snapshot).\
options(joinedload('snapshot_metadata')).\
all()
@require_context
def snapshot_get_all_for_volume(context, volume_id):
return model_query(context, models.Snapshot, read_deleted='no',
project_only=True).\
filter_by(volume_id=volume_id).all()
filter_by(volume_id=volume_id).\
options(joinedload('snapshot_metadata')).\
all()
@require_context
@ -1245,6 +1261,7 @@ def snapshot_get_all_by_project(context, project_id):
authorize_project_context(context, project_id)
return model_query(context, models.Snapshot).\
filter_by(project_id=project_id).\
options(joinedload('snapshot_metadata')).\
all()
@ -1460,7 +1477,7 @@ def volume_type_create(context, values):
volume_type_ref.update(values)
volume_type_ref.save()
except Exception, e:
raise exception.DBError(e)
raise db_exc.DBError(e)
return volume_type_ref

View File

@ -20,7 +20,7 @@ import distutils.version as dist_version
import os
from cinder.db import migration
from cinder.db.sqlalchemy.session import get_engine
from cinder.db.sqlalchemy.api import get_engine
from cinder import exception
from cinder import flags
from cinder.openstack.common import log as logging

View File

@ -22,15 +22,12 @@ SQLAlchemy models for cinder data.
"""
from sqlalchemy import Column, Integer, String, Text, schema
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import ForeignKey, DateTime, Boolean
from sqlalchemy.orm import relationship, backref, object_mapper
from sqlalchemy.orm import relationship, backref
from cinder.db.sqlalchemy.session import get_session
from cinder import exception
from cinder import flags
from cinder.openstack.common.db.sqlalchemy import models
from cinder.openstack.common import timeutils
@ -38,67 +35,24 @@ FLAGS = flags.FLAGS
BASE = declarative_base()
class CinderBase(object):
class CinderBase(models.TimestampMixin,
models.ModelBase):
"""Base class for Cinder Models."""
__table_args__ = {'mysql_engine': 'InnoDB'}
__table_initialized__ = False
created_at = Column(DateTime, default=timeutils.utcnow)
updated_at = Column(DateTime, onupdate=timeutils.utcnow)
# TODO(rpodolyaka): reuse models.SoftDeleteMixin in the next stage
# of implementing of BP db-cleanup
deleted_at = Column(DateTime)
deleted = Column(Boolean, default=False)
metadata = None
def save(self, session=None):
"""Save this object."""
if not session:
session = get_session()
session.add(self)
try:
session.flush()
except IntegrityError, e:
if str(e).endswith('is not unique'):
raise exception.Duplicate(str(e))
else:
raise
def delete(self, session=None):
"""Delete this object."""
self.deleted = True
self.deleted_at = timeutils.utcnow()
self.save(session=session)
def __setitem__(self, key, value):
setattr(self, key, value)
def __getitem__(self, key):
return getattr(self, key)
def get(self, key, default=None):
return getattr(self, key, default)
def __iter__(self):
self._i = iter(object_mapper(self).columns)
return self
def next(self):
n = self._i.next().name
return n, getattr(self, n)
def update(self, values):
"""Make the model object behave like a dict."""
for k, v in values.iteritems():
setattr(self, k, v)
def iteritems(self):
"""Make the model object behave like a dict.
Includes attributes from joins."""
local = dict(self)
joined = dict([(k, v) for k, v in self.__dict__.iteritems()
if not k[0] == '_'])
local.update(joined)
return local.iteritems()
class Service(BASE, CinderBase):
"""Represents a running service on a host."""
@ -448,6 +402,6 @@ def register_models():
VolumeTypes,
VolumeGlanceMetadata,
)
engine = create_engine(FLAGS.sql_connection, echo=False)
engine = create_engine(FLAGS.database.connection, echo=False)
for model in models:
model.metadata.create_all(engine)

View File

@ -1,151 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Session Handling for SQLAlchemy backend."""
import time
from sqlalchemy.exc import DisconnectionError, OperationalError
import sqlalchemy.interfaces
import sqlalchemy.orm
from sqlalchemy.pool import NullPool, StaticPool
import cinder.exception
import cinder.flags as flags
from cinder.openstack.common import log as logging
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
_ENGINE = None
_MAKER = None
def get_session(autocommit=True, expire_on_commit=False):
"""Return a SQLAlchemy session."""
global _MAKER
if _MAKER is None:
engine = get_engine()
_MAKER = get_maker(engine, autocommit, expire_on_commit)
session = _MAKER()
session.query = cinder.exception.wrap_db_error(session.query)
session.flush = cinder.exception.wrap_db_error(session.flush)
return session
def synchronous_switch_listener(dbapi_conn, connection_rec):
"""Switch sqlite connections to non-synchronous mode"""
dbapi_conn.execute("PRAGMA synchronous = OFF")
def ping_listener(dbapi_conn, connection_rec, connection_proxy):
"""
Ensures that MySQL connections checked out of the
pool are alive.
Borrowed from:
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
"""
try:
dbapi_conn.cursor().execute('select 1')
except dbapi_conn.OperationalError, ex:
if ex.args[0] in (2006, 2013, 2014, 2045, 2055):
LOG.warn(_('Got mysql server has gone away: %s'), ex)
raise DisconnectionError("Database server went away")
else:
raise
def is_db_connection_error(args):
"""Return True if error in connecting to db."""
# NOTE(adam_g): This is currently MySQL specific and needs to be extended
# to support Postgres and others.
conn_err_codes = ('2002', '2003', '2006')
for err_code in conn_err_codes:
if args.find(err_code) != -1:
return True
return False
def get_engine():
"""Return a SQLAlchemy engine."""
global _ENGINE
if _ENGINE is None:
connection_dict = sqlalchemy.engine.url.make_url(FLAGS.sql_connection)
engine_args = {
"pool_recycle": FLAGS.sql_idle_timeout,
"echo": False,
'convert_unicode': True,
}
# Map our SQL debug level to SQLAlchemy's options
if FLAGS.sql_connection_debug >= 100:
engine_args['echo'] = 'debug'
elif FLAGS.sql_connection_debug >= 50:
engine_args['echo'] = True
if "sqlite" in connection_dict.drivername:
engine_args["poolclass"] = NullPool
if FLAGS.sql_connection == "sqlite://":
engine_args["poolclass"] = StaticPool
engine_args["connect_args"] = {'check_same_thread': False}
_ENGINE = sqlalchemy.create_engine(FLAGS.sql_connection, **engine_args)
if 'mysql' in connection_dict.drivername:
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
elif "sqlite" in connection_dict.drivername:
if not FLAGS.sqlite_synchronous:
sqlalchemy.event.listen(_ENGINE, 'connect',
synchronous_switch_listener)
try:
_ENGINE.connect()
except OperationalError, e:
if not is_db_connection_error(e.args[0]):
raise
remaining = FLAGS.sql_max_retries
if remaining == -1:
remaining = 'infinite'
while True:
msg = _('SQL connection failed. %s attempts left.')
LOG.warn(msg % remaining)
if remaining != 'infinite':
remaining -= 1
time.sleep(FLAGS.sql_retry_interval)
try:
_ENGINE.connect()
break
except OperationalError, e:
if ((remaining != 'infinite' and remaining == 0) or
not is_db_connection_error(e.args[0])):
raise
return _ENGINE
def get_maker(engine, autocommit=True, expire_on_commit=False):
"""Return a SQLAlchemy sessionmaker using the given engine."""
return sqlalchemy.orm.sessionmaker(bind=engine,
autocommit=autocommit,
expire_on_commit=expire_on_commit)

View File

@ -28,6 +28,7 @@ from oslo.config import cfg
import webob.exc
from cinder import flags
from cinder.openstack.common import exception as com_exception
from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
@ -69,28 +70,7 @@ class ProcessExecutionError(IOError):
IOError.__init__(self, message)
class Error(Exception):
pass
class DBError(Error):
"""Wraps an implementation specific exception."""
def __init__(self, inner_exception=None):
self.inner_exception = inner_exception
super(DBError, self).__init__(str(inner_exception))
def wrap_db_error(f):
def _wrap(*args, **kwargs):
try:
return f(*args, **kwargs)
except UnicodeEncodeError:
raise InvalidUnicodeParameter()
except Exception, e:
LOG.exception(_('DB exception wrapped.'))
raise DBError(e)
_wrap.func_name = f.func_name
return _wrap
Error = com_exception.Error
class CinderException(Exception):
@ -196,11 +176,6 @@ class InvalidContentType(Invalid):
message = _("Invalid content type %(content_type)s.")
class InvalidUnicodeParameter(Invalid):
message = _("Invalid Parameter: "
"Unicode is not supported by the current database.")
# Cannot be templated as the error syntax varies.
# msg needs to be constructed when raised.
class InvalidParameterValue(Invalid):

View File

@ -78,15 +78,6 @@ core_opts = [
default=None,
help='Virtualization api connection type : libvirt, xenapi, '
'or fake'),
cfg.StrOpt('sql_connection',
default='sqlite:///$state_path/$sqlite_db',
help='The SQLAlchemy connection string used to connect to the '
'database',
secret=True),
cfg.IntOpt('sql_connection_debug',
default=0,
help='Verbosity of SQL debugging information. 0=None, '
'100=Everything'),
cfg.StrOpt('api_paste_config',
default="api-paste.ini",
help='File name for the paste.deploy config for cinder-api'),
@ -166,22 +157,6 @@ global_opts = [
default=1000,
help='the maximum number of items returned in a single '
'response from a collection resource'),
cfg.StrOpt('sqlite_db',
default='cinder.sqlite',
help='the filename to use with sqlite'),
cfg.BoolOpt('sqlite_synchronous',
default=True,
help='If passed, use synchronous mode for sqlite'),
cfg.IntOpt('sql_idle_timeout',
default=3600,
help='timeout before idle sql connections are reaped'),
cfg.IntOpt('sql_max_retries',
default=10,
help='maximum db connection retries during startup. '
'(setting -1 implies an infinite retry count)'),
cfg.IntOpt('sql_retry_interval',
default=10,
help='interval between retries of opening a sql connection'),
cfg.StrOpt('volume_manager',
default='cinder.volume.manager.VolumeManager',
help='full class name for the Manager for volume'),

View File

@ -40,7 +40,7 @@ setattr(__builtin__, '_', lambda x: x)
import os
import shutil
from cinder.db.sqlalchemy.session import get_engine
from cinder.db.sqlalchemy.api import get_engine
from cinder import flags
FLAGS = flags.FLAGS
@ -49,7 +49,7 @@ _DB = None
def reset_db():
if FLAGS.sql_connection == "sqlite://":
if FLAGS.database.connection == "sqlite://":
engine = get_engine()
engine.dispose()
conn = engine.connect()
@ -66,7 +66,7 @@ def setup():
from cinder.tests import fake_flags
fake_flags.set_defaults(FLAGS)
if FLAGS.sql_connection == "sqlite://":
if FLAGS.database.connection == "sqlite://":
if migration.db_version() > 1:
return
else:
@ -75,7 +75,7 @@ def setup():
return
migration.db_sync()
if FLAGS.sql_connection == "sqlite://":
if FLAGS.database.connection == "sqlite://":
global _DB
engine = get_engine()
conn = engine.connect()

View File

@ -39,7 +39,7 @@ def set_defaults(conf):
conf.set_default('rpc_backend', 'cinder.openstack.common.rpc.impl_fake')
conf.set_default('iscsi_num_targets', 8)
conf.set_default('verbose', True)
conf.set_default('sql_connection', "sqlite://")
conf.set_default('connection', 'sqlite://', group='database')
conf.set_default('sqlite_synchronous', False)
conf.set_default('policy_file', 'cinder/tests/policy.json')
conf.set_default('xiv_proxy', 'cinder.tests.test_xiv.XIVFakeProxyDriver')

View File

@ -19,8 +19,8 @@ Unit Tests for volume types code
import time
from cinder import context
from cinder.db.sqlalchemy import api as db_api
from cinder.db.sqlalchemy import models
from cinder.db.sqlalchemy import session as sql_session
from cinder import exception
from cinder import flags
from cinder.openstack.common import log as logging
@ -75,7 +75,7 @@ class VolumeTypeTestCase(test.TestCase):
def test_get_all_volume_types(self):
"""Ensures that all volume types can be retrieved."""
session = sql_session.get_session()
session = db_api.get_session()
total_volume_types = session.query(models.VolumeTypes).count()
vol_types = volume_types.get_all_types(self.ctxt)
self.assertEqual(total_volume_types, len(vol_types))
@ -92,7 +92,7 @@ class VolumeTypeTestCase(test.TestCase):
def test_default_volume_type_missing_in_db(self):
"""Ensures proper exception raised if default volume type
is not in database."""
session = sql_session.get_session()
session = db_api.get_session()
default_vol_type = volume_types.get_default_volume_type()
self.assertEqual(default_vol_type, {})

View File

@ -25,6 +25,7 @@ from cinder import context
from cinder import db
from cinder import exception
from cinder import flags
from cinder.openstack.common.db import exception as db_exc
from cinder.openstack.common import log as logging
FLAGS = flags.FLAGS
@ -37,7 +38,7 @@ def create(context, name, extra_specs={}):
type_ref = db.volume_type_create(context,
dict(name=name,
extra_specs=extra_specs))
except exception.DBError, e:
except db_exc.DBError as e:
LOG.exception(_('DB error: %s') % e)
raise exception.VolumeTypeCreateFailed(name=name,
extra_specs=extra_specs)