Merge "Fix multilock concurrency when shared by > 1 threads"

This commit is contained in:
Jenkins
2014-09-27 20:28:30 +00:00
committed by Gerrit Code Review
2 changed files with 318 additions and 24 deletions

View File

@@ -19,6 +19,7 @@ import threading
import time
from concurrent import futures
import mock
from taskflow import test
from taskflow.utils import lock_utils
@@ -85,6 +86,223 @@ def _spawn_variation(readers, writers, max_workers=None):
return (writer_times, reader_times)
class MultilockTest(test.TestCase):
def test_empty_error(self):
self.assertRaises(ValueError,
lock_utils.MultiLock, [])
self.assertRaises(ValueError,
lock_utils.MultiLock, ())
self.assertRaises(ValueError,
lock_utils.MultiLock, iter([]))
def test_creation(self):
locks = []
for _i in range(0, 10):
locks.append(threading.Lock())
n_lock = lock_utils.MultiLock(locks)
self.assertEqual(0, n_lock.obtained)
self.assertEqual(len(locks), len(n_lock))
def test_acquired(self):
lock1 = threading.Lock()
lock2 = threading.Lock()
n_lock = lock_utils.MultiLock((lock1, lock2))
self.assertTrue(n_lock.acquire())
try:
self.assertTrue(lock1.locked())
self.assertTrue(lock2.locked())
finally:
n_lock.release()
self.assertFalse(lock1.locked())
self.assertFalse(lock2.locked())
def test_acquired_context_manager(self):
lock1 = threading.Lock()
n_lock = lock_utils.MultiLock([lock1])
with n_lock as gotten:
self.assertTrue(gotten)
self.assertTrue(lock1.locked())
self.assertFalse(lock1.locked())
self.assertEqual(0, n_lock.obtained)
def test_partial_acquired(self):
lock1 = threading.Lock()
lock2 = mock.create_autospec(threading.Lock())
lock2.acquire.return_value = False
n_lock = lock_utils.MultiLock((lock1, lock2))
with n_lock as gotten:
self.assertFalse(gotten)
self.assertTrue(lock1.locked())
self.assertEqual(1, n_lock.obtained)
self.assertEqual(2, len(n_lock))
self.assertEqual(0, n_lock.obtained)
def test_partial_acquired_failure(self):
lock1 = threading.Lock()
lock2 = mock.create_autospec(threading.Lock())
lock2.acquire.side_effect = RuntimeError("Broke")
n_lock = lock_utils.MultiLock((lock1, lock2))
self.assertRaises(threading.ThreadError, n_lock.acquire)
self.assertEqual(1, n_lock.obtained)
n_lock.release()
def test_release_failure(self):
lock1 = threading.Lock()
lock2 = mock.create_autospec(threading.Lock())
lock2.acquire.return_value = True
lock2.release.side_effect = RuntimeError("Broke")
n_lock = lock_utils.MultiLock((lock1, lock2))
self.assertTrue(n_lock.acquire())
self.assertEqual(2, n_lock.obtained)
self.assertRaises(threading.ThreadError, n_lock.release)
self.assertEqual(2, n_lock.obtained)
lock2.release.side_effect = None
n_lock.release()
self.assertEqual(0, n_lock.obtained)
def test_release_partial_failure(self):
lock1 = threading.Lock()
lock2 = mock.create_autospec(threading.Lock())
lock2.acquire.return_value = True
lock2.release.side_effect = RuntimeError("Broke")
lock3 = threading.Lock()
n_lock = lock_utils.MultiLock((lock1, lock2, lock3))
self.assertTrue(n_lock.acquire())
self.assertEqual(3, n_lock.obtained)
self.assertRaises(threading.ThreadError, n_lock.release)
self.assertEqual(2, n_lock.obtained)
lock2.release.side_effect = None
n_lock.release()
self.assertEqual(0, n_lock.obtained)
def test_acquired_pass(self):
activated = collections.deque()
lock1 = threading.Lock()
lock2 = threading.Lock()
n_lock = lock_utils.MultiLock((lock1, lock2))
def critical_section():
start = time.time()
time.sleep(0.05)
end = time.time()
activated.append((start, end))
def run():
with n_lock:
critical_section()
threads = []
for _i in range(0, 20):
t = threading.Thread(target=run)
t.daemon = True
threads.append(t)
t.start()
while threads:
t = threads.pop()
t.join()
for (start, end) in activated:
self.assertEqual(1, _find_overlaps(activated, start, end))
self.assertFalse(lock1.locked())
self.assertFalse(lock2.locked())
def test_acquired_fail(self):
activated = collections.deque()
lock1 = threading.Lock()
lock2 = threading.Lock()
n_lock = lock_utils.MultiLock((lock1, lock2))
def run():
with n_lock:
start = time.time()
time.sleep(0.05)
end = time.time()
activated.append((start, end))
def run_fail():
try:
with n_lock:
raise RuntimeError()
except RuntimeError:
pass
threads = []
for i in range(0, 20):
if i % 2 == 1:
target = run_fail
else:
target = run
t = threading.Thread(target=target)
threads.append(t)
t.daemon = True
t.start()
while threads:
t = threads.pop()
t.join()
for (start, end) in activated:
self.assertEqual(1, _find_overlaps(activated, start, end))
self.assertFalse(lock1.locked())
self.assertFalse(lock2.locked())
def test_double_acquire_single(self):
activated = collections.deque()
def run():
start = time.time()
time.sleep(0.05)
end = time.time()
activated.append((start, end))
lock1 = threading.RLock()
lock2 = threading.RLock()
n_lock = lock_utils.MultiLock((lock1, lock2))
with n_lock:
run()
with n_lock:
run()
run()
for (start, end) in activated:
self.assertEqual(1, _find_overlaps(activated, start, end))
def test_double_acquire_many(self):
activated = collections.deque()
n_lock = lock_utils.MultiLock((threading.RLock(), threading.RLock()))
def critical_section():
start = time.time()
time.sleep(0.05)
end = time.time()
activated.append((start, end))
def run():
with n_lock:
critical_section()
with n_lock:
critical_section()
critical_section()
threads = []
for i in range(0, 20):
t = threading.Thread(target=run)
threads.append(t)
t.daemon = True
t.start()
while threads:
t = threads.pop()
t.join()
for (start, end) in activated:
self.assertEqual(1, _find_overlaps(activated, start, end))
def test_no_acquire_release(self):
lock1 = threading.Lock()
lock2 = threading.Lock()
n_lock = lock_utils.MultiLock((lock1, lock2))
self.assertRaises(threading.ThreadError, n_lock.release)
class ReadWriteLockTest(test.TestCase):
def test_writer_abort(self):
lock = lock_utils.ReaderWriterLock()

