Refactoring DB access layer

* Breaking DB api and DB models into separate versions
* Fixing unit tests and dependent code
* Preparing DB models for the new API/engine

TODO:
* Adjust v2 DB models according to the new spec

Change-Id: I763cc3f401b8040a182733750ce05577653e1d35
This commit is contained in:
Renat Akhmerov 2014-08-01 16:30:00 +07:00
parent f997ca183d
commit 3b8e451d87
26 changed files with 961 additions and 250 deletions

View File

@ -110,6 +110,7 @@ def _has_action_context_param(action_cls):
return _ACTION_CTX_PARAM in arg_spec.args
# TODO(rakhmerov): It's not used anywhere.
def _create_adhoc_action(db_task, openstack_context):
task_spec = spec_parser.get_task_spec(db_task['task_spec'])
@ -145,6 +146,7 @@ def _create_adhoc_action(db_task, openstack_context):
**action_params)
# TODO(rakhmerov): It's not used anywhere. Remove it later.
def create_action(db_task):
task_spec = spec_parser.get_task_spec(db_task['task_spec'])

View File

@ -268,6 +268,7 @@ class SSHAction(base.Action):
return raise_exc(parent_exc=e)
# TODO(rakhmerov): It's not used anywhere. Remove it later.
class AdHocAction(base.Action):
def __init__(self, action_context,
base_action_cls, action_spec, **params):

View File

@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO(rakhmerov): Module deprecated in favor of v1/api.py
from oslo.db import api as db_api
from mistral import exceptions
@ -22,7 +24,7 @@ from mistral.openstack.common import log as logging
# Workbooks
_BACKEND_MAPPING = {
'sqlalchemy': 'mistral.db.sqlalchemy.api',
'sqlalchemy': 'mistral.db.v1.sqlalchemy.api',
}
IMPL = db_api.DBAPI('sqlalchemy', backend_mapping=_BACKEND_MAPPING)

View File

@ -0,0 +1,163 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from oslo.db import options
from oslo.db.sqlalchemy import session as db_session
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral import utils
LOG = logging.getLogger(__name__)
options.set_defaults(cfg.CONF, sqlite_db="mistral.sqlite")
_DB_SESSION_THREAD_LOCAL_NAME = "db_sql_alchemy_session"
_facade = None
def _get_facade():
global _facade
if not _facade:
_facade = db_session.EngineFacade(
cfg.CONF.database.connection, sqlite_fk=True, autocommit=False,
**dict(cfg.CONF.database.iteritems()))
return _facade
def get_engine():
return _get_facade().get_engine()
def _get_session():
return _get_facade().get_session()
def _get_thread_local_session():
return utils.get_thread_local(_DB_SESSION_THREAD_LOCAL_NAME)
def _get_or_create_thread_local_session():
ses = _get_thread_local_session()
if ses:
return ses, False
ses = _get_session()
_set_thread_local_session(ses)
return ses, True
def _set_thread_local_session(session):
utils.set_thread_local(_DB_SESSION_THREAD_LOCAL_NAME, session)
def session_aware(param_name="session"):
"""Decorator for methods working within db session."""
def _decorator(func):
def _within_session(*args, **kw):
# If 'created' flag is True it means that the transaction is
# demarcated explicitly outside this module.
ses, created = _get_or_create_thread_local_session()
try:
kw[param_name] = ses
result = func(*args, **kw)
if created:
ses.commit()
return result
except Exception:
if created:
ses.rollback()
raise
finally:
if created:
_set_thread_local_session(None)
ses.close()
_within_session.__doc__ = func.__doc__
return _within_session
return _decorator
# Transaction management.
def start_tx():
"""Opens new database session and starts new transaction assuming
there wasn't any opened sessions within the same thread.
"""
ses = _get_thread_local_session()
if ses:
raise exc.DataAccessException("Database transaction has already been"
" started.")
_set_thread_local_session(_get_session())
def commit_tx():
"""Commits previously started database transaction."""
ses = _get_thread_local_session()
if not ses:
raise exc.DataAccessException("Nothing to commit. Database transaction"
" has not been previously started.")
ses.commit()
def rollback_tx():
"""Rolls back previously started database transaction."""
ses = _get_thread_local_session()
if not ses:
raise exc.DataAccessException("Nothing to roll back. Database"
" transaction has not been started.")
ses.rollback()
def end_tx():
"""Ends current database transaction.
It rolls back all uncommitted changes and closes database session.
"""
ses = _get_thread_local_session()
if not ses:
raise exc.DataAccessException("Database transaction has not been"
" started.")
if ses.dirty:
ses.rollback()
ses.close()
_set_thread_local_session(None)
@session_aware()
def model_query(model, session=None):
"""Query helper.
:param model: base model to query
"""
return session.query(model)

