Move synchronized body to a first-class function
synchronized is being widely used throughout OpenStack for managing locks by decorating methods / functions. However, it's functionality is trapped within that decorator definition and makes it difficult to manage locks by using it in other cases (like w/o decorating). This patch moves decorator's body into a first-class function called lock which is intended to be used as a context manager type (with statement). Some examples: with lockutils.lock("test") as sem: print('Inside the lock') Things I'm not 100% convinced: * The `lock` function yields a Semaphore when external is False and an InterProcessLock instance otherwise. Although it is not 'good' to yield different values depending on the input parameters, it's pretty explicit (by reading the documentation) that external locks are handled differently. Other options for this case are: 1. Always yield a Semaphore instance 2. Don't yield anything The patch doesn't break backward compatibility so no change is needed to existing projects. Implements blueprint cache-backend-abstraction Change-Id: I96b2ee84da5a2dcd7f0bd469f8a1e52b74e50c75
This commit is contained in:
parent
15c17fb1d0
commit
27d4b4121e
|
@ -16,6 +16,7 @@
|
|||
# under the License.
|
||||
|
||||
|
||||
import contextlib
|
||||
import errno
|
||||
import functools
|
||||
import os
|
||||
|
@ -135,6 +136,96 @@ else:
|
|||
_semaphores = weakref.WeakValueDictionary()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def lock(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||
"""Context based lock
|
||||
|
||||
This function yields a `semaphore.Semaphore` instance unless external is
|
||||
True, in which case, it'll yield an InterProcessLock instance.
|
||||
|
||||
:param lock_file_prefix: The lock_file_prefix argument is used to provide
|
||||
lock files on disk with a meaningful prefix.
|
||||
|
||||
:param external: The external keyword argument denotes whether this lock
|
||||
should work across multiple processes. This means that if two different
|
||||
workers both run a a method decorated with @synchronized('mylock',
|
||||
external=True), only one of them will execute at a time.
|
||||
|
||||
:param lock_path: The lock_path keyword argument is used to specify a
|
||||
special location for external lock files to live. If nothing is set, then
|
||||
CONF.lock_path is used as a default.
|
||||
"""
|
||||
# NOTE(soren): If we ever go natively threaded, this will be racy.
|
||||
# See http://stackoverflow.com/questions/5390569/dyn
|
||||
# amically-allocating-and-destroying-mutexes
|
||||
sem = _semaphores.get(name, semaphore.Semaphore())
|
||||
if name not in _semaphores:
|
||||
# this check is not racy - we're already holding ref locally
|
||||
# so GC won't remove the item and there was no IO switch
|
||||
# (only valid in greenthreads)
|
||||
_semaphores[name] = sem
|
||||
|
||||
with sem:
|
||||
LOG.debug(_('Got semaphore "%(lock)s"'), {'lock': name})
|
||||
|
||||
# NOTE(mikal): I know this looks odd
|
||||
if not hasattr(local.strong_store, 'locks_held'):
|
||||
local.strong_store.locks_held = []
|
||||
local.strong_store.locks_held.append(name)
|
||||
|
||||
try:
|
||||
if external and not CONF.disable_process_locking:
|
||||
LOG.debug(_('Attempting to grab file lock "%(lock)s"'),
|
||||
{'lock': name})
|
||||
cleanup_dir = False
|
||||
|
||||
# We need a copy of lock_path because it is non-local
|
||||
local_lock_path = lock_path
|
||||
if not local_lock_path:
|
||||
local_lock_path = CONF.lock_path
|
||||
|
||||
if not local_lock_path:
|
||||
cleanup_dir = True
|
||||
local_lock_path = tempfile.mkdtemp()
|
||||
|
||||
if not os.path.exists(local_lock_path):
|
||||
fileutils.ensure_tree(local_lock_path)
|
||||
|
||||
def add_prefix(name, prefix):
|
||||
if not prefix:
|
||||
return name
|
||||
sep = '' if prefix.endswith('-') else '-'
|
||||
return '%s%s%s' % (prefix, sep, name)
|
||||
|
||||
# NOTE(mikal): the lock name cannot contain directory
|
||||
# separators
|
||||
lock_file_name = add_prefix(name.replace(os.sep, '_'),
|
||||
lock_file_prefix)
|
||||
|
||||
lock_file_path = os.path.join(local_lock_path, lock_file_name)
|
||||
|
||||
try:
|
||||
lock = InterProcessLock(lock_file_path)
|
||||
with lock as lock:
|
||||
LOG.debug(_('Got file lock "%(lock)s" at %(path)s'),
|
||||
{'lock': name, 'path': lock_file_path})
|
||||
yield lock
|
||||
finally:
|
||||
LOG.debug(_('Released file lock "%(lock)s" at %(path)s'),
|
||||
{'lock': name, 'path': lock_file_path})
|
||||
# NOTE(vish): This removes the tempdir if we needed
|
||||
# to create one. This is used to
|
||||
# cleanup the locks left behind by unit
|
||||
# tests.
|
||||
if cleanup_dir:
|
||||
shutil.rmtree(local_lock_path)
|
||||
else:
|
||||
yield sem
|
||||
|
||||
finally:
|
||||
local.strong_store.locks_held.remove(name)
|
||||
|
||||
|
||||
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||
"""Synchronization decorator.
|
||||
|
||||
|
@ -157,105 +248,18 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
|
|||
...
|
||||
|
||||
This way only one of either foo or bar can be executing at a time.
|
||||
|
||||
:param lock_file_prefix: The lock_file_prefix argument is used to provide
|
||||
lock files on disk with a meaningful prefix.
|
||||
|
||||
:param external: The external keyword argument denotes whether this lock
|
||||
should work across multiple processes. This means that if two different
|
||||
workers both run a a method decorated with @synchronized('mylock',
|
||||
external=True), only one of them will execute at a time.
|
||||
|
||||
:param lock_path: The lock_path keyword argument is used to specify a
|
||||
special location for external lock files to live. If nothing is set, then
|
||||
CONF.lock_path is used as a default.
|
||||
"""
|
||||
|
||||
def wrap(f):
|
||||
@functools.wraps(f)
|
||||
def inner(*args, **kwargs):
|
||||
# NOTE(soren): If we ever go natively threaded, this will be racy.
|
||||
# See http://stackoverflow.com/questions/5390569/dyn
|
||||
# amically-allocating-and-destroying-mutexes
|
||||
sem = _semaphores.get(name, semaphore.Semaphore())
|
||||
if name not in _semaphores:
|
||||
# this check is not racy - we're already holding ref locally
|
||||
# so GC won't remove the item and there was no IO switch
|
||||
# (only valid in greenthreads)
|
||||
_semaphores[name] = sem
|
||||
with lock(name, lock_file_prefix, external, lock_path):
|
||||
LOG.debug(_('Got semaphore / lock "%(function)s"'),
|
||||
{'function': f.__name__})
|
||||
return f(*args, **kwargs)
|
||||
|
||||
with sem:
|
||||
LOG.debug(_('Got semaphore "%(lock)s" for method '
|
||||
'"%(method)s"...'), {'lock': name,
|
||||
'method': f.__name__})
|
||||
|
||||
# NOTE(mikal): I know this looks odd
|
||||
if not hasattr(local.strong_store, 'locks_held'):
|
||||
local.strong_store.locks_held = []
|
||||
local.strong_store.locks_held.append(name)
|
||||
|
||||
try:
|
||||
if external and not CONF.disable_process_locking:
|
||||
LOG.debug(_('Attempting to grab file lock "%(lock)s" '
|
||||
'for method "%(method)s"...'),
|
||||
{'lock': name, 'method': f.__name__})
|
||||
cleanup_dir = False
|
||||
|
||||
# We need a copy of lock_path because it is non-local
|
||||
local_lock_path = lock_path
|
||||
if not local_lock_path:
|
||||
local_lock_path = CONF.lock_path
|
||||
|
||||
if not local_lock_path:
|
||||
cleanup_dir = True
|
||||
local_lock_path = tempfile.mkdtemp()
|
||||
|
||||
if not os.path.exists(local_lock_path):
|
||||
fileutils.ensure_tree(local_lock_path)
|
||||
|
||||
def add_prefix(name, prefix):
|
||||
if not prefix:
|
||||
return name
|
||||
sep = '' if prefix.endswith('-') else '-'
|
||||
return '%s%s%s' % (prefix, sep, name)
|
||||
|
||||
# NOTE(mikal): the lock name cannot contain directory
|
||||
# separators
|
||||
lock_file_name = add_prefix(name.replace(os.sep, '_'),
|
||||
lock_file_prefix)
|
||||
|
||||
lock_file_path = os.path.join(local_lock_path,
|
||||
lock_file_name)
|
||||
|
||||
try:
|
||||
lock = InterProcessLock(lock_file_path)
|
||||
with lock:
|
||||
LOG.debug(_('Got file lock "%(lock)s" at '
|
||||
'%(path)s for method '
|
||||
'"%(method)s"...'),
|
||||
{'lock': name,
|
||||
'path': lock_file_path,
|
||||
'method': f.__name__})
|
||||
retval = f(*args, **kwargs)
|
||||
finally:
|
||||
LOG.debug(_('Released file lock "%(lock)s" at '
|
||||
'%(path)s for method "%(method)s"...'),
|
||||
{'lock': name,
|
||||
'path': lock_file_path,
|
||||
'method': f.__name__})
|
||||
# NOTE(vish): This removes the tempdir if we needed
|
||||
# to create one. This is used to
|
||||
# cleanup the locks left behind by unit
|
||||
# tests.
|
||||
if cleanup_dir:
|
||||
shutil.rmtree(local_lock_path)
|
||||
else:
|
||||
retval = f(*args, **kwargs)
|
||||
|
||||
finally:
|
||||
local.strong_store.locks_held.remove(name)
|
||||
|
||||
return retval
|
||||
LOG.debug(_('Semaphore / lock released "%(function)s"'),
|
||||
{'function': f.__name__})
|
||||
return inner
|
||||
return wrap
|
||||
|
||||
|
|
|
@ -22,12 +22,14 @@ import tempfile
|
|||
import eventlet
|
||||
from eventlet import greenpool
|
||||
from eventlet import greenthread
|
||||
from eventlet import semaphore
|
||||
|
||||
from openstack.common import lockutils
|
||||
from tests import utils
|
||||
|
||||
|
||||
class TestFileLocks(utils.BaseTestCase):
|
||||
|
||||
def test_concurrent_green_lock_succeeds(self):
|
||||
"""Verify spawn_n greenthreads with two locks run concurrently."""
|
||||
tmpdir = tempfile.mkdtemp()
|
||||
|
@ -63,6 +65,7 @@ class TestFileLocks(utils.BaseTestCase):
|
|||
|
||||
|
||||
class LockTestCase(utils.BaseTestCase):
|
||||
|
||||
def test_synchronized_wrapped_function_metadata(self):
|
||||
@lockutils.synchronized('whatever', 'test-')
|
||||
def foo():
|
||||
|
@ -74,16 +77,16 @@ class LockTestCase(utils.BaseTestCase):
|
|||
self.assertEquals(foo.__name__, 'foo', "Wrapped function's name "
|
||||
"got mangled")
|
||||
|
||||
def test_synchronized_internally(self):
|
||||
def test_lock_internally(self):
|
||||
"""We can lock across multiple green threads."""
|
||||
saved_sem_num = len(lockutils._semaphores)
|
||||
seen_threads = list()
|
||||
|
||||
@lockutils.synchronized('testlock2', 'test-', external=False)
|
||||
def f(id):
|
||||
for x in range(10):
|
||||
seen_threads.append(id)
|
||||
greenthread.sleep(0)
|
||||
def f(_id):
|
||||
with lockutils.lock('testlock2', 'test-', external=False):
|
||||
for x in range(10):
|
||||
seen_threads.append(_id)
|
||||
greenthread.sleep(0)
|
||||
|
||||
threads = []
|
||||
pool = greenpool.GreenPool(10)
|
||||
|
@ -104,7 +107,7 @@ class LockTestCase(utils.BaseTestCase):
|
|||
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
|
||||
"Semaphore leak detected")
|
||||
|
||||
def test_nested_external_works(self):
|
||||
def test_nested_synchronized_external_works(self):
|
||||
"""We can nest external syncs."""
|
||||
tempdir = tempfile.mkdtemp()
|
||||
try:
|
||||
|
@ -125,34 +128,35 @@ class LockTestCase(utils.BaseTestCase):
|
|||
if os.path.exists(tempdir):
|
||||
shutil.rmtree(tempdir)
|
||||
|
||||
def _do_test_synchronized_externally(self):
|
||||
def _do_test_lock_externally(self):
|
||||
"""We can lock across multiple processes."""
|
||||
|
||||
@lockutils.synchronized('external', 'test-', external=True)
|
||||
def lock_files(handles_dir):
|
||||
# Open some files we can use for locking
|
||||
handles = []
|
||||
for n in range(50):
|
||||
path = os.path.join(handles_dir, ('file-%s' % n))
|
||||
handles.append(open(path, 'w'))
|
||||
|
||||
# Loop over all the handles and try locking the file
|
||||
# without blocking, keep a count of how many files we
|
||||
# were able to lock and then unlock. If the lock fails
|
||||
# we get an IOError and bail out with bad exit code
|
||||
count = 0
|
||||
for handle in handles:
|
||||
try:
|
||||
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
count += 1
|
||||
fcntl.flock(handle, fcntl.LOCK_UN)
|
||||
except IOError:
|
||||
os._exit(2)
|
||||
finally:
|
||||
handle.close()
|
||||
with lockutils.lock('external', 'test-', external=True):
|
||||
# Open some files we can use for locking
|
||||
handles = []
|
||||
for n in range(50):
|
||||
path = os.path.join(handles_dir, ('file-%s' % n))
|
||||
handles.append(open(path, 'w'))
|
||||
|
||||
# Check if we were able to open all files
|
||||
self.assertEqual(50, count)
|
||||
# Loop over all the handles and try locking the file
|
||||
# without blocking, keep a count of how many files we
|
||||
# were able to lock and then unlock. If the lock fails
|
||||
# we get an IOError and bail out with bad exit code
|
||||
count = 0
|
||||
for handle in handles:
|
||||
try:
|
||||
fcntl.flock(handle, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
count += 1
|
||||
fcntl.flock(handle, fcntl.LOCK_UN)
|
||||
except IOError:
|
||||
os._exit(2)
|
||||
finally:
|
||||
handle.close()
|
||||
|
||||
# Check if we were able to open all files
|
||||
self.assertEqual(50, count)
|
||||
|
||||
handles_dir = tempfile.mkdtemp()
|
||||
try:
|
||||
|
@ -175,23 +179,23 @@ class LockTestCase(utils.BaseTestCase):
|
|||
if os.path.exists(handles_dir):
|
||||
shutil.rmtree(handles_dir, ignore_errors=True)
|
||||
|
||||
def test_synchronized_externally(self):
|
||||
def test_lock_externally(self):
|
||||
lock_dir = tempfile.mkdtemp()
|
||||
self.config(lock_path=lock_dir)
|
||||
|
||||
try:
|
||||
self._do_test_synchronized_externally()
|
||||
self._do_test_lock_externally()
|
||||
finally:
|
||||
if os.path.exists(lock_dir):
|
||||
shutil.rmtree(lock_dir, ignore_errors=True)
|
||||
|
||||
def test_synchronized_externally_lock_dir_not_exist(self):
|
||||
def test_lock_externally_lock_dir_not_exist(self):
|
||||
lock_dir = tempfile.mkdtemp()
|
||||
os.rmdir(lock_dir)
|
||||
self.config(lock_path=lock_dir)
|
||||
|
||||
try:
|
||||
self._do_test_synchronized_externally()
|
||||
self._do_test_lock_externally()
|
||||
finally:
|
||||
if os.path.exists(lock_dir):
|
||||
shutil.rmtree(lock_dir, ignore_errors=True)
|
||||
|
@ -239,3 +243,56 @@ class LockTestCase(utils.BaseTestCase):
|
|||
finally:
|
||||
if os.path.exists(lock_dir):
|
||||
shutil.rmtree(lock_dir, ignore_errors=True)
|
||||
|
||||
def test_contextlock(self):
|
||||
lock_dir = tempfile.mkdtemp()
|
||||
|
||||
try:
|
||||
# Note(flaper87): Lock is not external, which means
|
||||
# a semaphore will be yielded
|
||||
with lockutils.lock("test") as sem:
|
||||
self.assertTrue(isinstance(sem, semaphore.Semaphore))
|
||||
|
||||
# NOTE(flaper87): Lock is external so an InterProcessLock
|
||||
# will be yielded.
|
||||
with lockutils.lock("test2", external=True,
|
||||
lock_path=lock_dir):
|
||||
path = os.path.join(lock_dir, "test2")
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
with lockutils.lock("test1",
|
||||
external=True,
|
||||
lock_path=lock_dir) as lock1:
|
||||
self.assertTrue(isinstance(lock1,
|
||||
lockutils.InterProcessLock))
|
||||
finally:
|
||||
if os.path.exists(lock_dir):
|
||||
shutil.rmtree(lock_dir, ignore_errors=True)
|
||||
|
||||
def test_contextlock_unlocks(self):
|
||||
lock_dir = tempfile.mkdtemp()
|
||||
|
||||
sem = None
|
||||
|
||||
try:
|
||||
with lockutils.lock("test") as sem:
|
||||
self.assertTrue(isinstance(sem, semaphore.Semaphore))
|
||||
|
||||
with lockutils.lock("test2", external=True,
|
||||
lock_path=lock_dir):
|
||||
path = os.path.join(lock_dir, "test2")
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
# NOTE(flaper87): Lock should be free
|
||||
with lockutils.lock("test2", external=True,
|
||||
lock_path=lock_dir):
|
||||
path = os.path.join(lock_dir, "test2")
|
||||
self.assertTrue(os.path.exists(path))
|
||||
|
||||
# NOTE(flaper87): Lock should be free
|
||||
# but semaphore should already exist.
|
||||
with lockutils.lock("test") as sem2:
|
||||
self.assertEqual(sem, sem2)
|
||||
finally:
|
||||
if os.path.exists(lock_dir):
|
||||
shutil.rmtree(lock_dir, ignore_errors=True)
|
||||
|
|
Loading…
Reference in New Issue