Sync the latest DB code from oslo-incubator

Update includes a lot of changes from oslo. Previous update was
on Dec 9, 2013.

Sahara code changed because of oslo.db API change.

The main change in API is that oslo.db no longer stores SQLAlchemy
Engine and sessionmaker instances globally and it's up to applications to
create them.

Change-Id: I3e90739ab96fa8c4c718f22542df8460cb052ad9
This commit is contained in:
Andrew Lazarev
2014-05-29 13:58:37 -07:00
parent 32c0a0f073
commit 0e69ae82b7
20 changed files with 1257 additions and 518 deletions

View File

@@ -39,11 +39,14 @@ from sahara.openstack.common import log as logging
CONF = cfg.CONF
CONF.import_opt('backend', 'sahara.openstack.common.db.options',
group='database')
_BACKEND_MAPPING = {
'sqlalchemy': 'sahara.db.sqlalchemy.api',
}
IMPL = db_api.DBAPI(backend_mapping=_BACKEND_MAPPING)
IMPL = db_api.DBAPI(CONF.database.backend, backend_mapping=_BACKEND_MAPPING)
LOG = logging.getLogger(__name__)

View File

@@ -36,4 +36,5 @@ class Base(object):
def is_mysql_avail():
return CONF.database.connection.startswith('mysql')
connection = CONF.database.connection
return connection and connection.startswith('mysql')

View File

@@ -30,11 +30,38 @@ from sahara.openstack.common import log as logging
LOG = logging.getLogger(__name__)
get_engine = db_session.get_engine
get_session = db_session.get_session
CONF = cfg.CONF
_FACADE = None
def _create_facade_lazily():
global _FACADE
if _FACADE is None:
params = dict(CONF.database.iteritems())
params["sqlite_fk"] = True
_FACADE = db_session.EngineFacade(
CONF.database.connection,
**params
)
return _FACADE
def get_engine():
facade = _create_facade_lazily()
return facade.get_engine()
def get_session(**kwargs):
facade = _create_facade_lazily()
return facade.get_session(**kwargs)
def cleanup():
global _FACADE
_FACADE = None
def get_backend():
"""The backend is this module itself."""
@@ -72,7 +99,7 @@ def count_query(model, context, session=None, project_only=None):
def setup_db():
try:
engine = db_session.get_engine(sqlite_fk=True)
engine = get_engine()
m.Cluster.metadata.create_all(engine)
except sa.exc.OperationalError as e:
LOG.exception("Database registration exception: %s", e)
@@ -82,7 +109,7 @@ def setup_db():
def drop_db():
try:
engine = db_session.get_engine(sqlite_fk=True)
engine = get_engine()
m.Cluster.metadata.drop_all(engine)
except Exception as e:
LOG.exception("Database shutdown exception: %s", e)
@@ -400,11 +427,13 @@ def node_group_template_create(context, values):
node_group_template = m.NodeGroupTemplate()
node_group_template.update(values)
try:
node_group_template.save()
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for NodeGroupTemplate: %s"
% e.columns)
session = get_session()
with session.begin():
try:
node_group_template.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for NodeGroupTemplate: "
"%s" % e.columns)
return node_group_template
@@ -442,11 +471,13 @@ def data_source_create(context, values):
data_source = m.DataSource()
data_source.update(values)
try:
data_source.save()
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for DataSource: %s"
% e.columns)
session = get_session()
with session.begin():
try:
data_source.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for DataSource: %s"
% e.columns)
return data_source
@@ -494,7 +525,7 @@ def job_execution_create(context, values):
job_ex = m.JobExecution()
job_ex.update(values)
try:
job_ex.save()
job_ex.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for JobExecution: %s"
% e.columns)
@@ -635,11 +666,13 @@ def job_binary_create(context, values):
job_binary = m.JobBinary()
job_binary.update(values)
try:
job_binary.save()
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for JobBinary: %s"
% e.columns)
session = get_session()
with session.begin():
try:
job_binary.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for JobBinary: %s"
% e.columns)
return job_binary
@@ -728,11 +761,13 @@ def job_binary_internal_create(context, values):
job_binary_int = m.JobBinaryInternal()
job_binary_int.update(values)
try:
job_binary_int.save()
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for JobBinaryInternal: %s"
% e.columns)
session = get_session()
with session.begin():
try:
job_binary_int.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise ex.DBDuplicateEntry("Duplicate entry for JobBinaryInternal: "
"%s" % e.columns)
return job_binary_internal_get(context, job_binary_int.id)