View File

@ -14,11 +14,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
from oslo.db.sqlalchemy import models as oslo_models
import sqlalchemy as sa
from sqlalchemy.ext import declarative
from sqlalchemy.orm import attributes
def _generate_unicode_uuid():
return unicode(str(uuid.uuid4()))
def _id_column():
return sa.Column(sa.String(36),
primary_key=True,
default=_generate_unicode_uuid)
class _MistralBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
"""Base class for all Mistral SQLAlchemy DB Models."""

View File

183
mistral/db/v1/api.py Normal file
View File

@ -0,0 +1,183 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.db import api as db_api
from mistral import exceptions
from mistral.openstack.common import log as logging
# Workbooks
_BACKEND_MAPPING = {
'sqlalchemy': 'mistral.db.v1.sqlalchemy.api',
}
IMPL = db_api.DBAPI('sqlalchemy', backend_mapping=_BACKEND_MAPPING)
LOG = logging.getLogger(__name__)
def setup_db():
IMPL.setup_db()
def drop_db():
IMPL.drop_db()
# Transaction control.
def start_tx():
IMPL.start_tx()
def commit_tx():
IMPL.commit_tx()
def rollback_tx():
IMPL.rollback_tx()
def end_tx():
IMPL.end_tx()
# Workbook
def workbook_get(name):
return IMPL.workbook_get(name)
def workbook_create(values):
return IMPL.workbook_create(values)
def workbook_update(name, values):
return IMPL.workbook_update(name, values)
def workbook_delete(name):
IMPL.workbook_delete(name)
def workbooks_get():
return IMPL.workbooks_get_all()
def workbook_definition_get(workbook_name):
definition = IMPL.workbook_get(workbook_name)['definition']
if not definition:
raise exceptions.NotFoundException("Definition of workbook "
"%s is empty." % workbook_name)
return definition
def workbook_definition_put(workbook_name, text):
return IMPL.workbook_update(workbook_name, {'definition': text})
# Executions
def execution_get(id):
return IMPL.execution_get(id)
def ensure_execution_exists(execution_id):
return IMPL.ensure_execution_exists(execution_id)
def execution_create(workbook_name, values):
return IMPL.execution_create(workbook_name, values)
def execution_update(id, values):
return IMPL.execution_update(id, values)
def execution_delete(id):
return IMPL.execution_delete(id)
def executions_get(**kwargs):
return IMPL.executions_get(**kwargs)
# Tasks
def task_get(id):
return IMPL.task_get(id)
def task_create(execution_id, values):
return IMPL.task_create(execution_id, values)
def task_update(id, values):
return IMPL.task_update(id, values)
def task_delete(id):
return IMPL.task_delete(id)
def tasks_get(**kwargs):
return IMPL.tasks_get(**kwargs)
# Listeners
def listener_get(workbook_name, id):
return {}
def listener_create(workbook_name, values):
values['id'] = 1
return values
def listener_update(workbook_name, id, values):
return values
def listener_delete(workbook_name, id):
pass
def listeners_get(workbook_name):
return [{}]
# Triggers
def trigger_create(values):
return IMPL.trigger_create(values)
def triggers_get(**kwargs):
return IMPL.triggers_get_all(**kwargs)
def trigger_update(trigger_id, values):
return IMPL.trigger_update(trigger_id, values)
def get_next_triggers(time):
return IMPL.get_next_triggers(time)

View File

View File

