Update openstack/common/lockutils

The following commits are in this update:

79e6bc6 fix lockutils.lock() to make it thread-safe
ace5120 Add main() to lockutils that creates temp dir for locks
537d8e2 Allow lockutils to get lock_path conf from envvar
d498c42 Fix to properly log when we release a semaphore
29d387c Add LockFixture to lockutils
3e3ac0c Modify lockutils.py due to dispose of eventlet
90b6a65 Fix locking bug
27d4b41 Move synchronized body to a first-class function
15c17fb Make lock_file_prefix optional
1a2df89 Enable H302 hacking check
b41862d Use param keyword for docstrings

Change-Id: Ic0fb3b7de5817dbd69da761f625689c523932bc4
This commit is contained in:
Michael Still 2013-11-16 19:05:57 +11:00
parent 128051bf8b
commit ea944df962
1 changed files with 120 additions and 93 deletions

View File

@ -16,19 +16,22 @@
# under the License.
import contextlib
import errno
import functools
import os
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import weakref
from eventlet import semaphore
from oslo.config import cfg
from manila.openstack.common import fileutils
from manila.openstack.common.gettextutils import _
from manila.openstack.common.gettextutils import _ # noqa
from manila.openstack.common import local
from manila.openstack.common import log as logging
@ -40,8 +43,8 @@ util_opts = [
cfg.BoolOpt('disable_process_locking', default=False,
help='Whether to disable inter-process locks'),
cfg.StrOpt('lock_path',
help=('Directory to use for lock files. Default to a '
'temp directory'))
default=os.environ.get("MANILA_LOCK_PATH"),
help=('Directory to use for lock files.'))
]
@ -133,9 +136,88 @@ else:
InterProcessLock = _PosixLock
_semaphores = weakref.WeakValueDictionary()
_semaphores_lock = threading.Lock()
def synchronized(name, lock_file_prefix, external=False, lock_path=None):
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
"""Context based lock
This function yields a `threading.Semaphore` instance (if we don't use
eventlet.monkey_patch(), else `semaphore.Semaphore`) unless external is
True, in which case, it'll yield an InterProcessLock instance.
:param lock_file_prefix: The lock_file_prefix argument is used to provide
lock files on disk with a meaningful prefix.
:param external: The external keyword argument denotes whether this lock
should work across multiple processes. This means that if two different
workers both run a a method decorated with @synchronized('mylock',
external=True), only one of them will execute at a time.
:param lock_path: The lock_path keyword argument is used to specify a
special location for external lock files to live. If nothing is set, then
CONF.lock_path is used as a default.
"""
with _semaphores_lock:
try:
sem = _semaphores[name]
except KeyError:
sem = threading.Semaphore()
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
{'lock': name})
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path or CONF.lock_path
if not local_lock_path:
raise cfg.RequiredOptError('lock_path')
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
LOG.info(_('Created lock path: %s'), local_lock_path)
def add_prefix(name, prefix):
if not prefix:
return name
sep = '' if prefix.endswith('-') else '-'
return '%s%s%s' % (prefix, sep, name)
# NOTE(mikal): the lock name cannot contain directory
# separators
lock_file_name = add_prefix(name.replace(os.sep, '_'),
lock_file_prefix)
lock_file_path = os.path.join(local_lock_path, lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock as lock:
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
yield lock
finally:
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
{'lock': name, 'path': lock_file_path})
else:
yield sem
finally:
local.strong_store.locks_held.remove(name)
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
"""Synchronization decorator.
Decorating a method like so::
@ -157,98 +239,19 @@ def synchronized(name, lock_file_prefix, external=False, lock_path=None):
...
This way only one of either foo or bar can be executing at a time.
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
The external keyword argument denotes whether this lock should work across
multiple processes. This means that if two different workers both run a
a method decorated with @synchronized('mylock', external=True), only one
of them will execute at a time.
The lock_path keyword argument is used to specify a special location for
external lock files to live. If nothing is set, then CONF.lock_path is
used as a default.
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
# NOTE(soren): If we ever go natively threaded, this will be racy.
# See http://stackoverflow.com/questions/5390569/dyn
# amically-allocating-and-destroying-mutexes
sem = _semaphores.get(name, semaphore.Semaphore())
if name not in _semaphores:
# this check is not racy - we're already holding ref locally
# so GC won't remove the item and there was no IO switch
# (only valid in greenthreads)
_semaphores[name] = sem
with sem:
LOG.debug(_('Got semaphore "%(lock)s" for method '
'"%(method)s"...'), {'lock': name,
'method': f.__name__})
# NOTE(mikal): I know this looks odd
if not hasattr(local.strong_store, 'locks_held'):
local.strong_store.locks_held = []
local.strong_store.locks_held.append(name)
try:
if external and not CONF.disable_process_locking:
LOG.debug(_('Attempting to grab file lock "%(lock)s" '
'for method "%(method)s"...'),
{'lock': name, 'method': f.__name__})
cleanup_dir = False
# We need a copy of lock_path because it is non-local
local_lock_path = lock_path
if not local_lock_path:
local_lock_path = CONF.lock_path
if not local_lock_path:
cleanup_dir = True
local_lock_path = tempfile.mkdtemp()
if not os.path.exists(local_lock_path):
fileutils.ensure_tree(local_lock_path)
# NOTE(mikal): the lock name cannot contain directory
# separators
safe_name = name.replace(os.sep, '_')
lock_file_name = '%s%s' % (lock_file_prefix, safe_name)
lock_file_path = os.path.join(local_lock_path,
lock_file_name)
try:
lock = InterProcessLock(lock_file_path)
with lock:
LOG.debug(_('Got file lock "%(lock)s" at '
'%(path)s for method '
'"%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
retval = f(*args, **kwargs)
finally:
LOG.debug(_('Released file lock "%(lock)s" at '
'%(path)s for method "%(method)s"...'),
{'lock': name,
'path': lock_file_path,
'method': f.__name__})
# NOTE(vish): This removes the tempdir if we needed
# to create one. This is used to
# cleanup the locks left behind by unit
# tests.
if cleanup_dir:
shutil.rmtree(local_lock_path)
else:
retval = f(*args, **kwargs)
finally:
local.strong_store.locks_held.remove(name)
return retval
try:
with lock(name, lock_file_prefix, external, lock_path):
LOG.debug(_('Got semaphore / lock "%(function)s"'),
{'function': f.__name__})
return f(*args, **kwargs)
finally:
LOG.debug(_('Semaphore / lock released "%(function)s"'),
{'function': f.__name__})
return inner
return wrap
@ -272,7 +275,31 @@ def synchronized_with_prefix(lock_file_prefix):
...
The lock_file_prefix argument is used to provide lock files on disk with a
meaningful prefix. The prefix should end with a hyphen ('-') if specified.
meaningful prefix.
"""
return functools.partial(synchronized, lock_file_prefix=lock_file_prefix)
def main(argv):
"""Create a dir for locks and pass it to command from arguments
If you run this:
python -m openstack.common.lockutils python setup.py testr <etc>
a temporary directory will be created for all your locks and passed to all
your tests in an environment variable. The temporary dir will be deleted
afterwards and the return value will be preserved.
"""
lock_dir = tempfile.mkdtemp()
os.environ["MANILA_LOCK_PATH"] = lock_dir
try:
ret_val = subprocess.call(argv[1:])
finally:
shutil.rmtree(lock_dir, ignore_errors=True)
return ret_val
if __name__ == '__main__':
sys.exit(main(sys.argv))