diff --git a/murano/db/migration/alembic_migrations/versions/007_add_locks.py b/murano/db/migration/alembic_migrations/versions/007_add_locks.py new file mode 100644 index 00000000..297e9164 --- /dev/null +++ b/murano/db/migration/alembic_migrations/versions/007_add_locks.py @@ -0,0 +1,47 @@ +# Copyright 2015 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from alembic import op +import sqlalchemy as sa + +"""add_locks + +Revision ID: 007 +Revises: 006 +Create Date: 2015-04-08 14:01:06.458512 + +""" + +# revision identifiers, used by Alembic. +revision = '007' +down_revision = '006' + +MYSQL_ENGINE = 'InnoDB' +MYSQL_CHARSET = 'utf8' + + +def upgrade(): + op.create_table( + 'locks', + sa.Column('id', sa.String(length=50), nullable=False), + sa.Column('ts', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id'), + mysql_engine=MYSQL_ENGINE, + mysql_charset=MYSQL_CHARSET) + + +def downgrade(): + op.drop_table('locks') diff --git a/murano/db/models.py b/murano/db/models.py index f38163e2..134ac6c8 100644 --- a/murano/db/models.py +++ b/murano/db/models.py @@ -311,10 +311,16 @@ class Class(Base, TimestampMixin): package_id = sa.Column(sa.String(36), sa.ForeignKey('package.id')) +class Lock(Base): + __tablename__ = 'locks' + id = sa.Column(sa.String(50), primary_key=True) + ts = sa.Column(sa.DateTime, nullable=False) + + def register_models(engine): """Creates database tables for all models with the given engine.""" models = (Environment, Status, Session, Task, - ApiStats, Package, Category, Class, Instance) + ApiStats, Package, Category, Class, Instance, Lock) for model in models: model.metadata.create_all(engine) @@ -322,6 +328,6 @@ def register_models(engine): def unregister_models(engine): """Drops database tables for all models with the given engine.""" models = (Environment, Status, Session, Task, - ApiStats, Package, Category, Class) + ApiStats, Package, Category, Class, Lock) for model in models: model.metadata.drop_all(engine) diff --git a/murano/db/session.py b/murano/db/session.py index 8fc4b8bf..5e2b9302 100644 --- a/murano/db/session.py +++ b/murano/db/session.py @@ -15,10 +15,13 @@ """Session management functions.""" import threading +from oslo.db import exception from oslo.db import options from oslo.db.sqlalchemy import session as db_session +from oslo.utils import timeutils from murano.common import config +from murano.db.models import Lock from murano.openstack.common import log as logging LOG = logging.getLogger(__name__) @@ -29,6 +32,8 @@ options.set_defaults(CONF) _FACADE = None _LOCK = threading.Lock() +MAX_LOCK_RETRIES = 10 + def _create_facade_lazily(): global _LOCK, _FACADE @@ -49,3 +54,40 @@ def get_engine(): def get_session(**kwargs): facade = _create_facade_lazily() return facade.get_session(**kwargs) + + +def get_lock(name, session=None): + if session is None: + session = get_session() + nested = False + else: + nested = session.transaction is not None + return _get_or_create_lock(name, session, nested) + + +def _get_or_create_lock(name, session, nested, retry=0): + if nested: + session.begin_nested() + else: + session.begin() + existing = session.query(Lock).get(name) + if existing is None: + try: + # no lock found, creating a new one + lock = Lock(id=name, ts=timeutils.utcnow()) + lock.save(session) + return session.transaction + # lock created and acquired + except exception.DBDuplicateEntry: + session.rollback() + if retry >= MAX_LOCK_RETRIES: + raise + else: + # other transaction has created a lock, repeat to acquire + # via update + return _get_or_create_lock(name, session, nested, retry + 1) + else: + # lock found, acquiring by doing update + existing.ts = timeutils.utcnow() + existing.save(session) + return session.transaction