@ -16,206 +16,69 @@
import sys
from oslo.config import cfg
from oslo.db import exception as db_exc
from oslo.db import options
from oslo.db.sqlalchemy import session as db_session
import sqlalchemy as sa
from mistral import context
from mistral.db.sqlalchemy import models as m
from mistral.db.sqlalchemy import base as b
from mistral.db.v1.sqlalchemy import models
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral import utils
LOG = logging.getLogger(__name__)
options.set_defaults(cfg.CONF, sqlite_db="mistral.sqlite")
_DB_SESSION_THREAD_LOCAL_NAME = "db_sql_alchemy_session"
_facade = None
def get_facade():
global _facade
if not _facade:
_facade = db_session.EngineFacade(
cfg.CONF.database.connection, sqlite_fk=True, autocommit=False,
**dict(cfg.CONF.database.iteritems()))
return _facade
def get_engine():
return get_facade().get_engine()
def get_session():
return get_facade().get_session()
def get_backend():
"""The backend is this module itself."""
"""Consumed by openstack common code.
The backend is this module itself.
:return Name of db backend.
"""
return sys.modules[__name__]
def setup_db():
try:
engine = get_engine()
m.Trigger.metadata.create_all(engine)
models.Workbook.metadata.create_all(b.get_engine())
except sa.exc.OperationalError as e:
LOG.exception("Database registration exception: %s", e)
return False
return True
raise exc.DBException("Failed to setup database: %s" % e)
def drop_db():
global _facade
try:
engine = get_engine()
m.Trigger.metadata.drop_all(engine)
# TODO(rakhmerov): How to setup for multiple versions?
models.Workbook.metadata.drop_all(b.get_engine())
_facade = None
except Exception as e:
LOG.exception("Database shutdown exception: %s", e)
return False
return True
def to_dict(func):
def decorator(*args, **kwargs):
res = func(*args, **kwargs)
if isinstance(res, list):
return [item.to_dict() for item in res]
if res:
return res.to_dict()
else:
return None
return decorator
def _get_thread_local_session():
return utils.get_thread_local(_DB_SESSION_THREAD_LOCAL_NAME)
def _get_or_create_thread_local_session():
ses = _get_thread_local_session()
if ses:
return ses, False
ses = get_session()
_set_thread_local_session(ses)
return ses, True
def _set_thread_local_session(session):
utils.set_thread_local(_DB_SESSION_THREAD_LOCAL_NAME, session)
def session_aware(param_name="session"):
"""Decorator for methods working within db session."""
def _decorator(func):
def _within_session(*args, **kw):
# If 'created' flag is True it means that the transaction is
# demarcated explicitly outside this module.
ses, created = _get_or_create_thread_local_session()
try:
kw[param_name] = ses
result = func(*args, **kw)
if created:
ses.commit()
return result
except Exception:
if created:
ses.rollback()
raise
finally:
if created:
_set_thread_local_session(None)
ses.close()
_within_session.__doc__ = func.__doc__
return _within_session
return _decorator
raise exc.DBException("Failed to drop database: %s" + e)
# Transaction management.
def start_tx():
"""Opens new database session and starts new transaction assuming
there wasn't any opened sessions within the same thread.
"""
ses = _get_thread_local_session()
if ses:
raise exc.DataAccessException("Database transaction has already been"
" started.")
_set_thread_local_session(get_session())
b.start_tx()
def commit_tx():
"""Commits previously started database transaction."""
ses = _get_thread_local_session()
if not ses:
raise exc.DataAccessException("Nothing to commit. Database transaction"
" has not been previously started.")
ses.commit()
b.commit_tx()
def rollback_tx():
"""Rolls back previously started database transaction."""
ses = _get_thread_local_session()
if not ses:
raise exc.DataAccessException("Nothing to roll back. Database"
" transaction has not been started.")
ses.rollback()
b.rollback_tx()
def end_tx():
"""Ends current database transaction.
It rolls back all uncommitted changes and closes database session.
"""
ses = _get_thread_local_session()
if not ses:
raise exc.DataAccessException("Database transaction has not been"
" started.")
if ses.dirty:
ses.rollback()
ses.close()
_set_thread_local_session(None)
@session_aware()
def model_query(model, session=None):
"""Query helper.
:param model: base model to query
"""
return session.query(model)
b.end_tx()
# Triggers.
@session_aware()
@b.session_aware()
def trigger_create(values, session=None):
trigger = m.Trigger()
trigger = models.Trigger()
trigger.update(values.copy())
try:
@ -227,7 +90,7 @@ def trigger_create(values, session=None):
return trigger
@session_aware()
@b.session_aware()
def trigger_update(trigger_id, values, session=None):
trigger = _trigger_get(trigger_id)
if trigger is None:
@ -239,7 +102,7 @@ def trigger_update(trigger_id, values, session=None):
return trigger
@session_aware()
@b.session_aware()
def trigger_delete(trigger_id, session=None):
trigger = _trigger_get(trigger_id)
if not trigger:
@ -249,17 +112,17 @@ def trigger_delete(trigger_id, session=None):
session.delete(trigger)
@session_aware()
@b.session_aware()
def get_next_triggers(time, session=None):
query = model_query(m.Trigger)
query = query.filter(m.Trigger.next_execution_time < time)
query = query.order_by(m.Trigger.next_execution_time)
query = b.model_query(models.Trigger)
query = query.filter(models.Trigger.next_execution_time < time)
query = query.order_by(models.Trigger.next_execution_time)
return query.all()
@session_aware()
@b.session_aware()
def _trigger_get(trigger_id, session=None):
query = model_query(m.Trigger)
query = b.model_query(models.Trigger)
return query.filter_by(id=trigger_id).first()
@ -272,7 +135,7 @@ def trigger_get(trigger_id):
def _triggers_get_all(**kwargs):
query = model_query(m.Trigger)
query = b.model_query(models.Trigger)
return query.filter_by(**kwargs).all()
@ -282,9 +145,9 @@ def triggers_get_all(**kwargs):
# Workbooks.
@session_aware()
@b.session_aware()
def workbook_create(values, session=None):
workbook = m.Workbook()
workbook = models.Workbook()
workbook.update(values.copy())
workbook['project_id'] = context.ctx().project_id
@ -297,7 +160,7 @@ def workbook_create(values, session=None):
return workbook
@session_aware()
@b.session_aware()
def workbook_update(workbook_name, values, session=None):
workbook = _workbook_get(workbook_name)
@ -311,7 +174,7 @@ def workbook_update(workbook_name, values, session=None):
return workbook
@session_aware()
@b.session_aware()
def workbook_delete(workbook_name, session=None):
workbook = _workbook_get(workbook_name)
if not workbook:
@ -336,16 +199,16 @@ def workbooks_get_all(**kwargs):
def _workbooks_get_all(**kwargs):
query = model_query(m.Workbook)
query = b.model_query(models.Workbook)
proj = query.filter_by(project_id=context.ctx().project_id,
**kwargs)
public = query.filter_by(scope='public', **kwargs)
return proj.union(public).all()
@session_aware()
@b.session_aware()
def _workbook_get(workbook_name, session=None):
query = model_query(m.Workbook)
query = b.model_query(models.Workbook)
if context.ctx().is_admin:
return query.filter_by(name=workbook_name).first()
else:
@ -356,9 +219,9 @@ def _workbook_get(workbook_name, session=None):
# Workflow executions.
@session_aware()
@b.session_aware()
def execution_create(workbook_name, values, session=None):
execution = m.WorkflowExecution()
execution = models.WorkflowExecution()
execution.update(values.copy())
execution.update({'workbook_name': workbook_name})
@ -371,7 +234,7 @@ def execution_create(workbook_name, values, session=None):
return execution
@session_aware()
@b.session_aware()
def execution_update(execution_id, values, session=None):
execution = _execution_get(execution_id)
if not execution:
@ -383,7 +246,7 @@ def execution_update(execution_id, values, session=None):
return execution
@session_aware()
@b.session_aware()
def execution_delete(execution_id, session=None):
execution = _execution_get(execution_id)
if not execution:
@ -412,12 +275,12 @@ def executions_get(**kwargs):
def _executions_get(**kwargs):
query = model_query(m.WorkflowExecution)
query = b.model_query(models.WorkflowExecution)
return query.filter_by(**kwargs).all()
def _execution_get(execution_id):
query = model_query(m.WorkflowExecution)
query = b.model_query(models.WorkflowExecution)
return query.filter_by(id=execution_id).first()
@ -425,9 +288,9 @@ def _execution_get(execution_id):
# Workflow tasks.
@session_aware()
@b.session_aware()
def task_create(execution_id, values, session=None):
task = m.Task()
task = models.Task()
task.update(values)
task.update({'execution_id': execution_id})
@ -440,7 +303,7 @@ def task_create(execution_id, values, session=None):
return task
@session_aware()
@b.session_aware()
def task_update(task_id, values, session=None):
task = _task_get(task_id)
if not task:
@ -452,7 +315,7 @@ def task_update(task_id, values, session=None):
return task
@session_aware()
@b.session_aware()
def task_delete(task_id, session=None):
task = _task_get(task_id)
if not task:
@ -472,7 +335,7 @@ def task_get(task_id):
def _task_get(task_id):
query = model_query(m.Task)
query = b.model_query(models.Task)
return query.filter_by(id=task_id).first()
@ -481,5 +344,5 @@ def tasks_get(**kwargs):
def _tasks_get(**kwargs):
query = model_query(m.Task)
query = b.model_query(models.Task)
return query.filter_by(**kwargs).all()

