A Galera-compliant database locking solution

A common scenario in database interaction is to check some values in
the database and update them if some condition is met (or is not met).
To prevent race conditions in such scenarios it is recommended to isolate
the appropriate transactions at read level, so that two "select"
operations are not executed in parallel. This may be achieved either by
modifying transaction_isolation_level or by doing "selects" with a
"for update" keyword, which puts a lock on the entries being selected,
thus blocking any other selects which may run in a parallel transaction.

The problem is that SELECT FOR UPDATE is not fully supported in Galera
clusters, which are a common tool for building HA OpenStack environments.
Such statements may have non-deterministic results - see [1] for more
details.

This patch introduces a simple exclusive lock mechanism: it executes a
simple insert or update operation on a special table. Any other
transactions attempting to do the same will block until the first one
completes in one way or another. This results in a simple but reliable
locking solution, which may not be as efficient as dedicated distributed
lock systems but still does its job nicely.

Partial-bug: #1440094
Change-Id: Ib606f33b832b04a3ecac27e1e10018ea8fd503a5
This commit is contained in:
Alexander Tivelkov 2015-04-13 15:52:38 +03:00
parent 5f545f413f
commit f2ba54b62b
3 changed files with 97 additions and 2 deletions

View File

@ -0,0 +1,47 @@
# Copyright 2015 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from alembic import op
import sqlalchemy as sa
"""add_locks
Revision ID: 007
Revises: 006
Create Date: 2015-04-08 14:01:06.458512
"""
# revision identifiers, used by Alembic.
revision = '007'
down_revision = '006'

# MySQL-specific table options passed to op.create_table() in upgrade().
MYSQL_ENGINE = 'InnoDB'
MYSQL_CHARSET = 'utf8'
def upgrade():
    """Create the ``locks`` table.

    The table has a string primary key ``id`` (up to 50 characters) and a
    mandatory ``ts`` timestamp column.
    """
    schema_items = (
        sa.Column('id', sa.String(length=50), nullable=False),
        sa.Column('ts', sa.DateTime(), nullable=False),
        sa.PrimaryKeyConstraint('id'),
    )
    table_options = {
        'mysql_engine': MYSQL_ENGINE,
        'mysql_charset': MYSQL_CHARSET,
    }
    op.create_table('locks', *schema_items, **table_options)
def downgrade():
    """Drop the ``locks`` table, undoing :func:`upgrade`."""
    locks_table = 'locks'
    op.drop_table(locks_table)

View File

@ -311,10 +311,16 @@ class Class(Base, TimestampMixin):
package_id = sa.Column(sa.String(36), sa.ForeignKey('package.id')) package_id = sa.Column(sa.String(36), sa.ForeignKey('package.id'))
class Lock(Base):
    """A row in the ``locks`` table, keyed by lock name."""

    __tablename__ = 'locks'

    # Lock name; at most 50 characters, unique because it is the primary key.
    id = sa.Column(sa.String(length=50), primary_key=True)
    # Timestamp stored with the row; must always be set.
    ts = sa.Column(sa.DateTime(), nullable=False)
def register_models(engine):
    """Creates database tables for all models with the given engine."""
    for model_cls in (Environment, Status, Session, Task, ApiStats,
                      Package, Category, Class, Instance, Lock):
        model_cls.metadata.create_all(engine)
@ -322,6 +328,6 @@ def register_models(engine):
def unregister_models(engine):
    """Drops database tables for all models with the given engine.

    The model list mirrors the one in register_models(), including
    ``Instance``, so that everything created there is also dropped here.
    """
    # NOTE(review): presumably all models share one metadata object, in
    # which case a single drop_all() would suffice -- confirm against Base.
    models = (Environment, Status, Session, Task,
              ApiStats, Package, Category, Class, Instance, Lock)
    for model in models:
        model.metadata.drop_all(engine)

View File

@ -15,10 +15,13 @@
"""Session management functions.""" """Session management functions."""
import threading import threading
from oslo.db import exception
from oslo.db import options from oslo.db import options
from oslo.db.sqlalchemy import session as db_session from oslo.db.sqlalchemy import session as db_session
from oslo.utils import timeutils
from murano.common import config from murano.common import config
from murano.db.models import Lock
from murano.openstack.common import log as logging from murano.openstack.common import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -29,6 +32,8 @@ options.set_defaults(CONF)
_FACADE = None _FACADE = None
_LOCK = threading.Lock() _LOCK = threading.Lock()
# Upper bound on how many times _get_or_create_lock() retries after a
# DBDuplicateEntry before re-raising the error.
MAX_LOCK_RETRIES = 10
def _create_facade_lazily(): def _create_facade_lazily():
global _LOCK, _FACADE global _LOCK, _FACADE
@ -49,3 +54,40 @@ def get_engine():
def get_session(**kwargs):
    """Return a session from the lazily created facade.

    All keyword arguments are forwarded unchanged to the facade's
    ``get_session``.
    """
    return _create_facade_lazily().get_session(**kwargs)
def get_lock(name, session=None):
    """Acquire the database lock called ``name``.

    The lock is taken by inserting or updating the matching row of the
    ``locks`` table inside a transaction.

    :param name: identifier of the lock (primary key of the ``locks`` row)
    :param session: optional existing session. When omitted, a new session
        is obtained from get_session() and a regular transaction is begun.
        When given, a nested transaction is used if the session already has
        an active transaction, and a regular one otherwise.
    :returns: ``session.transaction`` after the lock row has been written
    """
    if session is not None:
        # Nest only when the caller's session is already inside a transaction.
        in_transaction = session.transaction is not None
        return _get_or_create_lock(name, session, in_transaction)
    return _get_or_create_lock(name, get_session(), False)
def _get_or_create_lock(name, session, nested, retry=0):
    """Write the lock row for ``name`` and return the open transaction.

    :param name: identifier of the lock row
    :param session: session in which the transaction is begun
    :param nested: begin a nested transaction instead of a regular one
    :param retry: number of retries already performed
    :returns: ``session.transaction`` once the row was inserted or updated
    :raises DBDuplicateEntry: if inserting the row still collides after
        MAX_LOCK_RETRIES retries
    """
    start_transaction = session.begin_nested if nested else session.begin
    start_transaction()

    row = session.query(Lock).get(name)
    if row is not None:
        # The lock row already exists: acquire it by updating the timestamp.
        row.ts = timeutils.utcnow()
        row.save(session)
        return session.transaction

    try:
        # No lock row yet: acquire it by creating one.
        fresh = Lock(id=name, ts=timeutils.utcnow())
        fresh.save(session)
        return session.transaction
    except exception.DBDuplicateEntry:
        session.rollback()
        if retry < MAX_LOCK_RETRIES:
            # Another transaction created the row in the meantime; start
            # over so the lock is acquired through the update path.
            return _get_or_create_lock(name, session, nested, retry + 1)
        raise