Add another probabilistic rw-lock test

This test works like the no-concurrent writer
one but tests reader and writes for a duration
and makes sure no collisions happen during this
period.

Change-Id: I11c1b39d34e4c83fa832c3b89e838e0c4635e750
This commit is contained in:
Joshua Harlow
2015-03-09 08:42:25 -07:00
parent c111ad751a
commit 47c0269578

View File

@@ -369,6 +369,49 @@ class ReadWriteLockTest(test.TestCase):
self.assertEqual([], list(dups))
self.assertEqual([], list(active))
def test_no_concurrent_readers_writers(self):
lock = lock_utils.ReaderWriterLock()
watch = timing.StopWatch(duration=5)
watch.start()
dups = collections.deque()
active = collections.deque()
def acquire_check(me, reader):
if reader:
lock_func = lock.read_lock
else:
lock_func = lock.write_lock
with lock_func():
if not reader:
# There should be no-one else currently active, if there
# is ensure we capture them so that we can later blow-up
# the test.
if len(active) >= 1:
dups.append(me)
dups.extend(active)
active.append(me)
try:
time.sleep(random.random() / 100)
finally:
active.remove(me)
def run():
me = threading.current_thread()
while not watch.expired():
acquire_check(me, random.choice([True, False]))
threads = []
for i in range(0, self.THREAD_COUNT):
t = threading_utils.daemon_thread(run)
threads.append(t)
t.start()
while threads:
t = threads.pop()
t.join()
self.assertEqual([], list(dups))
self.assertEqual([], list(active))
def test_writer_abort(self):
lock = lock_utils.ReaderWriterLock()
self.assertFalse(lock.owner)