View File

@ -15,51 +15,10 @@
# limitations under the License.
import sqlalchemy as sa
import uuid
from mistral.db.sqlalchemy import model_base as mb
from mistral.db.sqlalchemy import types as st
# Helpers
def _generate_unicode_uuid():
return unicode(str(uuid.uuid4()))
def _id_column():
return sa.Column(sa.String(36),
primary_key=True,
default=_generate_unicode_uuid)
class Trigger(mb.MistralBase):
"""Contains all info about trigger."""
__tablename__ = 'triggers'
__table_args__ = (
sa.UniqueConstraint('name'),
)
id = _id_column()
name = sa.Column(sa.String(80), nullable=False)
pattern = sa.Column(sa.String(20), nullable=False)
next_execution_time = sa.Column(sa.DateTime, nullable=False)
workbook_name = sa.Column(sa.String(80), nullable=False)
class WorkflowExecution(mb.MistralBase):
"""Contains info about particular workflow execution."""
__tablename__ = 'workflow_executions'
id = _id_column()
workbook_name = sa.Column(sa.String(80))
task = sa.Column(sa.String(80))
state = sa.Column(sa.String(20))
context = sa.Column(st.JsonDictType())
class Workbook(mb.MistralBase):
"""Contains info about workbook (including definition in Mistral DSL)."""
@ -70,7 +29,7 @@ class Workbook(mb.MistralBase):
sa.UniqueConstraint('name'),
)
id = _id_column()
id = mb._id_column()
name = sa.Column(sa.String(80), primary_key=True)
definition = sa.Column(sa.Text(), nullable=True)
description = sa.Column(sa.String(200))
@ -80,12 +39,24 @@ class Workbook(mb.MistralBase):
trust_id = sa.Column(sa.String(80))
class WorkflowExecution(mb.MistralBase):
"""Contains info about particular workflow execution."""
__tablename__ = 'workflow_executions'
id = mb._id_column()
workbook_name = sa.Column(sa.String(80))
task = sa.Column(sa.String(80))
state = sa.Column(sa.String(20))
context = sa.Column(st.JsonDictType())
class Task(mb.MistralBase):
"""Contains info about particular task."""
__tablename__ = 'tasks'
id = _id_column()
id = mb._id_column()
name = sa.Column(sa.String(80))
requires = sa.Column(st.JsonListType())
workbook_name = sa.Column(sa.String(80))
@ -105,3 +76,19 @@ class Task(mb.MistralBase):
# Effectively internal engine properties which will be used to determine
# execution of a task.
task_runtime_context = sa.Column(st.JsonDictType())
class Trigger(mb.MistralBase):
"""Contains all info about trigger."""
__tablename__ = 'triggers'
__table_args__ = (
sa.UniqueConstraint('name'),
)
id = mb._id_column()
name = sa.Column(sa.String(80), nullable=False)
pattern = sa.Column(sa.String(20), nullable=False)
next_execution_time = sa.Column(sa.DateTime, nullable=False)
workbook_name = sa.Column(sa.String(80), nullable=False)

