Implement climate db api

And make sqlalchemy implementation

Change-Id: I2f52b4ebd4bc5937e45c34c78f56864f3e91b7ae
This commit is contained in:
Nikolaj Starodubtsev 2013-09-05 13:59:17 +04:00 committed by Dina Belova
parent 62d1f038a3
commit bc8cfb861b
13 changed files with 1134 additions and 1 deletions

14
climate/db/__init__.py Normal file
View File

@ -0,0 +1,14 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -0,0 +1,222 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Defines interface for DB access.
Functions in this module are imported into the climate.db namespace. Call these
functions from climate.db namespace, not the climate.db.api namespace.
All functions in this module return objects that implement a dictionary-like
interface.
**Related Flags**
:db_backend: string to lookup in the list of LazyPluggable backends.
`sqlalchemy` is the only supported backend right now.
:sql_connection: string specifying the sqlalchemy connection to use, like:
`sqlite:///var/lib/climate/climate.sqlite`.
"""
from oslo.config import cfg
from climate.openstack.common.db import api as db_api
from climate.openstack.common import log as logging
CONF = cfg.CONF
_BACKEND_MAPPING = {
'sqlalchemy': 'climate.db.sqlalchemy.api',
}
IMPL = db_api.DBAPI(backend_mapping=_BACKEND_MAPPING)
LOG = logging.getLogger(__name__)
def setup_db():
"""Set up database, create tables, etc.
Return True on success, False otherwise
"""
return IMPL.setup_db()
def drop_db():
"""Drop database.
Return True on success, False otherwise
"""
return IMPL.drop_db()
## Helpers for building constraints / equality checks
def constraint(**conditions):
"""Return a constraint object suitable for use with some updates."""
return IMPL.constraint(**conditions)
def equal_any(*values):
"""Return an equality condition object suitable for use in a constraint.
Equal_any conditions require that a model object's attribute equal any
one of the given values.
"""
return IMPL.equal_any(*values)
def not_equal(*values):
"""Return an inequality condition object suitable for use in a constraint.
Not_equal conditions require that a model object's attribute differs from
all of the given values.
"""
return IMPL.not_equal(*values)
def to_dict(func):
def decorator(*args, **kwargs):
res = func(*args, **kwargs)
if isinstance(res, list):
return [item.to_dict() for item in res]
if res:
return res.to_dict()
else:
return None
return decorator
#Reservation
def reservation_create(context, reservation_values):
"""Create a reservation from the values"""
return IMPL.reservation_create(context, reservation_values)
@to_dict
def reservation_get_all_by_lease(context, lease_id):
"""Return all reservations belongs to specific lease"""
return IMPL.reservation_get_all_by_lease(context, lease_id)
@to_dict
def reservation_get(context, reservation_id):
"""Return specific reservation"""
return IMPL.reservation_get(context, reservation_id)
def reservation_destroy(context, reservation_id):
"""Delete specific reservation"""
IMPL.reservation_destroy(context, reservation_id)
def reservation_update(context, reservation_id, reservation_values):
"""Update reservation"""
IMPL.reservation_update(context, reservation_id, reservation_values)
#Lease
def lease_create(context, lease_values):
"""Create a lease from values"""
return IMPL.lease_create(context, lease_values)
@to_dict
def lease_get_all(context):
"""Return all leases"""
return IMPL.lease_get_all(context)
@to_dict
def lease_get_all_by_tenant(context, tenant_id):
"""Return all leases in specific tenant"""
return IMPL.lease_get_all_by_tenant(context, tenant_id)
@to_dict
def lease_get_all_by_user(context, user_id):
"""Return all leases belongs to specific user"""
return IMPL.lease_get_all_by_user(context, user_id)
@to_dict
def lease_get(context, lease_id):
"""Return lease"""
return IMPL.lease_get(context, lease_id)
@to_dict
def lease_list(context):
"""Return a list of all existing leases"""
return IMPL.lease_list(context)
def lease_destroy(context, lease_id):
"""Delete lease or raise if not exists"""
IMPL.lease_destroy(context, lease_id)
def lease_update(context, lease_id, lease_values):
"""Update lease or raise if not exists"""
IMPL.lease_update(context, lease_id, lease_values)
#Events
@to_dict
def event_create(context, event_values):
"""Create an event from values"""
return IMPL.event_create(context, event_values)
@to_dict
def event_get_all(context):
"""Return all events"""
return IMPL.event_get_all(context)
@to_dict
def event_get(context, event_id):
"""Return a specific event"""
return IMPL.event_get(context, event_id)
@to_dict
def event_get_all_sorted_by_filters(context, sort_key=None, filters=None):
"""Return instances sorted by param"""
return IMPL.event_get_all_sorted_by_filters(context, sort_key, filters)
@to_dict
def event_list(context, param):
"""Return a list of events"""
return IMPL.event_list(context)
def event_destroy(context, event_id):
"""Delete event or raise if not exists"""
IMPL.event_destroy(context, event_id)
def event_update(context, event_id, event_values):
"""Update event or raise if not exists"""
IMPL.event_update(context, event_id, event_values)

34
climate/db/base.py Normal file
View File

@ -0,0 +1,34 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base class for classes that need modular database access."""
from oslo.config import cfg
from climate.openstack.common import importutils
db_driver_opt = cfg.StrOpt('db_driver',
default='climate.db',
help='Driver to use for database access')
CONF = cfg.CONF
CONF.register_opt(db_driver_opt)
class Base(object):
"""DB driver is injected in the init method."""
def __init__(self):
self.db = importutils.import_module(CONF.db_driver)

