Add no double writers thread test

To ensure that the rw-lock continues to work as
expected add a test which spins up a large number
of threads and then has them content on a single
lock and has those threads record if another thread is
also active in the same critical region. Each thread then
sleeps (while in critical region) and then releases. This
repeats for up to 5 seconds and then finishes; at that point
there should have been no simultaneous access (if there was
this would indicate the rw-lock is broken).

Change-Id: Ia293dda989ad924be3daca01eb79ca04bf60c79b
This commit is contained in:
Joshua Harlow
2015-03-08 18:38:38 -07:00
parent bd1dfce36a
commit 89087fd11d

View File

@@ -15,6 +15,7 @@
# under the License.
import collections
import random
import threading
import time
@@ -23,6 +24,7 @@ from concurrent import futures
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as test_utils
from taskflow.types import timing
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import threading_utils
@@ -330,6 +332,43 @@ class MultilockTest(test.TestCase):
class ReadWriteLockTest(test.TestCase):
THREAD_COUNT = 20
def test_no_double_writers(self):
lock = lock_utils.ReaderWriterLock()
watch = timing.StopWatch(duration=5)
watch.start()
dups = collections.deque()
active = collections.deque()
def acquire_check(me):
with lock.write_lock():
if len(active) >= 1:
dups.append(me)
dups.extend(active)
active.append(me)
try:
time.sleep(random.random() / 100)
finally:
active.remove(me)
def run():
me = threading.current_thread()
while not watch.expired():
acquire_check(me)
threads = []
for i in range(0, self.THREAD_COUNT):
t = threading_utils.daemon_thread(run)
threads.append(t)
t.start()
while threads:
t = threads.pop()
t.join()
self.assertEqual([], list(dups))
self.assertEqual([], list(active))
def test_writer_abort(self):
lock = lock_utils.ReaderWriterLock()
self.assertFalse(lock.owner)