View File

123
mistral/db/v2/api.py Normal file
View File

@ -0,0 +1,123 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.db import api as db_api
from mistral.openstack.common import log as logging
_BACKEND_MAPPING = {
'sqlalchemy': 'mistral.db.v2.sqlalchemy.api',
}
IMPL = db_api.DBAPI('sqlalchemy', backend_mapping=_BACKEND_MAPPING)
LOG = logging.getLogger(__name__)
def setup_db():
IMPL.setup_db()
def drop_db():
IMPL.drop_db()
# Transaction control.
def start_tx():
IMPL.start_tx()
def commit_tx():
IMPL.commit_tx()
def rollback_tx():
IMPL.rollback_tx()
def end_tx():
IMPL.end_tx()
# Workbooks.
def get_workbook(name):
return IMPL.get_workbook(name)
def get_workbooks():
return IMPL.get_workbooks()
def create_workbook(values):
return IMPL.create_workbook(values)
def update_workbook(name, values):
return IMPL.update_workbook(name, values)
def delete_workbook(name):
IMPL.delete_workbook(name)
# Executions.
def get_execution(id):
return IMPL.get_execution(id)
def get_executions(**kwargs):
return IMPL.get_executions(**kwargs)
def ensure_execution_exists(id):
return IMPL.ensure_execution_exists(id)
def create_execution(values):
return IMPL.create_execution(values)
def update_execution(id, values):
return IMPL.update_execution(id, values)
def delete_execution(id):
return IMPL.delete_execution(id)
# Tasks.
def get_task(id):
return IMPL.get_task(id)
def get_tasks(**kwargs):
return IMPL.get_tasks(**kwargs)
def create_task(values):
return IMPL.create_task(values)
def update_task(id, values):
return IMPL.update_task(id, values)
def delete_task(id):
return IMPL.delete_task(id)