View File

@@ -291,46 +291,122 @@ class MultiLock(object):
"""A class which attempts to obtain & release many locks at once.
It is typically useful as a context manager around many locks (instead of
having to nest individual lock context managers).
having to nest individual lock context managers, which can become pretty
awkward looking).
NOTE(harlowja): The locks that will be obtained will be in the order the
locks are given in the constructor, they will be acquired in order and
released in reverse order (so ordering matters).
"""
def __init__(self, locks):
assert len(locks) > 0, "Zero locks requested"
if not isinstance(locks, tuple):
locks = tuple(locks)
if len(locks) <= 0:
raise ValueError("Zero locks requested")
self._locks = locks
self._locked = [False] * len(locks)
self._local = threading.local()
@property
def _lock_stacks(self):
# This is weird, but this is how thread locals work (in that each
# thread will need to check if it has already created the attribute and
# if not then create it and set it to the thread local variable...)
#
# This isn't done in the constructor since the constructor is only
# activated by one of the many threads that could use this object,
# and that means that the attribute will only exist for that one
# thread.
try:
return self._local.stacks
except AttributeError:
self._local.stacks = []
return self._local.stacks
def __enter__(self):
self.acquire()
return self.acquire()
@property
def obtained(self):
"""Returns how many locks were last acquired/obtained."""
try:
return self._lock_stacks[-1]
except IndexError:
return 0
def __len__(self):
return len(self._locks)
def acquire(self):
"""This will attempt to acquire all the locks given in the constructor.
def is_locked(lock):
# NOTE(harlowja): reentrant locks (rlock) don't have this
# attribute, but normal non-reentrant locks do, how odd...
if hasattr(lock, 'locked'):
return lock.locked()
return False
If all the locks can not be acquired (and say only X of Y locks could
be acquired then this will return false to signify that not all the
locks were able to be acquired, you can later use the :attr:`.obtained`
property to determine how many were obtained during the last
acquisition attempt).
for i in range(0, len(self._locked)):
if self._locked[i] or is_locked(self._locks[i]):
raise threading.ThreadError("Lock %s not previously released"
% (i + 1))
self._locked[i] = False
for (i, lock) in enumerate(self._locks):
self._locked[i] = lock.acquire()
NOTE(harlowja): When not all locks were acquired it is still required
to release since under partial acquisition the acquired locks
must still be released. For example if 4 out of 5 locks were acquired
this will return false, but the user **must** still release those
other 4 to avoid causing locking issues...
"""
gotten = 0
for lock in self._locks:
try:
acked = lock.acquire()
except (threading.ThreadError, RuntimeError) as e:
# If we have already gotten some set of the desired locks
# make sure we track that and ensure that we later release them
# instead of losing them.
if gotten:
self._lock_stacks.append(gotten)
raise threading.ThreadError(
"Unable to acquire lock %s/%s due to '%s'"
% (gotten + 1, len(self._locks), e))
else:
if not acked:
break
else:
gotten += 1
if gotten:
self._lock_stacks.append(gotten)
return gotten == len(self._locks)
def __exit__(self, type, value, traceback):
self.release()
def release(self):
for (i, locked) in enumerate(self._locked):
"""Releases any past acquired locks (partial or otherwise)."""
height = len(self._lock_stacks)
if not height:
# Raise the same error type as the threading.Lock raises so that
# it matches the behavior of the built-in class (it's odd though
# that the threading.RLock raises a runtime error on this same
# method instead...)
raise threading.ThreadError('Release attempted on unlocked lock')
# Cleans off one level of the stack (this is done so that if there
# are multiple __enter__() and __exit__() pairs active that this will
# only remove one level (the last one), and not all levels...
leftover = self._lock_stacks[-1]
while leftover:
lock = self._locks[leftover - 1]
try:
if locked:
self._locks[i].release()
self._locked[i] = False
except threading.ThreadError:
LOG.exception("Unable to release lock %s", i + 1)
lock.release()
except (threading.ThreadError, RuntimeError) as e:
# Ensure that we adjust the lock stack under failure so that
# if release is attempted again that we do not try to release
# the locks we already released...
self._lock_stacks[-1] = leftover
raise threading.ThreadError(
"Unable to release lock %s/%s due to '%s'"
% (leftover, len(self._locks), e))
else:
leftover -= 1
# At the end only clear it off, so that under partial failure we don't
# lose any locks...
self._lock_stacks.pop()
class _InterProcessLock(object):