Add support for fair locks
This adds support for a "fair" variant of the lock. When there are multiple entities within a single process that are blocked waiting for the lock the fair lock will ensure that they acquire the lock in FIFO order. For now at least, when fair locks are in use we don't support the "semaphores" argument. If external locks are enabled, the inter-process ordering will be determined by the underlying OS lock ordering and process scheduling. Change-Id: I37577becff4978bf643c65fa9bc2d78d342ea35a
This commit is contained in:
parent
9584101c99
commit
2b55da68ae
@ -47,6 +47,32 @@ sure that the names of the locks used are carefully chosen (typically by
|
|||||||
namespacing them to your app so that other apps will not chose the same
|
namespacing them to your app so that other apps will not chose the same
|
||||||
names).
|
names).
|
||||||
|
|
||||||
|
Enabling fair locking
|
||||||
|
=====================
|
||||||
|
|
||||||
|
By default there is no requirement that the lock is ``fair``. That is, it's
|
||||||
|
possible for a thread to block waiting for the lock, then have another thread
|
||||||
|
block waiting for the lock, and when the lock is released by the current owner
|
||||||
|
the second waiter could acquire the lock before the first. In an extreme case
|
||||||
|
you could have a whole string of other threads acquire the lock before the
|
||||||
|
first waiter acquires it, resulting in unpredictable amounts of latency.
|
||||||
|
|
||||||
|
For cases where this is a problem, it's possible to specify the use of fair
|
||||||
|
locks::
|
||||||
|
|
||||||
|
@lockutils.synchronized('not_thread_process_safe', fair=True)
|
||||||
|
def not_thread_process_safe():
|
||||||
|
pass
|
||||||
|
|
||||||
|
When using fair locks the lock itself is slightly more expensive (which
|
||||||
|
shouldn't matter in most cases), but it will ensure that all threads that
|
||||||
|
block waiting for the lock will acquire it in the order that they blocked.
|
||||||
|
|
||||||
|
The exception to this is when specifying both ``external`` and ``fair``
|
||||||
|
locks. In this case, the ordering *within* a given process will be fair, but
|
||||||
|
the ordering *between* processes will be determined by the behaviour of the
|
||||||
|
underlying OS.
|
||||||
|
|
||||||
Common ways to prefix/namespace the synchronized decorator
|
Common ways to prefix/namespace the synchronized decorator
|
||||||
==========================================================
|
==========================================================
|
||||||
|
|
||||||
|
@ -87,6 +87,49 @@ ReaderWriterLock = fasteners.ReaderWriterLock
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class FairLocks(object):
|
||||||
|
"""A garbage collected container of fair locks.
|
||||||
|
|
||||||
|
With a fair lock, contending lockers will get the lock in the order in
|
||||||
|
which they tried to acquire it.
|
||||||
|
|
||||||
|
This collection internally uses a weak value dictionary so that when a
|
||||||
|
lock is no longer in use (by any threads) it will automatically be
|
||||||
|
removed from this container by the garbage collector.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._locks = weakref.WeakValueDictionary()
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def get(self, name):
|
||||||
|
"""Gets (or creates) a lock with a given name.
|
||||||
|
|
||||||
|
:param name: The lock name to get/create (used to associate
|
||||||
|
previously created names with the same lock).
|
||||||
|
|
||||||
|
Returns an newly constructed lock (or an existing one if it was
|
||||||
|
already created for the given name).
|
||||||
|
"""
|
||||||
|
with self._lock:
|
||||||
|
try:
|
||||||
|
return self._locks[name]
|
||||||
|
except KeyError:
|
||||||
|
# The fasteners module specifies that
|
||||||
|
# ReaderWriterLock.write_lock() will give FIFO behaviour,
|
||||||
|
# so we don't need to do anything special ourselves.
|
||||||
|
rwlock = ReaderWriterLock()
|
||||||
|
self._locks[name] = rwlock
|
||||||
|
return rwlock
|
||||||
|
|
||||||
|
|
||||||
|
_fair_locks = FairLocks()
|
||||||
|
|
||||||
|
|
||||||
|
def internal_fair_lock(name):
|
||||||
|
return _fair_locks.get(name)
|
||||||
|
|
||||||
|
|
||||||
class Semaphores(object):
|
class Semaphores(object):
|
||||||
"""A garbage collected container of semaphores.
|
"""A garbage collected container of semaphores.
|
||||||
|
|
||||||
@ -170,7 +213,7 @@ def internal_lock(name, semaphores=None):
|
|||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||||
do_log=True, semaphores=None, delay=0.01):
|
do_log=True, semaphores=None, delay=0.01, fair=False):
|
||||||
"""Context based lock
|
"""Context based lock
|
||||||
|
|
||||||
This function yields a `threading.Semaphore` instance (if we don't use
|
This function yields a `threading.Semaphore` instance (if we don't use
|
||||||
@ -200,16 +243,26 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
|||||||
|
|
||||||
:param delay: Delay between acquisition attempts (in seconds).
|
:param delay: Delay between acquisition attempts (in seconds).
|
||||||
|
|
||||||
|
:param fair: Whether or not we want a "fair" lock where contending lockers
|
||||||
|
will get the lock in the order in which they tried to acquire it.
|
||||||
|
|
||||||
.. versionchanged:: 0.2
|
.. versionchanged:: 0.2
|
||||||
Added *do_log* optional parameter.
|
Added *do_log* optional parameter.
|
||||||
|
|
||||||
.. versionchanged:: 0.3
|
.. versionchanged:: 0.3
|
||||||
Added *delay* and *semaphores* optional parameters.
|
Added *delay* and *semaphores* optional parameters.
|
||||||
"""
|
"""
|
||||||
|
if fair:
|
||||||
|
if semaphores is not None:
|
||||||
|
raise NotImplementedError(_('Specifying semaphores is not '
|
||||||
|
'supported when using fair locks.'))
|
||||||
|
# The fastners module specifies that write_lock() provides fairness.
|
||||||
|
int_lock = internal_fair_lock(name).write_lock()
|
||||||
|
else:
|
||||||
int_lock = internal_lock(name, semaphores=semaphores)
|
int_lock = internal_lock(name, semaphores=semaphores)
|
||||||
with int_lock:
|
with int_lock:
|
||||||
if do_log:
|
if do_log:
|
||||||
LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
|
LOG.debug('Acquired lock "%(lock)s"', {'lock': name})
|
||||||
try:
|
try:
|
||||||
if external and not CONF.oslo_concurrency.disable_process_locking:
|
if external and not CONF.oslo_concurrency.disable_process_locking:
|
||||||
ext_lock = external_lock(name, lock_file_prefix, lock_path)
|
ext_lock = external_lock(name, lock_file_prefix, lock_path)
|
||||||
@ -225,11 +278,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
|||||||
yield int_lock
|
yield int_lock
|
||||||
finally:
|
finally:
|
||||||
if do_log:
|
if do_log:
|
||||||
LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
|
LOG.debug('Releasing lock "%(lock)s"', {'lock': name})
|
||||||
|
|
||||||
|
|
||||||
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
|
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||||
semaphores=None, delay=0.01):
|
semaphores=None, delay=0.01, fair=False):
|
||||||
"""Synchronization decorator.
|
"""Synchronization decorator.
|
||||||
|
|
||||||
Decorating a method like so::
|
Decorating a method like so::
|
||||||
@ -264,7 +317,8 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
|
|||||||
t2 = None
|
t2 = None
|
||||||
try:
|
try:
|
||||||
with lock(name, lock_file_prefix, external, lock_path,
|
with lock(name, lock_file_prefix, external, lock_path,
|
||||||
do_log=False, semaphores=semaphores, delay=delay):
|
do_log=False, semaphores=semaphores, delay=delay,
|
||||||
|
fair=fair):
|
||||||
t2 = timeutils.now()
|
t2 = timeutils.now()
|
||||||
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
|
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
|
||||||
'waited %(wait_secs)0.3fs',
|
'waited %(wait_secs)0.3fs',
|
||||||
|
@ -147,6 +147,45 @@ class LockTestCase(test_base.BaseTestCase):
|
|||||||
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
|
self.assertEqual(saved_sem_num, len(lockutils._semaphores),
|
||||||
"Semaphore leak detected")
|
"Semaphore leak detected")
|
||||||
|
|
||||||
|
def test_lock_internal_fair(self):
|
||||||
|
"""Check that we're actually fair."""
|
||||||
|
|
||||||
|
def f(_id):
|
||||||
|
with lockutils.lock('testlock', 'test-',
|
||||||
|
external=False, fair=True):
|
||||||
|
lock_holder.append(_id)
|
||||||
|
|
||||||
|
lock_holder = []
|
||||||
|
threads = []
|
||||||
|
# While holding the fair lock, spawn a bunch of threads that all try
|
||||||
|
# to acquire the lock. They will all block. Then release the lock
|
||||||
|
# and see what happens.
|
||||||
|
with lockutils.lock('testlock', 'test-', external=False, fair=True):
|
||||||
|
for i in range(10):
|
||||||
|
thread = threading.Thread(target=f, args=(i,))
|
||||||
|
threads.append(thread)
|
||||||
|
thread.start()
|
||||||
|
# Allow some time for the new thread to get queued onto the
|
||||||
|
# list of pending writers before continuing. This is gross
|
||||||
|
# but there's no way around it without using knowledge of
|
||||||
|
# fasteners internals.
|
||||||
|
time.sleep(0.5)
|
||||||
|
# Wait for all threads.
|
||||||
|
for thread in threads:
|
||||||
|
thread.join()
|
||||||
|
|
||||||
|
self.assertEqual(10, len(lock_holder))
|
||||||
|
# Check that the threads each got the lock in fair order.
|
||||||
|
for i in range(10):
|
||||||
|
self.assertEqual(i, lock_holder[i])
|
||||||
|
|
||||||
|
def test_fair_lock_with_semaphore(self):
|
||||||
|
def do_test():
|
||||||
|
s = lockutils.Semaphores()
|
||||||
|
with lockutils.lock('testlock', 'test-', semaphores=s, fair=True):
|
||||||
|
pass
|
||||||
|
self.assertRaises(NotImplementedError, do_test)
|
||||||
|
|
||||||
def test_nested_synchronized_external_works(self):
|
def test_nested_synchronized_external_works(self):
|
||||||
"""We can nest external syncs."""
|
"""We can nest external syncs."""
|
||||||
tempdir = tempfile.mkdtemp()
|
tempdir = tempfile.mkdtemp()
|
||||||
|
@ -0,0 +1,15 @@
|
|||||||
|
---
|
||||||
|
prelude: >
|
||||||
|
This release includes optional support for fair locks. When fair locks
|
||||||
|
are specified, blocking waiters will acquire the lock in the order that
|
||||||
|
they blocked.
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
We now have optional support for ``fair`` locks. When fair locks are
|
||||||
|
specified, blocking waiters will acquire the lock in the order that they
|
||||||
|
blocked. This can be useful to ensure that existing blocked waiters do
|
||||||
|
not wait indefinitely in the face of large numbers of new attempts to
|
||||||
|
acquire the lock. When specifying locks as both ``external`` and ``fair``,
|
||||||
|
the ordering *within* a given process will be fair, but the ordering
|
||||||
|
*between* processes will be determined by the behaviour of the underlying
|
||||||
|
OS.
|
Loading…
Reference in New Issue
Block a user