View File

View File

@ -0,0 +1,286 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from oslo.db import exception as db_exc
import sqlalchemy as sa
from mistral import context
from mistral.db.sqlalchemy import base as b
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def get_backend():
"""Consumed by openstack common code.
The backend is this module itself.
:return Name of db backend.
"""
return sys.modules[__name__]
def setup_db():
try:
models.Workbook.metadata.create_all(b.get_engine())
except sa.exc.OperationalError as e:
raise exc.DBException("Failed to setup database: %s" % e)
def drop_db():
global _facade
try:
# TODO(rakhmerov): How to setup for multiple versions?
models.Workbook.metadata.drop_all(b.get_engine())
_facade = None
except Exception as e:
raise exc.DBException("Failed to drop database: %s" + e)
# Transaction management.
def start_tx():
b.start_tx()
def commit_tx():
b.commit_tx()
def rollback_tx():
b.rollback_tx()
def end_tx():
b.end_tx()
# Workbooks.
def get_workbook(name):
wb = _get_workbook(name)
if not wb:
raise exc.NotFoundException(
"Workbook not found [workbook_name=%s]" % name)
return wb
def get_workbooks(**kwargs):
return _get_workbooks(**kwargs)
@b.session_aware()
def create_workbook(values, session=None):
wb = models.Workbook()
wb.update(values.copy())
wb['project_id'] = context.ctx().project_id
try:
wb.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for Workbook: %s"
% e.columns)
return wb
@b.session_aware()
def update_workbook(name, values, session=None):
wb = _get_workbook(name)
if not wb:
raise exc.NotFoundException(
"Workbook not found [workbook_name=%s]" % name)
wb.update(values.copy())
wb['project_id'] = context.ctx().project_id
return wb
@b.session_aware()
def delete_workbook(name, session=None):
wb = _get_workbook(name)
if not wb:
raise exc.NotFoundException(
"Workbook not found [workbook_name=%s]" % name)
session.delete(wb)
def _get_workbooks(**kwargs):
query = b.model_query(models.Workbook)
proj = query.filter_by(project_id=context.ctx().project_id,
**kwargs)
public = query.filter_by(scope='public', **kwargs)
return proj.union(public).all()
@b.session_aware()
def _get_workbook(name, session=None):
query = b.model_query(models.Workbook)
return query.filter_by(name=name,
project_id=context.ctx().project_id).first()
# Executions.
def get_execution(id):
execution = _get_execution(id)
if not execution:
raise exc.NotFoundException(
"Execution not found [execution_id=%s]" % id)
return execution
def ensure_execution_exists(id):
get_execution(id)
def get_executions(**kwargs):
return _get_executions(**kwargs)
@b.session_aware()
def create_execution(values, session=None):
execution = models.Execution()
execution.update(values.copy())
try:
execution.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for Execution: %s"
% e.columns)
return execution
@b.session_aware()
def update_execution(id, values, session=None):
execution = _get_execution(id)
if not execution:
raise exc.NotFoundException(
"Execution not found [execution_id=%s]" % id)
execution.update(values.copy())
return execution
@b.session_aware()
def delete_execution(id, session=None):
execution = _get_execution(id)
if not execution:
raise exc.NotFoundException(
"Execution not found [execution_id=%s]" % id)
session.delete(execution)
def _get_executions(**kwargs):
query = b.model_query(models.Execution)
return query.filter_by(**kwargs).all()
def _get_execution(id):
query = b.model_query(models.Execution)
return query.filter_by(id=id).first()
# Tasks.
def get_task(id):
task = _get_task(id)
if not task:
raise exc.NotFoundException(
"Task not found [task_id=%s]" % id)
return task
def get_tasks(**kwargs):
return _get_tasks(**kwargs)
@b.session_aware()
def create_task(values, session=None):
task = models.Task()
task.update(values)
try:
task.save(session=session)
except db_exc.DBDuplicateEntry as e:
raise exc.DBDuplicateEntry("Duplicate entry for Task: %s"
% e.columns)
return task
@b.session_aware()
def update_task(id, values, session=None):
task = _get_task(id)
if not task:
raise exc.NotFoundException(
"Task not found [task_id=%s]" % id)
task.update(values.copy())
return task
@b.session_aware()
def delete_task(id, session=None):
task = _get_task(id)
if not task:
raise exc.NotFoundException(
"Task not found [task_id=%s]" % id)
session.delete(task)
def _get_task(id):
query = b.model_query(models.Task)
return query.filter_by(id=id).first()
def _get_tasks(**kwargs):
query = b.model_query(models.Task)
return query.filter_by(**kwargs).all()

