From 2d26c29e67569d277d33cf406c49db6b320780d9 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 5 Feb 2014 16:37:02 -0800 Subject: [PATCH] Fix deadlock on waiting for pending_writers to be empty Deadlock can occur when a reader X waits for pending writers to be empty but the reader X already owns a read lock (since it will then wait for pending writers to be empty which it will never be, since pending writers is only empty when there are no readers - but since the current thread X is getting another read lock it result in deadlock for the reader and deadlock for the writer). Closes-Bug: #1276868 Change-Id: Ibe13eafa4bd0bc817e8bfe3cf303c54298778b37 --- taskflow/tests/unit/test_utils_lock_utils.py | 52 ++++++++++++++++++++ taskflow/utils/lock_utils.py | 14 ++++-- 2 files changed, 63 insertions(+), 3 deletions(-) diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py index 6846b13a..80037b85 100644 --- a/taskflow/tests/unit/test_utils_lock_utils.py +++ b/taskflow/tests/unit/test_utils_lock_utils.py @@ -17,6 +17,7 @@ # under the License. import collections +import threading import time from concurrent import futures @@ -89,6 +90,57 @@ class ReadWriteLockTest(test.TestCase): self.assertRaises(RuntimeError, blow_up) self.assertFalse(lock.owner) + def test_double_reader_abort(self): + lock = lock_utils.ReaderWriterLock() + activated = collections.deque() + + def double_bad_reader(): + with lock.read_lock(): + with lock.read_lock(): + raise RuntimeError("Broken") + + def happy_writer(): + with lock.write_lock(): + activated.append(lock.owner) + + with futures.ThreadPoolExecutor(max_workers=20) as e: + for i in range(0, 20): + if i % 2 == 0: + e.submit(double_bad_reader) + else: + e.submit(happy_writer) + + self.assertEqual(10, len([a for a in activated if a == 'w'])) + + def test_double_reader_writer(self): + lock = lock_utils.ReaderWriterLock() + activated = collections.deque() + active = threading.Event() + + def double_reader(): + with lock.read_lock(): + active.set() + while lock.pending_writers == 0: + time.sleep(0.001) + with lock.read_lock(): + activated.append(lock.owner) + + def happy_writer(): + with lock.write_lock(): + activated.append(lock.owner) + + reader = threading.Thread(target=double_reader) + reader.start() + active.wait() + + writer = threading.Thread(target=happy_writer) + writer.start() + + reader.join() + writer.join() + self.assertEqual(2, len(activated)) + self.assertEqual(['r', 'w'], list(activated)) + def test_reader_chaotic(self): lock = lock_utils.ReaderWriterLock() activated = collections.deque() diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py index dc987df9..c1a73355 100644 --- a/taskflow/utils/lock_utils.py +++ b/taskflow/utils/lock_utils.py @@ -86,6 +86,14 @@ class ReaderWriterLock(object): self._readers = collections.deque() self._cond = threading.Condition() + @property + def pending_writers(self): + self._cond.acquire() + try: + return len(self._pending_writers) + finally: + self._cond.release() + def is_writer(self, check_pending=True): """Returns if the caller is the active writer or a pending writer.""" self._cond.acquire() @@ -138,11 +146,11 @@ class ReaderWriterLock(object): self._cond.acquire() try: while True: - # No active or pending writers; we are good to become a reader. - if self._writer is None and len(self._pending_writers) == 0: + # No active writer; we are good to become a reader. + if self._writer is None: self._readers.append(me) break - # Some writers; guess we have to wait. + # An active writer; guess we have to wait. self._cond.wait() finally: self._cond.release()