diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py index 67cf41e..1a9abe8 100644 --- a/oslo_concurrency/lockutils.py +++ b/oslo_concurrency/lockutils.py @@ -206,15 +206,35 @@ def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None, {'file': lock_file_path}) -def internal_lock(name, semaphores=None): +class AcquireLockFailedException(Exception): + def __init__(self, lock_name): + self.message = "Failed to acquire the lock %s" % lock_name + + def __str__(self): + return self.message + + +def internal_lock(name, semaphores=None, blocking=True): + @contextlib.contextmanager + def nonblocking(lock): + """Try to acquire the internal lock without blocking.""" + if not lock.acquire(blocking=False): + raise AcquireLockFailedException(name) + try: + yield lock + finally: + lock.release() + if semaphores is None: semaphores = _semaphores - return semaphores.get(name) + lock = semaphores.get(name) + + return nonblocking(lock) if not blocking else lock @contextlib.contextmanager def lock(name, lock_file_prefix=None, external=False, lock_path=None, - do_log=True, semaphores=None, delay=0.01, fair=False): + do_log=True, semaphores=None, delay=0.01, fair=False, blocking=True): """Context based lock This function yields a `threading.Semaphore` instance (if we don't use @@ -247,6 +267,10 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, :param fair: Whether or not we want a "fair" lock where contending lockers will get the lock in the order in which they tried to acquire it. + :param blocking: Whether to wait forever to try to acquire the lock. + Incompatible with fair locks because those provided by the fasteners + module doesn't implements a non-blocking behavior. + .. versionchanged:: 0.2 Added *do_log* optional parameter. @@ -257,17 +281,23 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None, if semaphores is not None: raise NotImplementedError(_('Specifying semaphores is not ' 'supported when using fair locks.')) - # The fastners module specifies that write_lock() provides fairness. + if blocking is not True: + raise NotImplementedError(_('Disabling blocking is not supported ' + 'when using fair locks.')) + # The fasteners module specifies that write_lock() provides fairness. int_lock = internal_fair_lock(name).write_lock() else: - int_lock = internal_lock(name, semaphores=semaphores) + int_lock = internal_lock(name, semaphores=semaphores, + blocking=blocking) with int_lock: if do_log: LOG.debug('Acquired lock "%(lock)s"', {'lock': name}) try: if external and not CONF.oslo_concurrency.disable_process_locking: ext_lock = external_lock(name, lock_file_prefix, lock_path) - ext_lock.acquire(delay=delay) + gotten = ext_lock.acquire(delay=delay, blocking=blocking) + if not gotten: + raise AcquireLockFailedException(name) if do_log: LOG.debug('Acquired external semaphore "%(lock)s"', {'lock': name}) @@ -314,7 +344,7 @@ def lock_with_prefix(lock_file_prefix): def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, - semaphores=None, delay=0.01, fair=False): + semaphores=None, delay=0.01, fair=False, blocking=True): """Synchronization decorator. Decorating a method like so:: @@ -347,10 +377,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, def inner(*args, **kwargs): t1 = timeutils.now() t2 = None + gotten = True try: with lock(name, lock_file_prefix, external, lock_path, do_log=False, semaphores=semaphores, delay=delay, - fair=fair): + fair=fair, blocking=blocking): t2 = timeutils.now() LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: ' 'waited %(wait_secs)0.3fs', @@ -358,15 +389,18 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None, 'function': reflection.get_callable_name(f), 'wait_secs': (t2 - t1)}) return f(*args, **kwargs) + except AcquireLockFailedException: + gotten = False finally: t3 = timeutils.now() if t2 is None: held_secs = "N/A" else: held_secs = "%0.3fs" % (t3 - t2) - LOG.debug('Lock "%(name)s" released by "%(function)s" :: held ' - '%(held_secs)s', + LOG.debug('Lock "%(name)s" "%(gotten)s" by "%(function)s" ::' + ' held %(held_secs)s', {'name': name, + 'gotten': 'released' if gotten else 'unacquired', 'function': reflection.get_callable_name(f), 'held_secs': held_secs}) return inner diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py index 0097bdc..4452467 100644 --- a/oslo_concurrency/tests/unit/test_lockutils.py +++ b/oslo_concurrency/tests/unit/test_lockutils.py @@ -186,6 +186,13 @@ class LockTestCase(test_base.BaseTestCase): pass self.assertRaises(NotImplementedError, do_test) + def test_fair_lock_with_nonblocking(self): + def do_test(): + with lockutils.lock('testlock', 'test-', fair=True, + blocking=False): + pass + self.assertRaises(NotImplementedError, do_test) + def test_nested_synchronized_external_works(self): """We can nest external syncs.""" self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency') @@ -430,6 +437,35 @@ class FileBasedLockingTestCase(test_base.BaseTestCase): time.sleep(.5) os._exit(0) + def test_interprocess_nonblocking_external_lock(self): + """Check that we're not actually blocking between processes.""" + + nb_calls = multiprocessing.Value('i', 0) + + @lockutils.synchronized('foo', blocking=False, external=True, + lock_path=self.lock_dir) + def foo(param): + """Simulate a long-running operation in a process.""" + param.value += 1 + time.sleep(.5) + + def other(param): + foo(param) + + process = multiprocessing.Process(target=other, args=(nb_calls, )) + process.start() + # Make sure the other process grabs the lock + start = time.time() + while not os.path.exists(os.path.join(self.lock_dir, 'foo')): + if time.time() - start > 5: + self.fail('Timed out waiting for process to grab lock') + time.sleep(0) + process1 = multiprocessing.Process(target=other, args=(nb_calls, )) + process1.start() + process1.join() + process.join() + self.assertEqual(1, nb_calls.value) + def test_interthread_external_lock(self): call_list = [] @@ -465,6 +501,62 @@ class FileBasedLockingTestCase(test_base.BaseTestCase): thread.join() self.assertEqual(['other', 'other', 'main', 'main'], call_list) + def test_interthread_nonblocking_external_lock(self): + call_list = [] + + @lockutils.synchronized('foo', external=True, blocking=False, + lock_path=self.lock_dir) + def foo(param): + """Simulate a long-running threaded operation.""" + call_list.append(param) + time.sleep(.5) + call_list.append(param) + + def other(param): + foo(param) + + thread = threading.Thread(target=other, args=('other',)) + thread.start() + # Make sure the other thread grabs the lock + start = time.time() + while not os.path.exists(os.path.join(self.lock_dir, 'foo')): + if time.time() - start > 5: + self.fail('Timed out waiting for thread to grab lock') + time.sleep(0) + thread1 = threading.Thread(target=other, args=('main',)) + thread1.start() + thread1.join() + thread.join() + self.assertEqual(['other', 'other'], call_list) + + def test_interthread_nonblocking_internal_lock(self): + call_list = [] + + @lockutils.synchronized('foo', blocking=False, + lock_path=self.lock_dir) + def foo(param): + # Simulate a long-running threaded operation. + call_list.append(param) + time.sleep(.5) + call_list.append(param) + + def other(param): + foo(param) + + thread = threading.Thread(target=other, args=('other',)) + thread.start() + # Make sure the other thread grabs the lock + start = time.time() + while not call_list: + if time.time() - start > 5: + self.fail('Timed out waiting for thread to grab lock') + time.sleep(0) + thread1 = threading.Thread(target=other, args=('main',)) + thread1.start() + thread1.join() + thread.join() + self.assertEqual(['other', 'other'], call_list) + def test_non_destructive(self): lock_file = os.path.join(self.lock_dir, 'not-destroyed') with open(lock_file, 'w') as f: