Use lockutils from "oslo concurrency" lib
Module 'lockutils' was released within 'oslo concurrency' lib and removed from oslo incubator. Use it from lib. Changes: - Removed local copy of 'lockutils' copied from incubator. - Config opt 'lock path' moved to separate config group 'oslo_concurrency'. Now it can be set for Devstack installation also by "OSLO_LOCK_PATH" env var. - Removed explicit usage of 'lockutils.synchronized' within Manila and reused it from manila.utils. Change-Id: I21257bc8c0a41dc1bb342deadbcf279c9f3b670d
This commit is contained in:
parent
67ccd12e42
commit
76f8d3b948
@ -56,6 +56,7 @@ MANILACLIENT_BRANCH=${MANILACLIENT_BRANCH:-master}
|
||||
|
||||
# set up default directories
|
||||
MANILA_DIR=${MANILA_DIR:=$DEST/manila}
|
||||
MANILA_LOCK_PATH=${MANILA_LOCK_PATH:=$OSLO_LOCK_PATH}
|
||||
MANILA_LOCK_PATH=${MANILA_LOCK_PATH:=$MANILA_DIR/manila_locks}
|
||||
MANILACLIENT_DIR=${MANILACLIENT_DIR:=$DEST/python-manilaclient}
|
||||
MANILA_STATE_PATH=${MANILA_STATE_PATH:=$DATA_DIR/manila}
|
||||
@ -247,7 +248,7 @@ function configure_manila {
|
||||
iniset $MANILA_CONF DEFAULT cinder_admin_password $SERVICE_PASSWORD
|
||||
iniset $MANILA_CONF DEFAULT neutron_admin_password $SERVICE_PASSWORD
|
||||
|
||||
iniset $MANILA_CONF DEFAULT lock_path $MANILA_LOCK_PATH
|
||||
iniset $MANILA_CONF oslo_concurrency lock_path $MANILA_LOCK_PATH
|
||||
|
||||
# Note: set up config group does not mean that this backend will be enabled.
|
||||
# To enable it, specify its name explicitly using "enabled_share_backends" opt.
|
||||
|
@ -1,326 +0,0 @@
|
||||
# Copyright 2011 OpenStack Foundation.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import errno
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import threading
|
||||
import time
|
||||
import weakref
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from manila.openstack.common import fileutils
|
||||
from manila.openstack.common._i18n import _, _LE, _LI
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
util_opts = [
|
||||
cfg.BoolOpt('disable_process_locking', default=False,
|
||||
help='Enables or disables inter-process locks.'),
|
||||
cfg.StrOpt('lock_path',
|
||||
default=os.environ.get("MANILA_LOCK_PATH"),
|
||||
help='Directory to use for lock files.')
|
||||
]
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(util_opts)
|
||||
|
||||
|
||||
def set_defaults(lock_path):
|
||||
cfg.set_defaults(util_opts, lock_path=lock_path)
|
||||
|
||||
|
||||
class _FileLock(object):
|
||||
"""Lock implementation which allows multiple locks, working around
|
||||
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
||||
not require any cleanup. Since the lock is always held on a file
|
||||
descriptor rather than outside of the process, the lock gets dropped
|
||||
automatically if the process crashes, even if __exit__ is not executed.
|
||||
|
||||
There are no guarantees regarding usage by multiple green threads in a
|
||||
single process here. This lock works only between processes. Exclusive
|
||||
access between local threads should be achieved using the semaphores
|
||||
in the @synchronized decorator.
|
||||
|
||||
Note these locks are released when the descriptor is closed, so it's not
|
||||
safe to close the file descriptor while another green thread holds the
|
||||
lock. Just opening and closing the lock file can break synchronisation,
|
||||
so lock files must be accessed only using this abstraction.
|
||||
"""
|
||||
|
||||
def __init__(self, name):
|
||||
self.lockfile = None
|
||||
self.fname = name
|
||||
|
||||
def acquire(self):
|
||||
basedir = os.path.dirname(self.fname)
|
||||
|
||||
if not os.path.exists(basedir):
|
||||
fileutils.ensure_tree(basedir)
|
||||
LOG.info(_LI('Created lock path: %s'), basedir)
|
||||
|
||||
self.lockfile = open(self.fname, 'w')
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Using non-blocking locks since green threads are not
|
||||
# patched to deal with blocking locking calls.
|
||||
# Also upon reading the MSDN docs for locking(), it seems
|
||||
# to have a laughable 10 attempts "blocking" mechanism.
|
||||
self.trylock()
|
||||
LOG.debug('Got file lock "%s"', self.fname)
|
||||
return True
|
||||
except IOError as e:
|
||||
if e.errno in (errno.EACCES, errno.EAGAIN):
|
||||
# external locks synchronise things like iptables
|
||||
# updates - give it some time to prevent busy spinning
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
raise threading.ThreadError(_("Unable to acquire lock on"
|
||||
" `%(filename)s` due to"
|
||||
" %(exception)s") %
|
||||
{'filename': self.fname,
|
||||
'exception': e})
|
||||
|
||||
def __enter__(self):
|
||||
self.acquire()
|
||||
return self
|
||||
|
||||
def release(self):
|
||||
try:
|
||||
self.unlock()
|
||||
self.lockfile.close()
|
||||
LOG.debug('Released file lock "%s"', self.fname)
|
||||
except IOError:
|
||||
LOG.exception(_LE("Could not release the acquired lock `%s`"),
|
||||
self.fname)
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.release()
|
||||
|
||||
def exists(self):
|
||||
return os.path.exists(self.fname)
|
||||
|
||||
def trylock(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def unlock(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class _WindowsLock(_FileLock):
|
||||
def trylock(self):
|
||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
|
||||
|
||||
def unlock(self):
|
||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
|
||||
|
||||
class _FcntlLock(_FileLock):
|
||||
def trylock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
||||
def unlock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
||||
|
||||
|
||||
if os.name == 'nt':
|
||||
import msvcrt
|
||||
InterProcessLock = _WindowsLock
|
||||
else:
|
||||
import fcntl
|
||||
InterProcessLock = _FcntlLock
|
||||
|
||||
_semaphores = weakref.WeakValueDictionary()
|
||||
_semaphores_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_lock_path(name, lock_file_prefix, lock_path=None):
|
||||
# NOTE(mikal): the lock name cannot contain directory
|
||||
# separators
|
||||
name = name.replace(os.sep, '_')
|
||||
if lock_file_prefix:
|
||||
sep = '' if lock_file_prefix.endswith('-') else '-'
|
||||
name = '%s%s%s' % (lock_file_prefix, sep, name)
|
||||
|
||||
local_lock_path = lock_path or CONF.lock_path
|
||||
|
||||
if not local_lock_path:
|
||||
raise cfg.RequiredOptError('lock_path')
|
||||
|
||||
return os.path.join(local_lock_path, name)
|
||||
|
||||
|
||||
def external_lock(name, lock_file_prefix=None, lock_path=None):
|
||||
LOG.debug('Attempting to grab external lock "%(lock)s"',
|
||||
{'lock': name})
|
||||
|
||||
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
|
||||
|
||||
return InterProcessLock(lock_file_path)
|
||||
|
||||
|
||||
def remove_external_lock_file(name, lock_file_prefix=None):
|
||||
"""Remove an external lock file when it's not used anymore
|
||||
This will be helpful when we have a lot of lock files
|
||||
"""
|
||||
with internal_lock(name):
|
||||
lock_file_path = _get_lock_path(name, lock_file_prefix)
|
||||
try:
|
||||
os.remove(lock_file_path)
|
||||
except OSError:
|
||||
LOG.info(_LI('Failed to remove file %(file)s'),
|
||||
{'file': lock_file_path})
|
||||
|
||||
|
||||
def internal_lock(name):
|
||||
with _semaphores_lock:
|
||||
try:
|
||||
sem = _semaphores[name]
|
||||
LOG.debug('Using existing semaphore "%s"', name)
|
||||
except KeyError:
|
||||
sem = threading.Semaphore()
|
||||
_semaphores[name] = sem
|
||||
LOG.debug('Created new semaphore "%s"', name)
|
||||
|
||||
return sem
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||
"""Context based lock
|
||||
|
||||
This function yields a `threading.Semaphore` instance (if we don't use
|
||||
eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
|
||||
True, in which case, it'll yield an InterProcessLock instance.
|
||||
|
||||
:param lock_file_prefix: The lock_file_prefix argument is used to provide
|
||||
lock files on disk with a meaningful prefix.
|
||||
|
||||
:param external: The external keyword argument denotes whether this lock
|
||||
should work across multiple processes. This means that if two different
|
||||
workers both run a method decorated with @synchronized('mylock',
|
||||
external=True), only one of them will execute at a time.
|
||||
"""
|
||||
int_lock = internal_lock(name)
|
||||
with int_lock:
|
||||
LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
|
||||
try:
|
||||
if external and not CONF.disable_process_locking:
|
||||
ext_lock = external_lock(name, lock_file_prefix, lock_path)
|
||||
with ext_lock:
|
||||
yield ext_lock
|
||||
else:
|
||||
yield int_lock
|
||||
finally:
|
||||
LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
|
||||
|
||||
|
||||
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||
"""Synchronization decorator.
|
||||
|
||||
Decorating a method like so::
|
||||
|
||||
@synchronized('mylock')
|
||||
def foo(self, *args):
|
||||
...
|
||||
|
||||
ensures that only one thread will execute the foo method at a time.
|
||||
|
||||
Different methods can share the same lock::
|
||||
|
||||
@synchronized('mylock')
|
||||
def foo(self, *args):
|
||||
...
|
||||
|
||||
@synchronized('mylock')
|
||||
def bar(self, *args):
|
||||
...
|
||||
|
||||
This way only one of either foo or bar can be executing at a time.
|
||||
"""
|
||||
|
||||
def wrap(f):
|
||||
@functools.wraps(f)
|
||||
def inner(*args, **kwargs):
|
||||
try:
|
||||
with lock(name, lock_file_prefix, external, lock_path):
|
||||
LOG.debug('Got semaphore / lock "%(function)s"',
|
||||
{'function': f.__name__})
|
||||
return f(*args, **kwargs)
|
||||
finally:
|
||||
LOG.debug('Semaphore / lock released "%(function)s"',
|
||||
{'function': f.__name__})
|
||||
return inner
|
||||
return wrap
|
||||
|
||||
|
||||
def synchronized_with_prefix(lock_file_prefix):
|
||||
"""Partial object generator for the synchronization decorator.
|
||||
|
||||
Redefine @synchronized in each project like so::
|
||||
|
||||
(in nova/utils.py)
|
||||
from nova.openstack.common import lockutils
|
||||
|
||||
synchronized = lockutils.synchronized_with_prefix('nova-')
|
||||
|
||||
|
||||
(in nova/foo.py)
|
||||
from nova import utils
|
||||
|
||||
@utils.synchronized('mylock')
|
||||
def bar(self, *args):
|
||||
...
|
||||
|
||||
The lock_file_prefix argument is used to provide lock files on disk with a
|
||||
meaningful prefix.
|
||||
"""
|
||||
|
||||
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
|
||||
|
||||
|
||||
def main(argv):
|
||||
"""Create a dir for locks and pass it to command from arguments
|
||||
|
||||
If you run this:
|
||||
python -m openstack.common.lockutils python setup.py testr <etc>
|
||||
|
||||
a temporary directory will be created for all your locks and passed to all
|
||||
your tests in an environment variable. The temporary dir will be deleted
|
||||
afterwards and the return value will be preserved.
|
||||
"""
|
||||
|
||||
lock_dir = tempfile.mkdtemp()
|
||||
os.environ["MANILA_LOCK_PATH"] = lock_dir
|
||||
try:
|
||||
ret_val = subprocess.call(argv[1:])
|
||||
finally:
|
||||
shutil.rmtree(lock_dir, ignore_errors=True)
|
||||
return ret_val
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
sys.exit(main(sys.argv))
|
@ -20,6 +20,7 @@ __all__ = [
|
||||
import copy
|
||||
import itertools
|
||||
|
||||
import oslo_concurrency.opts
|
||||
|
||||
import manila.api.middleware.auth
|
||||
import manila.api.middleware.sizelimit
|
||||
@ -33,7 +34,6 @@ import manila.network
|
||||
import manila.network.linux.interface
|
||||
import manila.network.neutron.api
|
||||
import manila.openstack.common.eventlet_backdoor
|
||||
import manila.openstack.common.lockutils
|
||||
import manila.openstack.common.log
|
||||
import manila.openstack.common.policy
|
||||
import manila.openstack.common.sslutils
|
||||
@ -79,7 +79,6 @@ _global_opt_lists = [
|
||||
manila.network.network_opts,
|
||||
manila.network.neutron.api.neutron_opts,
|
||||
manila.openstack.common.eventlet_backdoor.eventlet_backdoor_opts,
|
||||
manila.openstack.common.lockutils.util_opts,
|
||||
manila.openstack.common.log.common_cli_opts,
|
||||
manila.openstack.common.log.generic_log_opts,
|
||||
manila.openstack.common.log.log_opts,
|
||||
@ -121,6 +120,8 @@ _opts = [
|
||||
("ssl", manila.openstack.common.sslutils.ssl_opts)
|
||||
]
|
||||
|
||||
_opts.extend(oslo_concurrency.opts.list_opts())
|
||||
|
||||
|
||||
def list_opts():
|
||||
"""Return a list of oslo.config options available in Manila."""
|
||||
|
@ -26,7 +26,6 @@ from manila.i18n import _
|
||||
from manila.i18n import _LE
|
||||
from manila.i18n import _LI
|
||||
from manila.i18n import _LW
|
||||
from manila.openstack.common import lockutils
|
||||
from manila.openstack.common import log as logging
|
||||
from manila.share.drivers.emc.plugins.vnx import constants
|
||||
from manila.share.drivers.emc.plugins.vnx import utils as vnx_utils
|
||||
@ -1410,7 +1409,7 @@ class NASCommandHelper(object):
|
||||
def allow_nfs_share_access(self, path, host_ip, mover_name):
|
||||
sharename = path.strip('/')
|
||||
|
||||
@lockutils.synchronized('emc-shareaccess-' + sharename)
|
||||
@utils.synchronized('emc-shareaccess-' + sharename)
|
||||
def do_allow_access(path, host_ip, mover_name):
|
||||
ok = (constants.STATUS_OK, '')
|
||||
status, share = self.get_nfs_share_by_path(path, mover_name)
|
||||
@ -1450,7 +1449,7 @@ class NASCommandHelper(object):
|
||||
def deny_nfs_share_access(self, path, host_ip, mover_name):
|
||||
sharename = path.strip('/')
|
||||
|
||||
@lockutils.synchronized('emc-shareaccess-' + sharename)
|
||||
@utils.synchronized('emc-shareaccess-' + sharename)
|
||||
def do_deny_access(path, host_ip, mover_name):
|
||||
ok = (constants.STATUS_OK, '')
|
||||
status, share = self.get_nfs_share_by_path(path, mover_name)
|
||||
|
@ -23,14 +23,15 @@ inline callbacks.
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
from oslo.config import cfg
|
||||
from oslo.config import fixture as config_fixture
|
||||
from oslo.messaging import conffixture as messaging_conffixture
|
||||
from oslo.utils import timeutils
|
||||
from oslo_concurrency import lockutils
|
||||
import six
|
||||
import testtools
|
||||
|
||||
@ -146,9 +147,13 @@ class TestCase(testtools.TestCase):
|
||||
self.stubs = StubOutForTesting(self)
|
||||
self.injected = []
|
||||
self._services = []
|
||||
CONF.set_override('fatal_exception_format_errors', True)
|
||||
self.flags(fatal_exception_format_errors=True)
|
||||
# This will be cleaned up by the NestedTempfile fixture
|
||||
CONF.set_override('lock_path', tempfile.mkdtemp())
|
||||
lock_path = self.useFixture(fixtures.TempDir()).path
|
||||
self.fixture = self.useFixture(config_fixture.Config(lockutils.CONF))
|
||||
self.fixture.config(lock_path=lock_path, group='oslo_concurrency')
|
||||
self.fixture.config(
|
||||
disable_process_locking=True, group='oslo_concurrency')
|
||||
|
||||
rpc.add_extra_exmods('manila.tests')
|
||||
self.addCleanup(rpc.clear_extra_exmods)
|
||||
|
@ -13,9 +13,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.serialization import jsonutils
|
||||
import webob
|
||||
@ -43,17 +40,11 @@ class AdminActionsTest(test.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(AdminActionsTest, self).setUp()
|
||||
self.tempdir = tempfile.mkdtemp()
|
||||
self.flags(rpc_backend='manila.openstack.common.rpc.impl_fake')
|
||||
self.flags(lock_path=self.tempdir)
|
||||
self.share_api = share_api.API()
|
||||
self.admin_context = context.RequestContext('admin', 'fake', True)
|
||||
self.member_context = context.RequestContext('fake', 'fake')
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tempdir)
|
||||
super(AdminActionsTest, self).tearDown()
|
||||
|
||||
def test_reset_status_as_admin(self):
|
||||
# current status is available
|
||||
share = db.share_create(self.admin_context, {'status': 'available'})
|
||||
|
@ -36,13 +36,13 @@ import netaddr
|
||||
from oslo.config import cfg
|
||||
from oslo.utils import importutils
|
||||
from oslo.utils import timeutils
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_concurrency import processutils
|
||||
import paramiko
|
||||
import six
|
||||
|
||||
from manila import exception
|
||||
from manila.i18n import _
|
||||
from manila.openstack.common import lockutils
|
||||
from manila.openstack.common import log as logging
|
||||
|
||||
|
||||
|
@ -4,7 +4,6 @@
|
||||
module=eventlet_backdoor
|
||||
module=fileutils
|
||||
module=local
|
||||
module=lockutils
|
||||
module=log
|
||||
module=loopingcall
|
||||
module=policy
|
||||
|
@ -180,7 +180,7 @@ function run_pep8 {
|
||||
bash -c "${wrapper} flake8"
|
||||
}
|
||||
|
||||
TESTRTESTS="python -m manila.openstack.common.lockutils python setup.py testr"
|
||||
TESTRTESTS="python setup.py testr"
|
||||
|
||||
if [ $never_venv -eq 0 ]
|
||||
then
|
||||
|
Loading…
Reference in New Issue
Block a user