From a9ccf0a64deb63e7ea07d232437da797462d8f4b Mon Sep 17 00:00:00 2001 From: Romain Poupin Date: Wed, 16 Jun 2021 11:43:59 +0200 Subject: [PATCH] Add support for non-blocking locks This adds support for a non-blocking behavior of the lock : the context-based lock now raises an AcquireLockFailedException if the lock can't be acquired on the first try. At a higher layer, we just caught this exception inside the 'synchronized' decorator to prevent calling the wrapped function. In which case, we then trace this acquisition failure. For now at least, disabling blocking is not supported when using fair locks because the ReaderWriterLock.write_lock() provided by the fasteners module doesn't implements this behavior. Change-Id: I409da79007c9ba4fb8585da881e3d56998b0b98b --- oslo_concurrency/lockutils.py | 54 +++++++++-- oslo_concurrency/tests/unit/test_lockutils.py | 92 +++++++++++++++++++ 2 files changed, 136 insertions(+), 10 deletions(-) diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py index 67cf41e..1a9abe8 100644 --- a/oslo_concurrency/lockutils.py +++ b/oslo_concurrency/lockutils.py @@ -206,15 +206,35 @@ def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None, {'file': lock_file_path}) -def internal_lock(name, semaphores=None): +class AcquireLockFailedException(Exception): + def __init__(self, lock_name): + self.message = "Failed to acquire the lock %s" % lock_name + + def __str__(self): + return self.message + + +def internal_lock(name, semaphores=None, blocking=True): + @contextlib.contextmanager + def nonblocking(lock): + """Try to acquire the internal lock without blocking.""" + if not lock.acquire(blocking=False): + raise AcquireLockFailedException(name) + try: + yield lock + finally: + lock.release() + if semaphores is None: semaphores = _semaphores - return semaphores.get(name) + lock = semaphores.get(name) + + return nonblocking(lock) if not blocking else lock @contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None, - do_log=True, semaphores=None, delay=0.01, fair=False): + do_log=True, semaphores=None, delay=0.01, fair=False, blocking=True): """Context based lock This function yields a `threading.Semaphore` instance (if we don't use @@ -247,6 +267,10 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, :param fair: Whether or not we want a "fair" lock where contending lockers will get the lock in the order in which they tried to acquire it. + :param blocking: Whether to wait forever to try to acquire the lock. + Incompatible with fair locks because those provided by the fasteners + module doesn't implements a non-blocking behavior. + .. versionchanged:: 0.2 Added *do_log* optional parameter. @@ -257,17 +281,23 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, if semaphores is not None: raise NotImplementedError(_('Specifying semaphores is not ' 'supported when using fair locks.')) - # The fastners module specifies that write_lock() provides fairness. + if blocking is not True: + raise NotImplementedError(_('Disabling blocking is not supported ' + 'when using fair locks.')) + # The fasteners module specifies that write_lock() provides fairness. int_lock = internal_fair_lock(name).write_lock() else: - int_lock = internal_lock(name, semaphores=semaphores) + int_lock = internal_lock(name, semaphores=semaphores, + blocking=blocking) with int_lock: if do_log: LOG.debug('Acquired lock "%(lock)s"', {'lock': name}) try: if external and not CONF.oslo_concurrency.disable_process_locking: ext_lock = external_lock(name, lock_file_prefix, lock_path) - ext_lock.acquire(delay=delay) + gotten = ext_lock.acquire(delay=delay, blocking=blocking) + if not gotten: + raise AcquireLockFailedException(name) if do_log: LOG.debug('Acquired external semaphore "%(lock)s"', {'lock': name}) @@ -314,7 +344,7 @@ def lock_with_prefix(lock_file_prefix): def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, - semaphores=None, delay=0.01, fair=False): + semaphores=None, delay=0.01, fair=False, blocking=True): """Synchronization decorator. Decorating a method like so:: @@ -347,10 +377,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, def inner(*args, **kwargs): t1 = timeutils.now() t2 = None + gotten = True try: with lock(name, lock_file_prefix, external, lock_path, do_log=False, semaphores=semaphores, delay=delay, - fair=fair): + fair=fair, blocking=blocking): t2 = timeutils.now() LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: ' 'waited %(wait_secs)0.3fs', @@ -358,15 +389,18 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, 'function': reflection.get_callable_name(f), 'wait_secs': (t2 - t1)}) return f(*args, **kwargs) + except AcquireLockFailedException: + gotten = False finally: t3 = timeutils.now() if t2 is None: held_secs = "N/A" else: held_secs = "%0.3fs" % (t3 - t2) - LOG.debug('Lock "%(name)s" released by "%(function)s" :: held ' - '%(held_secs)s', + LOG.debug('Lock "%(name)s" "%(gotten)s" by "%(function)s" ::' + ' held %(held_secs)s', {'name': name, + 'gotten': 'released' if gotten else 'unacquired', 'function': reflection.get_callable_name(f), 'held_secs': held_secs}) return inner diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py index 0097bdc..4452467 100644 --- a/oslo_concurrency/tests/unit/test_lockutils.py +++ b/oslo_concurrency/tests/unit/test_lockutils.py @@ -186,6 +186,13 @@ class LockTestCase(test_base.BaseTestCase): pass self.assertRaises(NotImplementedError, do_test) + def test_fair_lock_with_nonblocking(self): + def do_test(): + with lockutils.lock('testlock', 'test-', fair=True, + blocking=False): + pass + self.assertRaises(NotImplementedError, do_test) + def test_nested_synchronized_external_works(self): """We can nest external syncs.""" self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency') @@ -430,6 +437,35 @@ class FileBasedLockingTestCase(test_base.BaseTestCase): time.sleep(.5) os._exit(0) + def test_interprocess_nonblocking_external_lock(self): + """Check that we're not actually blocking between processes.""" + + nb_calls = multiprocessing.Value('i', 0) + + @lockutils.synchronized('foo', blocking=False, external=True, + lock_path=self.lock_dir) + def foo(param): + """Simulate a long-running operation in a process.""" + param.value += 1 + time.sleep(.5) + + def other(param): + foo(param) + + process = multiprocessing.Process(target=other, args=(nb_calls, )) + process.start() + # Make sure the other process grabs the lock + start = time.time() + while not os.path.exists(os.path.join(self.lock_dir, 'foo')): + if time.time() - start > 5: + self.fail('Timed out waiting for process to grab lock') + time.sleep(0) + process1 = multiprocessing.Process(target=other, args=(nb_calls, )) + process1.start() + process1.join() + process.join() + self.assertEqual(1, nb_calls.value) + def test_interthread_external_lock(self): call_list = [] @@ -465,6 +501,62 @@ class FileBasedLockingTestCase(test_base.BaseTestCase): thread.join() self.assertEqual(['other', 'other', 'main', 'main'], call_list) + def test_interthread_nonblocking_external_lock(self): + call_list = [] + + @lockutils.synchronized('foo', external=True, blocking=False, + lock_path=self.lock_dir) + def foo(param): + """Simulate a long-running threaded operation.""" + call_list.append(param) + time.sleep(.5) + call_list.append(param) + + def other(param): + foo(param) + + thread = threading.Thread(target=other, args=('other',)) + thread.start() + # Make sure the other thread grabs the lock + start = time.time() + while not os.path.exists(os.path.join(self.lock_dir, 'foo')): + if time.time() - start > 5: + self.fail('Timed out waiting for thread to grab lock') + time.sleep(0) + thread1 = threading.Thread(target=other, args=('main',)) + thread1.start() + thread1.join() + thread.join() + self.assertEqual(['other', 'other'], call_list) + + def test_interthread_nonblocking_internal_lock(self): + call_list = [] + + @lockutils.synchronized('foo', blocking=False, + lock_path=self.lock_dir) + def foo(param): + # Simulate a long-running threaded operation. + call_list.append(param) + time.sleep(.5) + call_list.append(param) + + def other(param): + foo(param) + + thread = threading.Thread(target=other, args=('other',)) + thread.start() + # Make sure the other thread grabs the lock + start = time.time() + while not call_list: + if time.time() - start > 5: + self.fail('Timed out waiting for thread to grab lock') + time.sleep(0) + thread1 = threading.Thread(target=other, args=('main',)) + thread1.start() + thread1.join() + thread.join() + self.assertEqual(['other', 'other'], call_list) + def test_non_destructive(self): lock_file = os.path.join(self.lock_dir, 'not-destroyed') with open(lock_file, 'w') as f: