diff --git a/mistral/actions/action_factory.py b/mistral/actions/action_factory.py index 778b3833..70a48e81 100644 --- a/mistral/actions/action_factory.py +++ b/mistral/actions/action_factory.py @@ -110,6 +110,7 @@ def _has_action_context_param(action_cls): return _ACTION_CTX_PARAM in arg_spec.args +# TODO(rakhmerov): It's not used anywhere. def _create_adhoc_action(db_task, openstack_context): task_spec = spec_parser.get_task_spec(db_task['task_spec']) @@ -145,6 +146,7 @@ def _create_adhoc_action(db_task, openstack_context): **action_params) +# TODO(rakhmerov): It's not used anywhere. Remove it later. def create_action(db_task): task_spec = spec_parser.get_task_spec(db_task['task_spec']) diff --git a/mistral/actions/std_actions.py b/mistral/actions/std_actions.py index 6c8dca93..be9b3ccd 100644 --- a/mistral/actions/std_actions.py +++ b/mistral/actions/std_actions.py @@ -268,6 +268,7 @@ class SSHAction(base.Action): return raise_exc(parent_exc=e) +# TODO(rakhmerov): It's not used anywhere. Remove it later. class AdHocAction(base.Action): def __init__(self, action_context, base_action_cls, action_spec, **params): diff --git a/mistral/db/api.py b/mistral/db/api.py index 8034c0cb..657ce83d 100644 --- a/mistral/db/api.py +++ b/mistral/db/api.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +# TODO(rakhmerov): Module deprecated in favor of v1/api.py + from oslo.db import api as db_api from mistral import exceptions @@ -22,7 +24,7 @@ from mistral.openstack.common import log as logging # Workbooks _BACKEND_MAPPING = { - 'sqlalchemy': 'mistral.db.sqlalchemy.api', + 'sqlalchemy': 'mistral.db.v1.sqlalchemy.api', } IMPL = db_api.DBAPI('sqlalchemy', backend_mapping=_BACKEND_MAPPING) diff --git a/mistral/db/sqlalchemy/base.py b/mistral/db/sqlalchemy/base.py new file mode 100644 index 00000000..e699b557 --- /dev/null +++ b/mistral/db/sqlalchemy/base.py @@ -0,0 +1,163 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg +from oslo.db import options +from oslo.db.sqlalchemy import session as db_session + +from mistral import exceptions as exc +from mistral.openstack.common import log as logging +from mistral import utils + + +LOG = logging.getLogger(__name__) + +options.set_defaults(cfg.CONF, sqlite_db="mistral.sqlite") + +_DB_SESSION_THREAD_LOCAL_NAME = "db_sql_alchemy_session" + +_facade = None + + +def _get_facade(): + global _facade + if not _facade: + _facade = db_session.EngineFacade( + cfg.CONF.database.connection, sqlite_fk=True, autocommit=False, + **dict(cfg.CONF.database.iteritems())) + return _facade + + +def get_engine(): + return _get_facade().get_engine() + + +def _get_session(): + return _get_facade().get_session() + + +def _get_thread_local_session(): + return utils.get_thread_local(_DB_SESSION_THREAD_LOCAL_NAME) + + +def _get_or_create_thread_local_session(): + ses = _get_thread_local_session() + + if ses: + return ses, False + + ses = _get_session() + _set_thread_local_session(ses) + + return ses, True + + +def _set_thread_local_session(session): + utils.set_thread_local(_DB_SESSION_THREAD_LOCAL_NAME, session) + + +def session_aware(param_name="session"): + """Decorator for methods working within db session.""" + + def _decorator(func): + def _within_session(*args, **kw): + # If 'created' flag is True it means that the transaction is + # demarcated explicitly outside this module. + ses, created = _get_or_create_thread_local_session() + + try: + kw[param_name] = ses + + result = func(*args, **kw) + + if created: + ses.commit() + + return result + except Exception: + if created: + ses.rollback() + raise + finally: + if created: + _set_thread_local_session(None) + ses.close() + + _within_session.__doc__ = func.__doc__ + + return _within_session + + return _decorator + + +# Transaction management. + + +def start_tx(): + """Opens new database session and starts new transaction assuming + there wasn't any opened sessions within the same thread. + """ + ses = _get_thread_local_session() + if ses: + raise exc.DataAccessException("Database transaction has already been" + " started.") + + _set_thread_local_session(_get_session()) + + +def commit_tx(): + """Commits previously started database transaction.""" + ses = _get_thread_local_session() + if not ses: + raise exc.DataAccessException("Nothing to commit. Database transaction" + " has not been previously started.") + + ses.commit() + + +def rollback_tx(): + """Rolls back previously started database transaction.""" + ses = _get_thread_local_session() + if not ses: + raise exc.DataAccessException("Nothing to roll back. Database" + " transaction has not been started.") + + ses.rollback() + + +def end_tx(): + """Ends current database transaction. + It rolls back all uncommitted changes and closes database session. + """ + ses = _get_thread_local_session() + if not ses: + raise exc.DataAccessException("Database transaction has not been" + " started.") + + if ses.dirty: + ses.rollback() + + ses.close() + _set_thread_local_session(None) + + +@session_aware() +def model_query(model, session=None): + """Query helper. + + :param model: base model to query + """ + return session.query(model) diff --git a/mistral/db/sqlalchemy/model_base.py b/mistral/db/sqlalchemy/model_base.py index 54fa786e..94592cb0 100644 --- a/mistral/db/sqlalchemy/model_base.py +++ b/mistral/db/sqlalchemy/model_base.py @@ -14,11 +14,25 @@ # See the License for the specific language governing permissions and # limitations under the License. + +import uuid + from oslo.db.sqlalchemy import models as oslo_models +import sqlalchemy as sa from sqlalchemy.ext import declarative from sqlalchemy.orm import attributes +def _generate_unicode_uuid(): + return unicode(str(uuid.uuid4())) + + +def _id_column(): + return sa.Column(sa.String(36), + primary_key=True, + default=_generate_unicode_uuid) + + class _MistralBase(oslo_models.ModelBase, oslo_models.TimestampMixin): """Base class for all Mistral SQLAlchemy DB Models.""" diff --git a/mistral/db/v1/__init__.py b/mistral/db/v1/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/db/v1/api.py b/mistral/db/v1/api.py new file mode 100644 index 00000000..d9a4c1a4 --- /dev/null +++ b/mistral/db/v1/api.py @@ -0,0 +1,183 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.db import api as db_api + +from mistral import exceptions +from mistral.openstack.common import log as logging + +# Workbooks + +_BACKEND_MAPPING = { + 'sqlalchemy': 'mistral.db.v1.sqlalchemy.api', +} + +IMPL = db_api.DBAPI('sqlalchemy', backend_mapping=_BACKEND_MAPPING) +LOG = logging.getLogger(__name__) + + +def setup_db(): + IMPL.setup_db() + + +def drop_db(): + IMPL.drop_db() + + +# Transaction control. + + +def start_tx(): + IMPL.start_tx() + + +def commit_tx(): + IMPL.commit_tx() + + +def rollback_tx(): + IMPL.rollback_tx() + + +def end_tx(): + IMPL.end_tx() + + +# Workbook + + +def workbook_get(name): + return IMPL.workbook_get(name) + + +def workbook_create(values): + return IMPL.workbook_create(values) + + +def workbook_update(name, values): + return IMPL.workbook_update(name, values) + + +def workbook_delete(name): + IMPL.workbook_delete(name) + + +def workbooks_get(): + return IMPL.workbooks_get_all() + + +def workbook_definition_get(workbook_name): + definition = IMPL.workbook_get(workbook_name)['definition'] + if not definition: + raise exceptions.NotFoundException("Definition of workbook " + "%s is empty." % workbook_name) + return definition + + +def workbook_definition_put(workbook_name, text): + return IMPL.workbook_update(workbook_name, {'definition': text}) + + +# Executions + + +def execution_get(id): + return IMPL.execution_get(id) + + +def ensure_execution_exists(execution_id): + return IMPL.ensure_execution_exists(execution_id) + + +def execution_create(workbook_name, values): + return IMPL.execution_create(workbook_name, values) + + +def execution_update(id, values): + return IMPL.execution_update(id, values) + + +def execution_delete(id): + return IMPL.execution_delete(id) + + +def executions_get(**kwargs): + return IMPL.executions_get(**kwargs) + + +# Tasks + +def task_get(id): + return IMPL.task_get(id) + + +def task_create(execution_id, values): + return IMPL.task_create(execution_id, values) + + +def task_update(id, values): + return IMPL.task_update(id, values) + + +def task_delete(id): + return IMPL.task_delete(id) + + +def tasks_get(**kwargs): + return IMPL.tasks_get(**kwargs) + + +# Listeners + + +def listener_get(workbook_name, id): + return {} + + +def listener_create(workbook_name, values): + values['id'] = 1 + + return values + + +def listener_update(workbook_name, id, values): + return values + + +def listener_delete(workbook_name, id): + pass + + +def listeners_get(workbook_name): + return [{}] + + +# Triggers + +def trigger_create(values): + return IMPL.trigger_create(values) + + +def triggers_get(**kwargs): + return IMPL.triggers_get_all(**kwargs) + + +def trigger_update(trigger_id, values): + return IMPL.trigger_update(trigger_id, values) + + +def get_next_triggers(time): + return IMPL.get_next_triggers(time) diff --git a/mistral/db/v1/sqlalchemy/__init__.py b/mistral/db/v1/sqlalchemy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/db/sqlalchemy/api.py b/mistral/db/v1/sqlalchemy/api.py similarity index 58% rename from mistral/db/sqlalchemy/api.py rename to mistral/db/v1/sqlalchemy/api.py index 6822a67b..1886a797 100644 --- a/mistral/db/sqlalchemy/api.py +++ b/mistral/db/v1/sqlalchemy/api.py @@ -16,206 +16,69 @@ import sys -from oslo.config import cfg from oslo.db import exception as db_exc -from oslo.db import options -from oslo.db.sqlalchemy import session as db_session import sqlalchemy as sa from mistral import context -from mistral.db.sqlalchemy import models as m +from mistral.db.sqlalchemy import base as b +from mistral.db.v1.sqlalchemy import models from mistral import exceptions as exc from mistral.openstack.common import log as logging -from mistral import utils LOG = logging.getLogger(__name__) -options.set_defaults(cfg.CONF, sqlite_db="mistral.sqlite") - -_DB_SESSION_THREAD_LOCAL_NAME = "db_sql_alchemy_session" - -_facade = None - - -def get_facade(): - global _facade - if not _facade: - _facade = db_session.EngineFacade( - cfg.CONF.database.connection, sqlite_fk=True, autocommit=False, - **dict(cfg.CONF.database.iteritems())) - return _facade - - -def get_engine(): - return get_facade().get_engine() - - -def get_session(): - return get_facade().get_session() - def get_backend(): - """The backend is this module itself.""" + """Consumed by openstack common code. + + The backend is this module itself. + :return Name of db backend. + """ return sys.modules[__name__] def setup_db(): try: - engine = get_engine() - m.Trigger.metadata.create_all(engine) + models.Workbook.metadata.create_all(b.get_engine()) except sa.exc.OperationalError as e: - LOG.exception("Database registration exception: %s", e) - return False - return True + raise exc.DBException("Failed to setup database: %s" % e) def drop_db(): global _facade + try: - engine = get_engine() - m.Trigger.metadata.drop_all(engine) + # TODO(rakhmerov): How to setup for multiple versions? + models.Workbook.metadata.drop_all(b.get_engine()) _facade = None except Exception as e: - LOG.exception("Database shutdown exception: %s", e) - return False - return True - - -def to_dict(func): - def decorator(*args, **kwargs): - res = func(*args, **kwargs) - - if isinstance(res, list): - return [item.to_dict() for item in res] - - if res: - return res.to_dict() - else: - return None - - return decorator - - -def _get_thread_local_session(): - return utils.get_thread_local(_DB_SESSION_THREAD_LOCAL_NAME) - - -def _get_or_create_thread_local_session(): - ses = _get_thread_local_session() - - if ses: - return ses, False - - ses = get_session() - _set_thread_local_session(ses) - - return ses, True - - -def _set_thread_local_session(session): - utils.set_thread_local(_DB_SESSION_THREAD_LOCAL_NAME, session) - - -def session_aware(param_name="session"): - """Decorator for methods working within db session.""" - - def _decorator(func): - def _within_session(*args, **kw): - # If 'created' flag is True it means that the transaction is - # demarcated explicitly outside this module. - ses, created = _get_or_create_thread_local_session() - - try: - kw[param_name] = ses - - result = func(*args, **kw) - - if created: - ses.commit() - - return result - except Exception: - if created: - ses.rollback() - raise - finally: - if created: - _set_thread_local_session(None) - ses.close() - - _within_session.__doc__ = func.__doc__ - - return _within_session - - return _decorator + raise exc.DBException("Failed to drop database: %s" + e) # Transaction management. - def start_tx(): - """Opens new database session and starts new transaction assuming - there wasn't any opened sessions within the same thread. - """ - ses = _get_thread_local_session() - if ses: - raise exc.DataAccessException("Database transaction has already been" - " started.") - - _set_thread_local_session(get_session()) + b.start_tx() def commit_tx(): - """Commits previously started database transaction.""" - ses = _get_thread_local_session() - if not ses: - raise exc.DataAccessException("Nothing to commit. Database transaction" - " has not been previously started.") - - ses.commit() + b.commit_tx() def rollback_tx(): - """Rolls back previously started database transaction.""" - ses = _get_thread_local_session() - if not ses: - raise exc.DataAccessException("Nothing to roll back. Database" - " transaction has not been started.") - - ses.rollback() + b.rollback_tx() def end_tx(): - """Ends current database transaction. - It rolls back all uncommitted changes and closes database session. - """ - ses = _get_thread_local_session() - if not ses: - raise exc.DataAccessException("Database transaction has not been" - " started.") - - if ses.dirty: - ses.rollback() - - ses.close() - _set_thread_local_session(None) - - -@session_aware() -def model_query(model, session=None): - """Query helper. - - :param model: base model to query - """ - return session.query(model) + b.end_tx() # Triggers. -@session_aware() +@b.session_aware() def trigger_create(values, session=None): - trigger = m.Trigger() + trigger = models.Trigger() trigger.update(values.copy()) try: @@ -227,7 +90,7 @@ def trigger_create(values, session=None): return trigger -@session_aware() +@b.session_aware() def trigger_update(trigger_id, values, session=None): trigger = _trigger_get(trigger_id) if trigger is None: @@ -239,7 +102,7 @@ def trigger_update(trigger_id, values, session=None): return trigger -@session_aware() +@b.session_aware() def trigger_delete(trigger_id, session=None): trigger = _trigger_get(trigger_id) if not trigger: @@ -249,17 +112,17 @@ def trigger_delete(trigger_id, session=None): session.delete(trigger) -@session_aware() +@b.session_aware() def get_next_triggers(time, session=None): - query = model_query(m.Trigger) - query = query.filter(m.Trigger.next_execution_time < time) - query = query.order_by(m.Trigger.next_execution_time) + query = b.model_query(models.Trigger) + query = query.filter(models.Trigger.next_execution_time < time) + query = query.order_by(models.Trigger.next_execution_time) return query.all() -@session_aware() +@b.session_aware() def _trigger_get(trigger_id, session=None): - query = model_query(m.Trigger) + query = b.model_query(models.Trigger) return query.filter_by(id=trigger_id).first() @@ -272,7 +135,7 @@ def trigger_get(trigger_id): def _triggers_get_all(**kwargs): - query = model_query(m.Trigger) + query = b.model_query(models.Trigger) return query.filter_by(**kwargs).all() @@ -282,9 +145,9 @@ def triggers_get_all(**kwargs): # Workbooks. -@session_aware() +@b.session_aware() def workbook_create(values, session=None): - workbook = m.Workbook() + workbook = models.Workbook() workbook.update(values.copy()) workbook['project_id'] = context.ctx().project_id @@ -297,7 +160,7 @@ def workbook_create(values, session=None): return workbook -@session_aware() +@b.session_aware() def workbook_update(workbook_name, values, session=None): workbook = _workbook_get(workbook_name) @@ -311,7 +174,7 @@ def workbook_update(workbook_name, values, session=None): return workbook -@session_aware() +@b.session_aware() def workbook_delete(workbook_name, session=None): workbook = _workbook_get(workbook_name) if not workbook: @@ -336,16 +199,16 @@ def workbooks_get_all(**kwargs): def _workbooks_get_all(**kwargs): - query = model_query(m.Workbook) + query = b.model_query(models.Workbook) proj = query.filter_by(project_id=context.ctx().project_id, **kwargs) public = query.filter_by(scope='public', **kwargs) return proj.union(public).all() -@session_aware() +@b.session_aware() def _workbook_get(workbook_name, session=None): - query = model_query(m.Workbook) + query = b.model_query(models.Workbook) if context.ctx().is_admin: return query.filter_by(name=workbook_name).first() else: @@ -356,9 +219,9 @@ def _workbook_get(workbook_name, session=None): # Workflow executions. -@session_aware() +@b.session_aware() def execution_create(workbook_name, values, session=None): - execution = m.WorkflowExecution() + execution = models.WorkflowExecution() execution.update(values.copy()) execution.update({'workbook_name': workbook_name}) @@ -371,7 +234,7 @@ def execution_create(workbook_name, values, session=None): return execution -@session_aware() +@b.session_aware() def execution_update(execution_id, values, session=None): execution = _execution_get(execution_id) if not execution: @@ -383,7 +246,7 @@ def execution_update(execution_id, values, session=None): return execution -@session_aware() +@b.session_aware() def execution_delete(execution_id, session=None): execution = _execution_get(execution_id) if not execution: @@ -412,12 +275,12 @@ def executions_get(**kwargs): def _executions_get(**kwargs): - query = model_query(m.WorkflowExecution) + query = b.model_query(models.WorkflowExecution) return query.filter_by(**kwargs).all() def _execution_get(execution_id): - query = model_query(m.WorkflowExecution) + query = b.model_query(models.WorkflowExecution) return query.filter_by(id=execution_id).first() @@ -425,9 +288,9 @@ def _execution_get(execution_id): # Workflow tasks. -@session_aware() +@b.session_aware() def task_create(execution_id, values, session=None): - task = m.Task() + task = models.Task() task.update(values) task.update({'execution_id': execution_id}) @@ -440,7 +303,7 @@ def task_create(execution_id, values, session=None): return task -@session_aware() +@b.session_aware() def task_update(task_id, values, session=None): task = _task_get(task_id) if not task: @@ -452,7 +315,7 @@ def task_update(task_id, values, session=None): return task -@session_aware() +@b.session_aware() def task_delete(task_id, session=None): task = _task_get(task_id) if not task: @@ -472,7 +335,7 @@ def task_get(task_id): def _task_get(task_id): - query = model_query(m.Task) + query = b.model_query(models.Task) return query.filter_by(id=task_id).first() @@ -481,5 +344,5 @@ def tasks_get(**kwargs): def _tasks_get(**kwargs): - query = model_query(m.Task) + query = b.model_query(models.Task) return query.filter_by(**kwargs).all() diff --git a/mistral/db/sqlalchemy/models.py b/mistral/db/v1/sqlalchemy/models.py similarity index 89% rename from mistral/db/sqlalchemy/models.py rename to mistral/db/v1/sqlalchemy/models.py index b25b45c4..60c87e45 100644 --- a/mistral/db/sqlalchemy/models.py +++ b/mistral/db/v1/sqlalchemy/models.py @@ -15,51 +15,10 @@ # limitations under the License. import sqlalchemy as sa -import uuid from mistral.db.sqlalchemy import model_base as mb from mistral.db.sqlalchemy import types as st -# Helpers - - -def _generate_unicode_uuid(): - return unicode(str(uuid.uuid4())) - - -def _id_column(): - return sa.Column(sa.String(36), - primary_key=True, - default=_generate_unicode_uuid) - - -class Trigger(mb.MistralBase): - """Contains all info about trigger.""" - - __tablename__ = 'triggers' - - __table_args__ = ( - sa.UniqueConstraint('name'), - ) - - id = _id_column() - name = sa.Column(sa.String(80), nullable=False) - pattern = sa.Column(sa.String(20), nullable=False) - next_execution_time = sa.Column(sa.DateTime, nullable=False) - workbook_name = sa.Column(sa.String(80), nullable=False) - - -class WorkflowExecution(mb.MistralBase): - """Contains info about particular workflow execution.""" - - __tablename__ = 'workflow_executions' - - id = _id_column() - workbook_name = sa.Column(sa.String(80)) - task = sa.Column(sa.String(80)) - state = sa.Column(sa.String(20)) - context = sa.Column(st.JsonDictType()) - class Workbook(mb.MistralBase): """Contains info about workbook (including definition in Mistral DSL).""" @@ -70,7 +29,7 @@ class Workbook(mb.MistralBase): sa.UniqueConstraint('name'), ) - id = _id_column() + id = mb._id_column() name = sa.Column(sa.String(80), primary_key=True) definition = sa.Column(sa.Text(), nullable=True) description = sa.Column(sa.String(200)) @@ -80,12 +39,24 @@ class Workbook(mb.MistralBase): trust_id = sa.Column(sa.String(80)) +class WorkflowExecution(mb.MistralBase): + """Contains info about particular workflow execution.""" + + __tablename__ = 'workflow_executions' + + id = mb._id_column() + workbook_name = sa.Column(sa.String(80)) + task = sa.Column(sa.String(80)) + state = sa.Column(sa.String(20)) + context = sa.Column(st.JsonDictType()) + + class Task(mb.MistralBase): """Contains info about particular task.""" __tablename__ = 'tasks' - id = _id_column() + id = mb._id_column() name = sa.Column(sa.String(80)) requires = sa.Column(st.JsonListType()) workbook_name = sa.Column(sa.String(80)) @@ -105,3 +76,19 @@ class Task(mb.MistralBase): # Effectively internal engine properties which will be used to determine # execution of a task. task_runtime_context = sa.Column(st.JsonDictType()) + + +class Trigger(mb.MistralBase): + """Contains all info about trigger.""" + + __tablename__ = 'triggers' + + __table_args__ = ( + sa.UniqueConstraint('name'), + ) + + id = mb._id_column() + name = sa.Column(sa.String(80), nullable=False) + pattern = sa.Column(sa.String(20), nullable=False) + next_execution_time = sa.Column(sa.DateTime, nullable=False) + workbook_name = sa.Column(sa.String(80), nullable=False) diff --git a/mistral/db/v2/__init__.py b/mistral/db/v2/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/db/v2/api.py b/mistral/db/v2/api.py new file mode 100644 index 00000000..090e4a56 --- /dev/null +++ b/mistral/db/v2/api.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.db import api as db_api + +from mistral.openstack.common import log as logging + +_BACKEND_MAPPING = { + 'sqlalchemy': 'mistral.db.v2.sqlalchemy.api', +} + +IMPL = db_api.DBAPI('sqlalchemy', backend_mapping=_BACKEND_MAPPING) +LOG = logging.getLogger(__name__) + + +def setup_db(): + IMPL.setup_db() + + +def drop_db(): + IMPL.drop_db() + + +# Transaction control. + + +def start_tx(): + IMPL.start_tx() + + +def commit_tx(): + IMPL.commit_tx() + + +def rollback_tx(): + IMPL.rollback_tx() + + +def end_tx(): + IMPL.end_tx() + + +# Workbooks. + +def get_workbook(name): + return IMPL.get_workbook(name) + + +def get_workbooks(): + return IMPL.get_workbooks() + + +def create_workbook(values): + return IMPL.create_workbook(values) + + +def update_workbook(name, values): + return IMPL.update_workbook(name, values) + + +def delete_workbook(name): + IMPL.delete_workbook(name) + + +# Executions. + +def get_execution(id): + return IMPL.get_execution(id) + + +def get_executions(**kwargs): + return IMPL.get_executions(**kwargs) + + +def ensure_execution_exists(id): + return IMPL.ensure_execution_exists(id) + + +def create_execution(values): + return IMPL.create_execution(values) + + +def update_execution(id, values): + return IMPL.update_execution(id, values) + + +def delete_execution(id): + return IMPL.delete_execution(id) + + +# Tasks. + +def get_task(id): + return IMPL.get_task(id) + + +def get_tasks(**kwargs): + return IMPL.get_tasks(**kwargs) + + +def create_task(values): + return IMPL.create_task(values) + + +def update_task(id, values): + return IMPL.update_task(id, values) + + +def delete_task(id): + return IMPL.delete_task(id) diff --git a/mistral/db/v2/sqlalchemy/__init__.py b/mistral/db/v2/sqlalchemy/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/db/v2/sqlalchemy/api.py b/mistral/db/v2/sqlalchemy/api.py new file mode 100644 index 00000000..a093a619 --- /dev/null +++ b/mistral/db/v2/sqlalchemy/api.py @@ -0,0 +1,286 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys + +from oslo.db import exception as db_exc +import sqlalchemy as sa + +from mistral import context +from mistral.db.sqlalchemy import base as b +from mistral.db.v2.sqlalchemy import models +from mistral import exceptions as exc +from mistral.openstack.common import log as logging + + +LOG = logging.getLogger(__name__) + + +def get_backend(): + """Consumed by openstack common code. + + The backend is this module itself. + :return Name of db backend. + """ + return sys.modules[__name__] + + +def setup_db(): + try: + models.Workbook.metadata.create_all(b.get_engine()) + except sa.exc.OperationalError as e: + raise exc.DBException("Failed to setup database: %s" % e) + + +def drop_db(): + global _facade + + try: + # TODO(rakhmerov): How to setup for multiple versions? + models.Workbook.metadata.drop_all(b.get_engine()) + _facade = None + except Exception as e: + raise exc.DBException("Failed to drop database: %s" + e) + + +# Transaction management. + +def start_tx(): + b.start_tx() + + +def commit_tx(): + b.commit_tx() + + +def rollback_tx(): + b.rollback_tx() + + +def end_tx(): + b.end_tx() + + +# Workbooks. + +def get_workbook(name): + wb = _get_workbook(name) + + if not wb: + raise exc.NotFoundException( + "Workbook not found [workbook_name=%s]" % name) + + return wb + + +def get_workbooks(**kwargs): + return _get_workbooks(**kwargs) + + +@b.session_aware() +def create_workbook(values, session=None): + wb = models.Workbook() + + wb.update(values.copy()) + wb['project_id'] = context.ctx().project_id + + try: + wb.save(session=session) + except db_exc.DBDuplicateEntry as e: + raise exc.DBDuplicateEntry("Duplicate entry for Workbook: %s" + % e.columns) + + return wb + + +@b.session_aware() +def update_workbook(name, values, session=None): + wb = _get_workbook(name) + + if not wb: + raise exc.NotFoundException( + "Workbook not found [workbook_name=%s]" % name) + + wb.update(values.copy()) + wb['project_id'] = context.ctx().project_id + + return wb + + +@b.session_aware() +def delete_workbook(name, session=None): + wb = _get_workbook(name) + + if not wb: + raise exc.NotFoundException( + "Workbook not found [workbook_name=%s]" % name) + + session.delete(wb) + + +def _get_workbooks(**kwargs): + query = b.model_query(models.Workbook) + proj = query.filter_by(project_id=context.ctx().project_id, + **kwargs) + public = query.filter_by(scope='public', **kwargs) + + return proj.union(public).all() + + +@b.session_aware() +def _get_workbook(name, session=None): + query = b.model_query(models.Workbook) + + return query.filter_by(name=name, + project_id=context.ctx().project_id).first() + + +# Executions. + +def get_execution(id): + execution = _get_execution(id) + + if not execution: + raise exc.NotFoundException( + "Execution not found [execution_id=%s]" % id) + + return execution + + +def ensure_execution_exists(id): + get_execution(id) + + +def get_executions(**kwargs): + return _get_executions(**kwargs) + + +@b.session_aware() +def create_execution(values, session=None): + execution = models.Execution() + + execution.update(values.copy()) + + try: + execution.save(session=session) + except db_exc.DBDuplicateEntry as e: + raise exc.DBDuplicateEntry("Duplicate entry for Execution: %s" + % e.columns) + + return execution + + +@b.session_aware() +def update_execution(id, values, session=None): + execution = _get_execution(id) + + if not execution: + raise exc.NotFoundException( + "Execution not found [execution_id=%s]" % id) + + execution.update(values.copy()) + + return execution + + +@b.session_aware() +def delete_execution(id, session=None): + execution = _get_execution(id) + + if not execution: + raise exc.NotFoundException( + "Execution not found [execution_id=%s]" % id) + + session.delete(execution) + + +def _get_executions(**kwargs): + query = b.model_query(models.Execution) + + return query.filter_by(**kwargs).all() + + +def _get_execution(id): + query = b.model_query(models.Execution) + + return query.filter_by(id=id).first() + + +# Tasks. + +def get_task(id): + task = _get_task(id) + + if not task: + raise exc.NotFoundException( + "Task not found [task_id=%s]" % id) + + return task + + +def get_tasks(**kwargs): + return _get_tasks(**kwargs) + + +@b.session_aware() +def create_task(values, session=None): + task = models.Task() + + task.update(values) + + try: + task.save(session=session) + except db_exc.DBDuplicateEntry as e: + raise exc.DBDuplicateEntry("Duplicate entry for Task: %s" + % e.columns) + + return task + + +@b.session_aware() +def update_task(id, values, session=None): + task = _get_task(id) + + if not task: + raise exc.NotFoundException( + "Task not found [task_id=%s]" % id) + + task.update(values.copy()) + + return task + + +@b.session_aware() +def delete_task(id, session=None): + task = _get_task(id) + + if not task: + raise exc.NotFoundException( + "Task not found [task_id=%s]" % id) + + session.delete(task) + + +def _get_task(id): + query = b.model_query(models.Task) + + return query.filter_by(id=id).first() + + +def _get_tasks(**kwargs): + query = b.model_query(models.Task) + + return query.filter_by(**kwargs).all() diff --git a/mistral/db/v2/sqlalchemy/models.py b/mistral/db/v2/sqlalchemy/models.py new file mode 100644 index 00000000..921db969 --- /dev/null +++ b/mistral/db/v2/sqlalchemy/models.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2013 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy as sa +from sqlalchemy.orm import relationship + +from mistral.db.sqlalchemy import model_base as mb +from mistral.db.sqlalchemy import types as st + + +class Workbook(mb.MistralBase): + """Contains info about workbook (including definition in Mistral DSL).""" + + __tablename__ = 'workbooks_v2' + + __table_args__ = ( + sa.UniqueConstraint('name'), + ) + + id = mb._id_column() + name = sa.Column(sa.String(80), primary_key=True) + definition = sa.Column(sa.Text(), nullable=True) + spec = sa.Column(st.JsonDictType()) + description = sa.Column(sa.String(200)) + tags = sa.Column(st.JsonListType()) + scope = sa.Column(sa.String(80)) + project_id = sa.Column(sa.String(80)) + trust_id = sa.Column(sa.String(80)) + + +class Execution(mb.MistralBase): + """Contains workflow execution information.""" + + __tablename__ = 'executions_v2' + + id = mb._id_column() + wf_spec = sa.Column(st.JsonDictType()) + start_params = sa.Column(st.JsonDictType()) + state = sa.Column(sa.String(20)) + context = sa.Column(st.JsonDictType()) + + +class Task(mb.MistralBase): + """Contains task runtime information.""" + + __tablename__ = 'tasks_v2' + + # Main properties. + id = mb._id_column() + name = sa.Column(sa.String(80)) + requires = sa.Column(st.JsonListType()) + wf_name = sa.Column(sa.String(80)) + spec = sa.Column(st.JsonDictType()) + action_spec = sa.Column(st.JsonDictType()) + state = sa.Column(sa.String(20)) + tags = sa.Column(st.JsonListType()) + + # Data Flow properties. + in_context = sa.Column(st.JsonDictType()) + parameters = sa.Column(st.JsonDictType()) + output = sa.Column(st.JsonDictType()) + + # Runtime context like iteration_no of a repeater. + # Effectively internal engine properties which will be used to determine + # execution of a task. + runtime_context = sa.Column(st.JsonDictType()) + + # Relations. + execution_id = sa.Column(sa.String(36), sa.ForeignKey('executions_v2.id')) + execution = relationship('Execution', backref="tasks", lazy='joined') diff --git a/mistral/exceptions.py b/mistral/exceptions.py index df88fbdf..d9094882 100644 --- a/mistral/exceptions.py +++ b/mistral/exceptions.py @@ -47,6 +47,10 @@ class MistralException(Error): '%d: %s' % (self.http_code, self.message)) +class DBException(MistralException): + http_code = 400 + + class DataAccessException(MistralException): http_code = 400 diff --git a/mistral/tests/api/base.py b/mistral/tests/api/base.py index 491094cc..d5828eb2 100644 --- a/mistral/tests/api/base.py +++ b/mistral/tests/api/base.py @@ -20,7 +20,7 @@ import pecan import pecan.testing from webtest import app as webtest_app -from mistral.db.sqlalchemy import models as m +from mistral.db.v1.sqlalchemy import models from mistral.tests import base # Disable authentication for functional tests. @@ -32,7 +32,7 @@ __all__ = ['FunctionalTest'] # Group of methods to mock DB API calls. def create_db_workbook(values): - wb = m.Workbook() + wb = models.Workbook() wb.update(values) return wb @@ -48,7 +48,7 @@ def create_mock_workbooks(arr_of_values): def create_db_execution(values): - ex = m.WorkflowExecution() + ex = models.WorkflowExecution() ex.update(values) return ex @@ -64,7 +64,7 @@ def create_mock_executions(arr_of_values): def create_db_task(values): - t = m.Task() + t = models.Task() t.update(values) return t diff --git a/mistral/tests/api/test_auth.py b/mistral/tests/api/test_auth.py index 4d23a425..754980a6 100644 --- a/mistral/tests/api/test_auth.py +++ b/mistral/tests/api/test_auth.py @@ -22,7 +22,7 @@ import pecan import pecan.testing from mistral.db import api as db_api -from mistral.db.sqlalchemy import models as m +from mistral.db.v1.sqlalchemy import models from mistral.openstack.common import timeutils from mistral.tests.api import base @@ -62,7 +62,7 @@ PKI_TOKEN_VERIFIED = { def get_mock_workbook(values): - wb = m.Workbook() + wb = models.Workbook() wb.update(values) return wb diff --git a/mistral/tests/base.py b/mistral/tests/base.py index 2efd8788..3121b5ca 100644 --- a/mistral/tests/base.py +++ b/mistral/tests/base.py @@ -16,7 +16,6 @@ import pkg_resources as pkg import sys -import tempfile from oslo.config import cfg from oslo import messaging @@ -26,7 +25,8 @@ from stevedore import driver import testtools.matchers as ttm from mistral import context as auth_context -from mistral.db.sqlalchemy import api as db_api +from mistral.db.sqlalchemy import base as db_sa_base +from mistral.db.v1 import api as db_api_v1 from mistral import engine from mistral.engine import executor from mistral.openstack.common import log as logging @@ -105,11 +105,11 @@ class BaseTest(base.BaseTestCase): class DbTestCase(BaseTest): def setUp(self): super(DbTestCase, self).setUp() - _db_fd, self.db_path = tempfile.mkstemp() - cfg.CONF.set_default('connection', 'sqlite:///' + self.db_path, - group='database') - db_api.setup_db() - self.addCleanup(db_api.drop_db) + + cfg.CONF.set_default('connection', 'sqlite://', group='database') + db_api_v1.setup_db() + + self.addCleanup(db_api_v1.drop_db) self.ctx = auth_context.MistralContext(user_id='1-2-3-4', project_id='5-6-7-8', @@ -120,7 +120,7 @@ class DbTestCase(BaseTest): self.addCleanup(auth_context.set_ctx, None) def is_db_session_open(self): - return db_api._get_thread_local_session() is not None + return db_sa_base._get_thread_local_session() is not None class EngineTestCase(DbTestCase): diff --git a/mistral/tests/unit/actions/test_action_factory.py b/mistral/tests/unit/actions/test_action_factory.py index 13e8584b..8ef72da5 100644 --- a/mistral/tests/unit/actions/test_action_factory.py +++ b/mistral/tests/unit/actions/test_action_factory.py @@ -19,7 +19,7 @@ import json from mistral.actions import action_factory as a_f from mistral.actions import std_actions as std -from mistral.db.sqlalchemy import models +from mistral.db.v1.sqlalchemy import models from mistral.engine import data_flow from mistral import exceptions from mistral.openstack.common import log as logging diff --git a/mistral/tests/unit/db/test_sqlalchemy_db_api.py b/mistral/tests/unit/db/test_sqlalchemy_db_api.py index 6c638399..039c0d6c 100644 --- a/mistral/tests/unit/db/test_sqlalchemy_db_api.py +++ b/mistral/tests/unit/db/test_sqlalchemy_db_api.py @@ -15,7 +15,7 @@ # limitations under the License. from mistral import context as auth_context -from mistral.db.sqlalchemy import api as db_api +from mistral.db.v1.sqlalchemy import api as db_api from mistral import exceptions as exc from mistral.openstack.common import timeutils from mistral.tests import base as test_base diff --git a/mistral/tests/unit/engine/default/test_engine.py b/mistral/tests/unit/engine/default/test_engine.py index 720bb989..42a35271 100644 --- a/mistral/tests/unit/engine/default/test_engine.py +++ b/mistral/tests/unit/engine/default/test_engine.py @@ -19,7 +19,7 @@ from oslo.config import cfg from mistral.actions import std_actions from mistral import context as auth_context from mistral.db import api as db_api -from mistral.db.sqlalchemy import models +from mistral.db.v1.sqlalchemy import models from mistral import engine from mistral.engine.drivers.default import engine as concrete_engine from mistral.engine import executor diff --git a/mistral/tests/unit/engine/test_data_flow_module.py b/mistral/tests/unit/engine/test_data_flow_module.py index a2667f52..a226815f 100644 --- a/mistral/tests/unit/engine/test_data_flow_module.py +++ b/mistral/tests/unit/engine/test_data_flow_module.py @@ -17,7 +17,7 @@ import copy from mistral.db import api as db_api -from mistral.db.sqlalchemy import models +from mistral.db.v1.sqlalchemy import models from mistral.engine import data_flow from mistral.engine import states from mistral.openstack.common import log as logging diff --git a/mistral/tests/unit/engine/test_task_retry.py b/mistral/tests/unit/engine/test_task_retry.py index 558ef62a..b11db133 100644 --- a/mistral/tests/unit/engine/test_task_retry.py +++ b/mistral/tests/unit/engine/test_task_retry.py @@ -20,7 +20,7 @@ from oslo.config import cfg from mistral.actions import std_actions from mistral.db import api as db_api -from mistral.db.sqlalchemy import models as m +from mistral.db.v1.sqlalchemy import models as m from mistral import engine from mistral.engine.drivers.default import engine as concrete_engine from mistral.engine import states diff --git a/mistral/tests/unit/engine/test_transport.py b/mistral/tests/unit/engine/test_transport.py index df8389b5..88084809 100644 --- a/mistral/tests/unit/engine/test_transport.py +++ b/mistral/tests/unit/engine/test_transport.py @@ -24,7 +24,7 @@ eventlet.monkey_patch() from mistral.actions import std_actions from mistral.cmd import launch from mistral.db import api as db_api -from mistral.db.sqlalchemy import models +from mistral.db.v1.sqlalchemy import models from mistral.engine import states from mistral.openstack.common import importutils from mistral.openstack.common import log as logging diff --git a/mistral/tests/unit/workflow/test_reverse_workflow.py b/mistral/tests/unit/workflow/test_reverse_workflow.py index c177b796..3473bdaa 100644 --- a/mistral/tests/unit/workflow/test_reverse_workflow.py +++ b/mistral/tests/unit/workflow/test_reverse_workflow.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from mistral.db.sqlalchemy import models as m +from mistral.db.v2.sqlalchemy import models from mistral.openstack.common import log as logging from mistral.tests import base from mistral.workbook import parser as spec_parser @@ -32,7 +32,7 @@ class ReverseWorkflowHandlerTest(base.BaseTest): base.get_resource('dsl_v2/reverse_workflow.yaml') ) - exec_db = m.WorkflowExecution() + exec_db = models.Execution() exec_db.update({ 'id': '1-2-3-4', 'wf_spec': wf_spec.to_dict()