View File

@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy as sa
from sqlalchemy.orm import relationship
from mistral.db.sqlalchemy import model_base as mb
from mistral.db.sqlalchemy import types as st
class Workbook(mb.MistralBase):
"""Contains info about workbook (including definition in Mistral DSL)."""
__tablename__ = 'workbooks_v2'
__table_args__ = (
sa.UniqueConstraint('name'),
)
id = mb._id_column()
name = sa.Column(sa.String(80), primary_key=True)
definition = sa.Column(sa.Text(), nullable=True)
spec = sa.Column(st.JsonDictType())
description = sa.Column(sa.String(200))
tags = sa.Column(st.JsonListType())
scope = sa.Column(sa.String(80))
project_id = sa.Column(sa.String(80))
trust_id = sa.Column(sa.String(80))
class Execution(mb.MistralBase):
"""Contains workflow execution information."""
__tablename__ = 'executions_v2'
id = mb._id_column()
wf_spec = sa.Column(st.JsonDictType())
start_params = sa.Column(st.JsonDictType())
state = sa.Column(sa.String(20))
context = sa.Column(st.JsonDictType())
class Task(mb.MistralBase):
"""Contains task runtime information."""
__tablename__ = 'tasks_v2'
# Main properties.
id = mb._id_column()
name = sa.Column(sa.String(80))
requires = sa.Column(st.JsonListType())
wf_name = sa.Column(sa.String(80))
spec = sa.Column(st.JsonDictType())
action_spec = sa.Column(st.JsonDictType())
state = sa.Column(sa.String(20))
tags = sa.Column(st.JsonListType())
# Data Flow properties.
in_context = sa.Column(st.JsonDictType())
parameters = sa.Column(st.JsonDictType())
output = sa.Column(st.JsonDictType())
# Runtime context like iteration_no of a repeater.
# Effectively internal engine properties which will be used to determine
# execution of a task.
runtime_context = sa.Column(st.JsonDictType())
# Relations.
execution_id = sa.Column(sa.String(36), sa.ForeignKey('executions_v2.id'))
execution = relationship('Execution', backref="tasks", lazy='joined')

View File

@ -47,6 +47,10 @@ class MistralException(Error):
'%d: %s' % (self.http_code, self.message))
class DBException(MistralException):
http_code = 400
class DataAccessException(MistralException):
http_code = 400

View File

@ -20,7 +20,7 @@ import pecan
import pecan.testing
from webtest import app as webtest_app
from mistral.db.sqlalchemy import models as m
from mistral.db.v1.sqlalchemy import models
from mistral.tests import base
# Disable authentication for functional tests.
@ -32,7 +32,7 @@ __all__ = ['FunctionalTest']
# Group of methods to mock DB API calls.
def create_db_workbook(values):
wb = m.Workbook()
wb = models.Workbook()
wb.update(values)
return wb
@ -48,7 +48,7 @@ def create_mock_workbooks(arr_of_values):
def create_db_execution(values):
ex = m.WorkflowExecution()
ex = models.WorkflowExecution()
ex.update(values)
return ex
@ -64,7 +64,7 @@ def create_mock_executions(arr_of_values):
def create_db_task(values):
t = m.Task()
t = models.Task()
t.update(values)
return t

View File

