Merge "Add support for fair locks"
This commit is contained in:
commit
0767ddf4c2
@@ -47,6 +47,32 @@ sure that the names of the locks used are carefully chosen (typically by
 namespacing them to your app so that other apps will not choose the same
 names).
 
+Enabling fair locking
+=====================
+
+By default there is no requirement that the lock is ``fair``. That is, it's
+possible for a thread to block waiting for the lock, then have another thread
+block waiting for the lock, and when the lock is released by the current owner
+the second waiter could acquire the lock before the first. In an extreme case
+you could have a whole string of other threads acquire the lock before the
+first waiter acquires it, resulting in unpredictable amounts of latency.
+
+For cases where this is a problem, it's possible to specify the use of fair
+locks::
+
+    @lockutils.synchronized('not_thread_process_safe', fair=True)
+    def not_thread_process_safe():
+        pass
+
+When using fair locks the lock itself is slightly more expensive (which
+shouldn't matter in most cases), but it will ensure that all threads that
+block waiting for the lock will acquire it in the order that they blocked.
+
+The exception to this is when specifying both ``external`` and ``fair``
+locks. In this case, the ordering *within* a given process will be fair, but
+the ordering *between* processes will be determined by the behaviour of the
+underlying OS.
+
 Common ways to prefix/namespace the synchronized decorator
 ==========================================================
 
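The same fairness guarantee applies to the context-manager form of the API,
which gains a ``fair`` argument later in this change. A minimal sketch, with
an illustrative lock name that is not part of this change::

    from oslo_concurrency import lockutils

    def update_shared_resource():
        # Waiters queue on an internal fasteners.ReaderWriterLock and are
        # granted the lock in the order in which they blocked.
        with lockutils.lock('my-app-resource', fair=True):
            pass  # critical section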
@@ -87,6 +87,49 @@ ReaderWriterLock = fasteners.ReaderWriterLock
 """
 
 
+class FairLocks(object):
+    """A garbage collected container of fair locks.
+
+    With a fair lock, contending lockers will get the lock in the order in
+    which they tried to acquire it.
+
+    This collection internally uses a weak value dictionary so that when a
+    lock is no longer in use (by any threads) it will automatically be
+    removed from this container by the garbage collector.
+    """
+
+    def __init__(self):
+        self._locks = weakref.WeakValueDictionary()
+        self._lock = threading.Lock()
+
+    def get(self, name):
+        """Gets (or creates) a lock with a given name.
+
+        :param name: The lock name to get/create (used to associate
+                     previously created names with the same lock).
+
+        Returns a newly constructed lock (or an existing one if it was
+        already created for the given name).
+        """
+        with self._lock:
+            try:
+                return self._locks[name]
+            except KeyError:
+                # The fasteners module specifies that
+                # ReaderWriterLock.write_lock() will give FIFO behaviour,
+                # so we don't need to do anything special ourselves.
+                rwlock = ReaderWriterLock()
+                self._locks[name] = rwlock
+                return rwlock
+
+
+_fair_locks = FairLocks()
+
+
+def internal_fair_lock(name):
+    return _fair_locks.get(name)
+
+
 class Semaphores(object):
     """A garbage collected container of semaphores.
 
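To make the container semantics above concrete, here is a small sketch of how
``internal_fair_lock`` behaves, assuming the module is imported as
``oslo_concurrency.lockutils``; the lock name is illustrative only::

    from oslo_concurrency import lockutils

    # Repeated lookups under the same name return the same underlying
    # fasteners.ReaderWriterLock while a strong reference to it exists;
    # once nothing references it, the WeakValueDictionary lets the entry
    # be garbage collected.
    rwlock = lockutils.internal_fair_lock('my-resource')
    assert lockutils.internal_fair_lock('my-resource') is rwlock

    # write_lock() is the context manager that lock(..., fair=True) enters;
    # fasteners documents FIFO ordering for contending writers.
    with rwlock.write_lock():
        pass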
@@ -170,7 +213,7 @@ def internal_lock(name, semaphores=None):
 
 @contextlib.contextmanager
 def lock(name, lock_file_prefix=None, external=False, lock_path=None,
-         do_log=True, semaphores=None, delay=0.01):
+         do_log=True, semaphores=None, delay=0.01, fair=False):
     """Context based lock
 
     This function yields a `threading.Semaphore` instance (if we don't use
@@ -200,16 +243,26 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
 
     :param delay: Delay between acquisition attempts (in seconds).
 
+    :param fair: Whether or not we want a "fair" lock where contending lockers
+        will get the lock in the order in which they tried to acquire it.
+
     .. versionchanged:: 0.2
        Added *do_log* optional parameter.
 
     .. versionchanged:: 0.3
        Added *delay* and *semaphores* optional parameters.
     """
-    int_lock = internal_lock(name, semaphores=semaphores)
+    if fair:
+        if semaphores is not None:
+            raise NotImplementedError(_('Specifying semaphores is not '
+                                        'supported when using fair locks.'))
+        # The fasteners module specifies that write_lock() provides fairness.
+        int_lock = internal_fair_lock(name).write_lock()
+    else:
+        int_lock = internal_lock(name, semaphores=semaphores)
     with int_lock:
         if do_log:
-            LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
+            LOG.debug('Acquired lock "%(lock)s"', {'lock': name})
         try:
             if external and not CONF.oslo_concurrency.disable_process_locking:
                 ext_lock = external_lock(name, lock_file_prefix, lock_path)
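A short usage sketch of the two branches added above; the lock name and the
``Semaphores`` container are illustrative, not part of this change::

    from oslo_concurrency import lockutils

    # fair=True takes the fasteners write-lock path; waiters are served
    # in the order in which they blocked.
    with lockutils.lock('my-resource', fair=True):
        pass

    # Passing a custom semaphore container together with fair=True is
    # rejected, as implemented above.
    sems = lockutils.Semaphores()
    try:
        with lockutils.lock('my-resource', semaphores=sems, fair=True):
            pass
    except NotImplementedError:
        pass  # expected when combining semaphores with fair locks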
@@ -225,11 +278,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
                 yield int_lock
         finally:
             if do_log:
-                LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
+                LOG.debug('Releasing lock "%(lock)s"', {'lock': name})
 
 
 def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
-                 semaphores=None, delay=0.01):
+                 semaphores=None, delay=0.01, fair=False):
     """Synchronization decorator.
 
     Decorating a method like so::
@@ -264,7 +317,8 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
             t2 = None
             try:
                 with lock(name, lock_file_prefix, external, lock_path,
-                          do_log=False, semaphores=semaphores, delay=delay):
+                          do_log=False, semaphores=semaphores, delay=delay,
+                          fair=fair):
                     t2 = timeutils.now()
                     LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
                               'waited %(wait_secs)0.3fs',
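Since the decorator simply forwards ``fair`` to ``lock()``, the two forms
below are roughly equivalent. A sketch with an illustrative lock name::

    from oslo_concurrency import lockutils

    @lockutils.synchronized('my-resource', fair=True)
    def decorated():
        pass

    def hand_rolled():
        # Equivalent to what the decorator does internally, minus the
        # timing and debug logging shown in the hunk above.
        with lockutils.lock('my-resource', fair=True):
            pass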
@@ -147,6 +147,45 @@ class LockTestCase(test_base.BaseTestCase):
         self.assertEqual(saved_sem_num, len(lockutils._semaphores),
                          "Semaphore leak detected")
 
+    def test_lock_internal_fair(self):
+        """Check that we're actually fair."""
+
+        def f(_id):
+            with lockutils.lock('testlock', 'test-',
+                                external=False, fair=True):
+                lock_holder.append(_id)
+
+        lock_holder = []
+        threads = []
+        # While holding the fair lock, spawn a bunch of threads that all try
+        # to acquire the lock.  They will all block.  Then release the lock
+        # and see what happens.
+        with lockutils.lock('testlock', 'test-', external=False, fair=True):
+            for i in range(10):
+                thread = threading.Thread(target=f, args=(i,))
+                threads.append(thread)
+                thread.start()
+                # Allow some time for the new thread to get queued onto the
+                # list of pending writers before continuing.  This is gross
+                # but there's no way around it without using knowledge of
+                # fasteners internals.
+                time.sleep(0.5)
+        # Wait for all threads.
+        for thread in threads:
+            thread.join()
+
+        self.assertEqual(10, len(lock_holder))
+        # Check that the threads each got the lock in fair order.
+        for i in range(10):
+            self.assertEqual(i, lock_holder[i])
+
+    def test_fair_lock_with_semaphore(self):
+        def do_test():
+            s = lockutils.Semaphores()
+            with lockutils.lock('testlock', 'test-', semaphores=s, fair=True):
+                pass
+        self.assertRaises(NotImplementedError, do_test)
+
     def test_nested_synchronized_external_works(self):
         """We can nest external syncs."""
         tempdir = tempfile.mkdtemp()
@@ -0,0 +1,15 @@
+---
+prelude: >
+    This release includes optional support for fair locks. When fair locks
+    are specified, blocking waiters will acquire the lock in the order that
+    they blocked.
+features:
+  - |
+    We now have optional support for ``fair`` locks. When fair locks are
+    specified, blocking waiters will acquire the lock in the order that they
+    blocked. This can be useful to ensure that existing blocked waiters do
+    not wait indefinitely in the face of large numbers of new attempts to
+    acquire the lock. When specifying locks as both ``external`` and ``fair``,
+    the ordering *within* a given process will be fair, but the ordering
+    *between* processes will be determined by the behaviour of the underlying
+    OS.
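As the release note says, ``external`` and ``fair`` can be combined. A brief
sketch of that combination, with an illustrative lock name::

    from oslo_concurrency import lockutils

    # Within this process, waiters queue fairly on the internal lock; across
    # processes, the underlying OS file lock decides which process goes next.
    @lockutils.synchronized('my-resource', external=True, fair=True)
    def cross_process_critical_section():
        pass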