View File

View File

@ -0,0 +1,51 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from climate.db import api as db_api
CONF = cfg.CONF
def map_status(status):
return 'Success' if status else 'Fail'
def db_sync():
drop_status = db_api.drop_db()
print("Dropping database: %s" % map_status(drop_status))
start_status = db_api.setup_db()
print("Creating database: %s" % map_status(start_status))
def add_command_parsers(subparsers):
parser = subparsers.add_parser('db-sync')
parser.set_defaults(func=db_sync)
command_opt = cfg.SubCommandOpt('command',
title='Command',
help='Available commands',
handler=add_command_parsers)
CONF.register_cli_opt(command_opt)
def main():
CONF()
CONF.command.func()

View File

View File

@ -0,0 +1,362 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of SQLAlchemy backend."""
import sys
import sqlalchemy as sa
from sqlalchemy.sql.expression import asc
from sqlalchemy.sql.expression import desc
from climate.db.sqlalchemy import models
from climate.openstack.common.db import exception as db_exc
from climate.openstack.common.db.sqlalchemy import session as db_session
from climate.openstack.common import log as logging
LOG = logging.getLogger(__name__)
get_engine = db_session.get_engine
get_session = db_session.get_session
def get_backend():
"""The backend is this module itself."""
return sys.modules[__name__]
def model_query(model, context, session=None, project_only=None):
"""Query helper.
:param model: base model to query
:param context: context to query under
:param project_only: if present and context is user-type, then restrict
query to match the context's project_id.
"""
session = session or get_session()
query = session.query(model)
if project_only:
query = query.filter_by(tenant_id=context.project_id)
return query
def column_query(context, *columns, **kwargs):
session = kwargs.get("session") or get_session()
query = session.query(*columns)
if kwargs.get("project_only"):
query = query.filter_by(tenant_id=context.tenant_id)
return query
def setup_db():
try:
engine = db_session.get_engine(sqlite_fk=True)
models.Lease.metadata.create_all(engine)
except sa.exc.OperationalError as e:
LOG.error("Database registration exception: %s", e)
return False
return True
def drop_db():
try:
engine = db_session.get_engine(sqlite_fk=True)
models.Lease.metadata.drop_all(engine)
except Exception as e:
LOG.error("Database shutdown exception: %s", e)
return False
return True
## Helpers for building constraints / equality checks
def constraint(**conditions):
return Constraint(conditions)
def equal_any(*values):
return EqualityCondition(values)
def not_equal(*values):
return InequalityCondition(values)
class Constraint(object):
def __init__(self, conditions):
self.conditions = conditions
def apply(self, model, query):
for key, condition in self.conditions.iteritems():
for clause in condition.clauses(getattr(model, key)):
query = query.filter(clause)
return query
class EqualityCondition(object):
def __init__(self, values):
self.values = values
def clauses(self, field):
return sa.or_([field == value for value in self.values])
class InequalityCondition(object):
def __init__(self, values):
self.values = values
def clauses(self, field):
return [field != value for value in self.values]
#Reservation
def _reservation_get(context, session, reservation_id):
query = model_query(models.Reservation, context, session)
return query.filter_by(id=reservation_id).first()
def reservation_get(context, reservation_id):
return _reservation_get(context, get_session(), reservation_id)
def reservation_get_all(context):
query = model_query(models.Reservation, context, get_session())
return query.all()
def reservation_get_all_by_lease_id(context, lease_id):
reservations = model_query(models.Reservation, context, get_session()).\
all()
result = []
for r in reservations:
if r['lease_id'] == lease_id:
result = result.append(r)
return result
def reservation_create(context, values):
values = values.copy()
reservation = models.Reservation()
reservation.update(values)
session = get_session()
with session.begin():
try:
reservation.save(session=session)
except db_exc.DBDuplicateEntry as e:
# raise exception about duplicated columns (e.columns)
raise RuntimeError("DBDuplicateEntry: %s" % e.columns)
return reservation_get(context, reservation.id)
def reservation_update(context, reservation_id, values):
session = get_session()
with session.begin():
reservation = _reservation_get(context, session, reservation_id)
reservation.update(values)
reservation.save(session=session)
return reservation_get(context, reservation_id)
def reservation_destroy(context, reservation_id):
session = get_session()
with session.begin():
reservation = _reservation_get(context, session, reservation_id)
if not reservation:
# raise not found error
raise RuntimeError("Reservation not found!")
session.delete(reservation)
#Lease
def _lease_get(context, session, lease_id):
query = model_query(models.Lease, context, session)
return query.filter_by(id=lease_id).first()
def lease_get(context, lease_id):
return _lease_get(context, get_session(), lease_id)
def lease_get_all(context):
query = model_query(models.Lease, context, get_session())
return query.all()
def lease_get_all_by_tenant(context, tenant_id):
raise NotImplementedError
def lease_get_all_by_user(context, user_id):
raise NotImplementedError
def lease_list(context):
return model_query(models.Lease, context, get_session()).all()
def lease_create(context, values):
values = values.copy()
lease = models.Lease()
reservations = values.pop("reservations", [])
events = values.pop("events", [])
lease.update(values)
session = get_session()
with session.begin():
try:
lease.save(session=session)
for r in reservations:
reservation = models.Reservation()
reservation.update({"lease_id": lease.id})
reservation.update(r)
reservation.save(session=session)
for e in events:
event = models.Event()
event.update({"lease_id": lease.id})
event.update(e)
event.save(session=session)
except db_exc.DBDuplicateEntry as e:
# raise exception about duplicated columns (e.columns)
raise RuntimeError("DBDuplicateEntry: %s" % e.columns)
return lease_get(context, lease.id)
def lease_update(context, lease_id, values):
session = get_session()
with session.begin():
lease = _lease_get(context, session, lease_id)
lease.update(values)
lease.save(session=session)
return lease_get(context, lease_id)
def lease_destroy(context, lease_id):
session = get_session()
with session.begin():
lease = _lease_get(context, session, lease_id)
if not lease:
# raise not found error
raise RuntimeError("Lease not found!")
session.delete(lease)
#Event
def _event_get(context, session, event_id):
query = model_query(models.Event, context, session)
return query.filter_by(id=event_id).first()
def _event_get_all(context, session):
query = model_query(models.Event, context, session)
return query
def event_get(context, event_id):
return _event_get(context, get_session(), event_id)
def event_get_all(context):
return _event_get_all(context, get_session()).all()
def event_get_all_sorted_by_filters(context, sort_key, sort_dir, filters):
"""Return events filtered and sorted by name of the field."""
sort_fn = {'desc': desc, 'asc': asc}
events_query = _event_get_all(context, get_session())
if 'status' in filters:
events_query = \
events_query.filter(models.Event.status == filters['status'])
events_query = events_query.order_by(
sort_fn[sort_dir](getattr(models.Event, sort_key))
)
return events_query.all()
def event_list(context):
events = model_query(models.Event, context, get_session()).all()
result = []
for ev in events:
result = result.append(ev['id'])
return result
def event_create(context, values):
values = values.copy()
event = models.Event()
event.update(values)
session = get_session()
with session.begin():
try:
event.save(session=session)
except db_exc.DBDuplicateEntry as e:
# raise exception about duplicated columns (e.columns)
raise RuntimeError("DBDuplicateEntry: %s" % e.columns)
return event_get(context, event.id)
def event_update(context, event_id, values):
session = get_session()
with session.begin():
event = _event_get(context, session, event_id)
event.update(values)
event.save(session=session)
return event_get(context, event_id)
def event_destroy(context, event_id):
session = get_session()
with session.begin():
event = _event_get(context, session, event_id)
if not event:
# raise not found error
raise RuntimeError("Event not found!")
session.delete(event)

View File

@ -0,0 +1,47 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from climate.openstack.common.db.sqlalchemy import models as oslo_models
from sqlalchemy.ext import declarative
from sqlalchemy import inspect
class _ClimateBase(oslo_models.ModelBase, oslo_models.TimestampMixin):
"""Base class for all Climate SQLAlchemy DB Models."""
def to_dict(self):
"""sqlalchemy based automatic to_dict method."""
d = {}
# if a column is unloaded at this point, it is
# probably deferred. We do not want to access it
# here and thereby cause it to load...
unloaded = inspect(self).unloaded
for col in self.__table__.columns:
if col.name not in unloaded:
d[col.name] = getattr(self, col.name)
datetime_to_str(d, 'created_at')
datetime_to_str(d, 'updated_at')
return d
def datetime_to_str(dct, attr_name):
if dct.get(attr_name) is not None:
dct[attr_name] = dct[attr_name].isoformat(' ')
ClimateBase = declarative.declarative_base(cls=_ClimateBase)

View File

@ -0,0 +1,92 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy as sa
from sqlalchemy.orm import relationship
from climate.db.sqlalchemy import model_base as mb
from climate.openstack.common import uuidutils
## Helpers
def _generate_unicode_uuid():
return unicode(uuidutils.generate_uuid())
def _id_column():
return sa.Column(sa.String(36),
primary_key=True,
default=_generate_unicode_uuid)
## Main objects: Lease, Reservation, Event
class Lease(mb.ClimateBase):
"""Contains all info about lease."""
__tablename__ = 'leases'
__table_args__ = (
sa.UniqueConstraint('name'),
)
id = _id_column()
name = sa.Column(sa.String(80), nullable=False)
start_date = sa.Column(sa.DateTime, nullable=False)
end_date = sa.Column(sa.DateTime, nullable=False)
trust = sa.Column(sa.String(36), nullable=False)
reservations = relationship('Reservation', cascade="all,delete",
backref='lease', lazy='joined')
events = relationship('Event', cascade="all,delete",
backref='lease', lazy='joined')
def to_dict(self):
d = super(Lease, self).to_dict()
d['reservations'] = [r.to_dict() for r in self.reservations]
d['events'] = [e.to_dict() for e in self.events]
return d
class Reservation(mb.ClimateBase):
"""Specifies group of nodes within a cluster."""
__tablename__ = 'reservations'
id = _id_column()
lease_id = sa.Column(sa.String(36),
sa.ForeignKey('leases.id'),
nullable=False)
resource_id = sa.Column(sa.String(36))
resource_type = sa.Column(sa.String(66))
status = sa.Column(sa.String(13))
def to_dict(self):
return super(Reservation, self).to_dict()
class Event(mb.ClimateBase):
"""An events occurring with the lease."""
__tablename__ = 'events'
id = _id_column()
lease_id = sa.Column(sa.String(36), sa.ForeignKey('leases.id'))
event_type = sa.Column(sa.String(66))
time = sa.Column(sa.DateTime)
status = sa.Column(sa.String(13))
def to_dict(self):
return super(Event, self).to_dict()

View File

@ -0,0 +1,34 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sqlalchemy as sa
from climate.openstack.common import jsonutils
class JsonEncoded(sa.TypeDecorator):
"""Represents an immutable structure as a json-encoded string."""
impl = sa.Text
def process_bind_param(self, value, dialect):
if value is not None:
value = jsonutils.dumps(value)
return value
def process_result_value(self, value, dialect):
if value is not None:
value = jsonutils.loads(value)
return value

View File

@ -0,0 +1,276 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import functools
import os
import time
import weakref
from eventlet import semaphore
from oslo.config import cfg
from climate.openstack.common import fileutils
from climate.openstack.common.gettextutils import _ # noqa
from climate.openstack.common import local
from climate.openstack.common import log as logging
LOG = logging.getLogger(__name__)
util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
help=('Directory to use for lock files.'))
]
CONF = cfg.CONF
CONF.register_opts(util_opts)
def set_defaults(lock_path):
cfg.set_defaults(util_opts, lock_path=lock_path)
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
descriptor rather than outside of the process, the lock gets dropped
automatically if the process crashes, even if __exit__ is not executed.
There are no guarantees regarding usage by multiple green threads in a
single process here. This lock works only between processes. Exclusive
access between local threads should be achieved using the semaphores
in the @synchronized decorator.
Note these locks are released when the descriptor is closed, so it's not
safe to close the file descriptor while another green thread holds the
lock. Just opening and closing the lock file can break synchronisation,
so lock files must be accessed only using this abstraction.
"""
def __init__(self, name):
self.lockfile = None
self.fname = name
def __enter__(self):
self.lockfile = open(self.fname, 'w')
while True:
try:
# Using non-blocking locks since green threads are not
# patched to deal with blocking locking calls.
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
# external locks synchronise things like iptables
# updates - give it some time to prevent busy spinning
time.sleep(0.01)
else:
raise
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.unlock()
self.lockfile.close()
except IOError:
LOG.exception(_("Could not release the acquired lock `%s`"),
self.fname)
def trylock(self):
raise NotImplementedError()
def unlock(self):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
def trylock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
def unlock(self):
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
def trylock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
def unlock(self):
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
else:
import fcntl
InterProcessLock = _PosixLock
_semaphores = weakref.WeakValueDictionary()
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `semaphore.Semaphore` instance unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
LOG.info(_('Created lock path: %s'), local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
else:
yield sem
finally:
local.strong_store.locks_held.remove(name)
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@synchronized('mylock')
def foo(self, *args):
...
ensures that only one thread will execute the foo method at a time.
Different methods can share the same lock::
@synchronized('mylock')
def foo(self, *args):
...
@synchronized('mylock')
def bar(self, *args):
...
This way only one of either foo or bar can be executing at a time.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
{'function': f.__name__})
return f(*args, **kwargs)
LOG.debug(_('Semaphore / lock released "%(function)s"'),
{'function': f.__name__})
return inner
return wrap
def synchronized_with_prefix(lock_file_prefix):
"""Partial object generator for the synchronization decorator.
Redefine @synchronized in each project like so::
(in nova/utils.py)
from nova.openstack.common import lockutils
synchronized = lockutils.synchronized_with_prefix('nova-')
(in nova/foo.py)
from nova import utils
@utils.synchronized('mylock')
def bar(self, *args):
...
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix.
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)

View File

@ -1,3 +1,3 @@
[DEFAULT]
modules=rpc,service,setup,gettextutils,importutils,local,eventlet_backdoor,log,jsonutils,policy,timeutils,notifier,threadgroup,loopingcall,network_utils,excutils, db, db.sqlalchemy
modules=rpc,service,setup,gettextutils,importutils,local,eventlet_backdoor,log,jsonutils,policy,timeutils,notifier,threadgroup,loopingcall,network_utils,excutils, db, db.sqlalchemy, lockutils
base=climate

View File

@ -2,3 +2,4 @@ eventlet
iso8601
oslo.config>=1.1.0
python-novaclient
SQLAlchemy>=0.7.8,<0.8