@ -22,7 +22,7 @@ import pecan
import pecan.testing
from mistral.db import api as db_api
from mistral.db.sqlalchemy import models as m
from mistral.db.v1.sqlalchemy import models
from mistral.openstack.common import timeutils
from mistral.tests.api import base
@ -62,7 +62,7 @@ PKI_TOKEN_VERIFIED = {
def get_mock_workbook(values):
wb = m.Workbook()
wb = models.Workbook()
wb.update(values)
return wb

View File

@ -16,7 +16,6 @@
import pkg_resources as pkg
import sys
import tempfile
from oslo.config import cfg
from oslo import messaging
@ -26,7 +25,8 @@ from stevedore import driver
import testtools.matchers as ttm
from mistral import context as auth_context
from mistral.db.sqlalchemy import api as db_api
from mistral.db.sqlalchemy import base as db_sa_base
from mistral.db.v1 import api as db_api_v1
from mistral import engine
from mistral.engine import executor
from mistral.openstack.common import log as logging
@ -105,11 +105,11 @@ class BaseTest(base.BaseTestCase):
class DbTestCase(BaseTest):
def setUp(self):
super(DbTestCase, self).setUp()
_db_fd, self.db_path = tempfile.mkstemp()
cfg.CONF.set_default('connection', 'sqlite:///' + self.db_path,
group='database')
db_api.setup_db()
self.addCleanup(db_api.drop_db)
cfg.CONF.set_default('connection', 'sqlite://', group='database')
db_api_v1.setup_db()
self.addCleanup(db_api_v1.drop_db)
self.ctx = auth_context.MistralContext(user_id='1-2-3-4',
project_id='5-6-7-8',
@ -120,7 +120,7 @@ class DbTestCase(BaseTest):
self.addCleanup(auth_context.set_ctx, None)
def is_db_session_open(self):
return db_api._get_thread_local_session() is not None
return db_sa_base._get_thread_local_session() is not None
class EngineTestCase(DbTestCase):

View File

@ -19,7 +19,7 @@ import json
from mistral.actions import action_factory as a_f
from mistral.actions import std_actions as std
from mistral.db.sqlalchemy import models
from mistral.db.v1.sqlalchemy import models
from mistral.engine import data_flow
from mistral import exceptions
from mistral.openstack.common import log as logging

View File

@ -15,7 +15,7 @@
# limitations under the License.
from mistral import context as auth_context
from mistral.db.sqlalchemy import api as db_api
from mistral.db.v1.sqlalchemy import api as db_api
from mistral import exceptions as exc
from mistral.openstack.common import timeutils
from mistral.tests import base as test_base

View File

@ -19,7 +19,7 @@ from oslo.config import cfg
from mistral.actions import std_actions
from mistral import context as auth_context
from mistral.db import api as db_api
from mistral.db.sqlalchemy import models
from mistral.db.v1.sqlalchemy import models
from mistral import engine
from mistral.engine.drivers.default import engine as concrete_engine
from mistral.engine import executor

View File

@ -17,7 +17,7 @@
import copy
from mistral.db import api as db_api
from mistral.db.sqlalchemy import models
from mistral.db.v1.sqlalchemy import models
from mistral.engine import data_flow
from mistral.engine import states
from mistral.openstack.common import log as logging

View File

@ -20,7 +20,7 @@ from oslo.config import cfg
from mistral.actions import std_actions
from mistral.db import api as db_api
from mistral.db.sqlalchemy import models as m
from mistral.db.v1.sqlalchemy import models as m
from mistral import engine
from mistral.engine.drivers.default import engine as concrete_engine
from mistral.engine import states

View File

@ -24,7 +24,7 @@ eventlet.monkey_patch()
from mistral.actions import std_actions
from mistral.cmd import launch
from mistral.db import api as db_api
from mistral.db.sqlalchemy import models
from mistral.db.v1.sqlalchemy import models
from mistral.engine import states
from mistral.openstack.common import importutils
from mistral.openstack.common import log as logging

View File

@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from mistral.db.sqlalchemy import models as m
from mistral.db.v2.sqlalchemy import models
from mistral.openstack.common import log as logging
from mistral.tests import base
from mistral.workbook import parser as spec_parser
@ -32,7 +32,7 @@ class ReverseWorkflowHandlerTest(base.BaseTest):
base.get_resource('dsl_v2/reverse_workflow.yaml')
)
exec_db = m.WorkflowExecution()
exec_db = models.Execution()
exec_db.update({
'id': '1-2-3-4',
'wf_spec': wf_spec.to_dict()