Replace lock_utils lock(s) with fasteners package
The usage of this package allows us to get rid of the reader/writer lock (since that package provides that) as well as the interprocess lock as all of these are now provided by that package instead. Change-Id: I87990b46d397f6df779de7028bcc40e28621e1ba
This commit is contained in:
committed by
Joshua Harlow
parent
e183fc9dbc
commit
c3afeb5280
@@ -16,6 +16,9 @@ six>=1.9.0
|
||||
# Enum library made for <= python 3.3
|
||||
enum34
|
||||
|
||||
# For reader/writer + interprocess locks.
|
||||
fasteners>=0.5 # Apache-2.0
|
||||
|
||||
# Very nice graph library
|
||||
networkx>=1.8
|
||||
|
||||
|
||||
@@ -20,11 +20,11 @@ import errno
|
||||
import os
|
||||
import shutil
|
||||
|
||||
import fasteners
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.persistence import path_based
|
||||
from taskflow.utils import lock_utils
|
||||
from taskflow.utils import misc
|
||||
|
||||
|
||||
@@ -64,7 +64,7 @@ class DirBackend(path_based.PathBasedBackend):
|
||||
if not self._path:
|
||||
raise ValueError("Empty path is disallowed")
|
||||
self._path = os.path.abspath(self._path)
|
||||
self.lock = lock_utils.ReaderWriterLock()
|
||||
self.lock = fasteners.ReaderWriterLock()
|
||||
|
||||
def get_connection(self):
|
||||
return Connection(self)
|
||||
@@ -97,7 +97,7 @@ class Connection(path_based.PathBasedConnection):
|
||||
@contextlib.contextmanager
|
||||
def _path_lock(self, path):
|
||||
lockfile = self._join_path(path, 'lock')
|
||||
with lock_utils.InterProcessLock(lockfile) as lock:
|
||||
with fasteners.InterProcessLock(lockfile) as lock:
|
||||
with _storagefailure_wrapper():
|
||||
yield lock
|
||||
|
||||
|
||||
@@ -20,12 +20,12 @@ import copy
|
||||
import itertools
|
||||
import posixpath as pp
|
||||
|
||||
import fasteners
|
||||
import six
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.persistence import path_based
|
||||
from taskflow.types import tree
|
||||
from taskflow.utils import lock_utils
|
||||
|
||||
|
||||
class FakeInode(tree.Node):
|
||||
@@ -261,7 +261,7 @@ class MemoryBackend(path_based.PathBasedBackend):
|
||||
self._path = pp.sep
|
||||
self.memory = FakeFilesystem(deep_copy=self._conf.get('deep_copy',
|
||||
True))
|
||||
self.lock = lock_utils.ReaderWriterLock()
|
||||
self.lock = fasteners.ReaderWriterLock()
|
||||
|
||||
def get_connection(self):
|
||||
return Connection(self)
|
||||
|
||||
@@ -16,6 +16,7 @@
|
||||
|
||||
import contextlib
|
||||
|
||||
import fasteners
|
||||
from oslo_utils import reflection
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
@@ -28,7 +29,6 @@ from taskflow import retry
|
||||
from taskflow import states
|
||||
from taskflow import task
|
||||
from taskflow.types import failure
|
||||
from taskflow.utils import lock_utils
|
||||
from taskflow.utils import misc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@@ -150,7 +150,7 @@ class Storage(object):
|
||||
self._flowdetail = flow_detail
|
||||
self._transients = {}
|
||||
self._injected_args = {}
|
||||
self._lock = lock_utils.ReaderWriterLock()
|
||||
self._lock = fasteners.ReaderWriterLock()
|
||||
self._ensure_matchers = [
|
||||
((task.BaseTask,), self._ensure_task),
|
||||
((retry.Retry,), self._ensure_retry),
|
||||
@@ -334,46 +334,46 @@ class Storage(object):
|
||||
original_atom_detail.update(conn.update_atom_details(atom_detail))
|
||||
return original_atom_detail
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_atom_uuid(self, atom_name):
|
||||
"""Gets an atoms uuid given a atoms name."""
|
||||
source, _clone = self._atomdetail_by_name(atom_name)
|
||||
return source.uuid
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def set_atom_state(self, atom_name, state):
|
||||
"""Sets an atoms state."""
|
||||
source, clone = self._atomdetail_by_name(atom_name, clone=True)
|
||||
clone.state = state
|
||||
self._with_connection(self._save_atom_detail, source, clone)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_atom_state(self, atom_name):
|
||||
"""Gets the state of an atom given an atoms name."""
|
||||
source, _clone = self._atomdetail_by_name(atom_name)
|
||||
return source.state
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def set_atom_intention(self, atom_name, intention):
|
||||
"""Sets the intention of an atom given an atoms name."""
|
||||
source, clone = self._atomdetail_by_name(atom_name, clone=True)
|
||||
clone.intention = intention
|
||||
self._with_connection(self._save_atom_detail, source, clone)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_atom_intention(self, atom_name):
|
||||
"""Gets the intention of an atom given an atoms name."""
|
||||
source, _clone = self._atomdetail_by_name(atom_name)
|
||||
return source.intention
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_atoms_states(self, atom_names):
|
||||
"""Gets all atoms states given a set of names."""
|
||||
return dict((name, (self.get_atom_state(name),
|
||||
self.get_atom_intention(name)))
|
||||
for name in atom_names)
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def _update_atom_metadata(self, atom_name, update_with,
|
||||
expected_type=None):
|
||||
source, clone = self._atomdetail_by_name(atom_name,
|
||||
@@ -417,7 +417,7 @@ class Storage(object):
|
||||
self._update_atom_metadata(task_name, update_with,
|
||||
expected_type=logbook.TaskDetail)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_task_progress(self, task_name):
|
||||
"""Get the progress of a task given a tasks name.
|
||||
|
||||
@@ -431,7 +431,7 @@ class Storage(object):
|
||||
except KeyError:
|
||||
return 0.0
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_task_progress_details(self, task_name):
|
||||
"""Get the progress details of a task given a tasks name.
|
||||
|
||||
@@ -463,7 +463,7 @@ class Storage(object):
|
||||
LOG.warning("Atom %s did not supply result "
|
||||
"with index %r (name %s)", atom_name, index, name)
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def save(self, atom_name, data, state=states.SUCCESS):
|
||||
"""Put result for atom with id 'uuid' to storage."""
|
||||
source, clone = self._atomdetail_by_name(atom_name, clone=True)
|
||||
@@ -477,7 +477,7 @@ class Storage(object):
|
||||
else:
|
||||
self._check_all_results_provided(result.name, data)
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def save_retry_failure(self, retry_name, failed_atom_name, failure):
|
||||
"""Save subflow failure to retry controller history."""
|
||||
source, clone = self._atomdetail_by_name(
|
||||
@@ -493,7 +493,7 @@ class Storage(object):
|
||||
failures[failed_atom_name] = failure
|
||||
self._with_connection(self._save_atom_detail, source, clone)
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def cleanup_retry_history(self, retry_name, state):
|
||||
"""Cleanup history of retry atom with given name."""
|
||||
source, clone = self._atomdetail_by_name(
|
||||
@@ -502,7 +502,7 @@ class Storage(object):
|
||||
clone.results = []
|
||||
self._with_connection(self._save_atom_detail, source, clone)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def _get(self, atom_name, only_last=False):
|
||||
source, _clone = self._atomdetail_by_name(atom_name)
|
||||
if source.failure is not None:
|
||||
@@ -525,7 +525,7 @@ class Storage(object):
|
||||
"""Gets the results for an atom with a given name from storage."""
|
||||
return self._get(atom_name)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_failures(self):
|
||||
"""Get list of failures that happened with this flow.
|
||||
|
||||
@@ -537,7 +537,7 @@ class Storage(object):
|
||||
"""Returns True if there are failed tasks in the storage."""
|
||||
return bool(self._failures)
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def reset(self, atom_name, state=states.PENDING):
|
||||
"""Reset atom with given name (if the atom is not in a given state)."""
|
||||
if atom_name == self.injector_name:
|
||||
@@ -605,7 +605,7 @@ class Storage(object):
|
||||
else:
|
||||
save_persistent()
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def inject(self, pairs, transient=False):
|
||||
"""Add values into storage.
|
||||
|
||||
@@ -682,7 +682,7 @@ class Storage(object):
|
||||
if provider not in entries:
|
||||
entries.append(provider)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def fetch(self, name, many_handler=None):
|
||||
"""Fetch a named result."""
|
||||
def _many_handler(values):
|
||||
@@ -717,7 +717,7 @@ class Storage(object):
|
||||
else:
|
||||
return many_handler(values)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def fetch_unsatisfied_args(self, atom_name, args_mapping,
|
||||
scope_walker=None, optional_args=None):
|
||||
"""Fetch unsatisfied atom arguments using an atoms argument mapping.
|
||||
@@ -803,7 +803,7 @@ class Storage(object):
|
||||
missing.discard(bound_name)
|
||||
return missing
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def fetch_all(self, many_handler=None):
|
||||
"""Fetch all named results known so far."""
|
||||
def _many_handler(values):
|
||||
@@ -820,7 +820,7 @@ class Storage(object):
|
||||
pass
|
||||
return results
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def fetch_mapped_args(self, args_mapping,
|
||||
atom_name=None, scope_walker=None,
|
||||
optional_args=None):
|
||||
@@ -934,14 +934,14 @@ class Storage(object):
|
||||
bound_name, name, value, provider)
|
||||
return mapped_args
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def set_flow_state(self, state):
|
||||
"""Set flow details state and save it."""
|
||||
source, clone = self._fetch_flowdetail(clone=True)
|
||||
clone.state = state
|
||||
self._with_connection(self._save_flow_detail, source, clone)
|
||||
|
||||
@lock_utils.write_locked
|
||||
@fasteners.write_locked
|
||||
def update_flow_metadata(self, update_with):
|
||||
"""Update flowdetails metadata and save it."""
|
||||
if update_with:
|
||||
@@ -949,7 +949,7 @@ class Storage(object):
|
||||
clone.meta.update(update_with)
|
||||
self._with_connection(self._save_flow_detail, source, clone)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_flow_state(self):
|
||||
"""Get state from flow details."""
|
||||
source = self._flowdetail
|
||||
@@ -971,14 +971,14 @@ class Storage(object):
|
||||
failure = ad.failure
|
||||
return retry.History(ad.results, failure=failure)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_retry_history(self, retry_name):
|
||||
"""Fetch a single retrys history."""
|
||||
source, _clone = self._atomdetail_by_name(
|
||||
retry_name, expected_type=logbook.RetryDetail)
|
||||
return self._translate_into_history(source)
|
||||
|
||||
@lock_utils.read_locked
|
||||
@fasteners.read_locked
|
||||
def get_retry_histories(self):
|
||||
"""Fetch all retrys histories."""
|
||||
histories = []
|
||||
|
||||
@@ -15,16 +15,11 @@
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import random
|
||||
import threading
|
||||
import time
|
||||
|
||||
from concurrent import futures
|
||||
|
||||
from taskflow import test
|
||||
from taskflow.test import mock
|
||||
from taskflow.tests import utils as test_utils
|
||||
from taskflow.types import timing
|
||||
from taskflow.utils import lock_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import threading_utils
|
||||
@@ -50,51 +45,6 @@ def _find_overlaps(times, start, end):
|
||||
return overlaps
|
||||
|
||||
|
||||
def _spawn_variation(readers, writers, max_workers=None):
|
||||
start_stops = collections.deque()
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
|
||||
def read_func(ident):
|
||||
with lock.read_lock():
|
||||
# TODO(harlowja): sometime in the future use a monotonic clock here
|
||||
# to avoid problems that can be caused by ntpd resyncing the clock
|
||||
# while we are actively running.
|
||||
enter_time = now()
|
||||
time.sleep(WORK_TIMES[ident % len(WORK_TIMES)])
|
||||
exit_time = now()
|
||||
start_stops.append((lock.READER, enter_time, exit_time))
|
||||
time.sleep(NAPPY_TIME)
|
||||
|
||||
def write_func(ident):
|
||||
with lock.write_lock():
|
||||
enter_time = now()
|
||||
time.sleep(WORK_TIMES[ident % len(WORK_TIMES)])
|
||||
exit_time = now()
|
||||
start_stops.append((lock.WRITER, enter_time, exit_time))
|
||||
time.sleep(NAPPY_TIME)
|
||||
|
||||
if max_workers is None:
|
||||
max_workers = max(0, readers) + max(0, writers)
|
||||
if max_workers > 0:
|
||||
with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
|
||||
count = 0
|
||||
for _i in range(0, readers):
|
||||
e.submit(read_func, count)
|
||||
count += 1
|
||||
for _i in range(0, writers):
|
||||
e.submit(write_func, count)
|
||||
count += 1
|
||||
|
||||
writer_times = []
|
||||
reader_times = []
|
||||
for (lock_type, start, stop) in list(start_stops):
|
||||
if lock_type == lock.WRITER:
|
||||
writer_times.append((start, stop))
|
||||
else:
|
||||
reader_times.append((start, stop))
|
||||
return (writer_times, reader_times)
|
||||
|
||||
|
||||
class MultilockTest(test.TestCase):
|
||||
THREAD_COUNT = 20
|
||||
|
||||
@@ -329,306 +279,3 @@ class MultilockTest(test.TestCase):
|
||||
lock2 = threading.Lock()
|
||||
n_lock = lock_utils.MultiLock((lock1, lock2))
|
||||
self.assertRaises(threading.ThreadError, n_lock.release)
|
||||
|
||||
|
||||
class ReadWriteLockTest(test.TestCase):
|
||||
THREAD_COUNT = 20
|
||||
|
||||
def test_no_double_writers(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
watch = timing.StopWatch(duration=5)
|
||||
watch.start()
|
||||
dups = collections.deque()
|
||||
active = collections.deque()
|
||||
|
||||
def acquire_check(me):
|
||||
with lock.write_lock():
|
||||
if len(active) >= 1:
|
||||
dups.append(me)
|
||||
dups.extend(active)
|
||||
active.append(me)
|
||||
try:
|
||||
time.sleep(random.random() / 100)
|
||||
finally:
|
||||
active.remove(me)
|
||||
|
||||
def run():
|
||||
me = threading.current_thread()
|
||||
while not watch.expired():
|
||||
acquire_check(me)
|
||||
|
||||
threads = []
|
||||
for i in range(0, self.THREAD_COUNT):
|
||||
t = threading_utils.daemon_thread(run)
|
||||
threads.append(t)
|
||||
t.start()
|
||||
while threads:
|
||||
t = threads.pop()
|
||||
t.join()
|
||||
|
||||
self.assertEqual([], list(dups))
|
||||
self.assertEqual([], list(active))
|
||||
|
||||
def test_no_concurrent_readers_writers(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
watch = timing.StopWatch(duration=5)
|
||||
watch.start()
|
||||
dups = collections.deque()
|
||||
active = collections.deque()
|
||||
|
||||
def acquire_check(me, reader):
|
||||
if reader:
|
||||
lock_func = lock.read_lock
|
||||
else:
|
||||
lock_func = lock.write_lock
|
||||
with lock_func():
|
||||
if not reader:
|
||||
# There should be no-one else currently active, if there
|
||||
# is ensure we capture them so that we can later blow-up
|
||||
# the test.
|
||||
if len(active) >= 1:
|
||||
dups.append(me)
|
||||
dups.extend(active)
|
||||
active.append(me)
|
||||
try:
|
||||
time.sleep(random.random() / 100)
|
||||
finally:
|
||||
active.remove(me)
|
||||
|
||||
def run():
|
||||
me = threading.current_thread()
|
||||
while not watch.expired():
|
||||
acquire_check(me, random.choice([True, False]))
|
||||
|
||||
threads = []
|
||||
for i in range(0, self.THREAD_COUNT):
|
||||
t = threading_utils.daemon_thread(run)
|
||||
threads.append(t)
|
||||
t.start()
|
||||
while threads:
|
||||
t = threads.pop()
|
||||
t.join()
|
||||
|
||||
self.assertEqual([], list(dups))
|
||||
self.assertEqual([], list(active))
|
||||
|
||||
def test_writer_abort(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
self.assertFalse(lock.owner)
|
||||
|
||||
def blow_up():
|
||||
with lock.write_lock():
|
||||
self.assertEqual(lock.WRITER, lock.owner)
|
||||
raise RuntimeError("Broken")
|
||||
|
||||
self.assertRaises(RuntimeError, blow_up)
|
||||
self.assertFalse(lock.owner)
|
||||
|
||||
def test_reader_abort(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
self.assertFalse(lock.owner)
|
||||
|
||||
def blow_up():
|
||||
with lock.read_lock():
|
||||
self.assertEqual(lock.READER, lock.owner)
|
||||
raise RuntimeError("Broken")
|
||||
|
||||
self.assertRaises(RuntimeError, blow_up)
|
||||
self.assertFalse(lock.owner)
|
||||
|
||||
def test_double_reader_abort(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
activated = collections.deque()
|
||||
|
||||
def double_bad_reader():
|
||||
with lock.read_lock():
|
||||
with lock.read_lock():
|
||||
raise RuntimeError("Broken")
|
||||
|
||||
def happy_writer():
|
||||
with lock.write_lock():
|
||||
activated.append(lock.owner)
|
||||
|
||||
with futures.ThreadPoolExecutor(max_workers=20) as e:
|
||||
for i in range(0, 20):
|
||||
if i % 2 == 0:
|
||||
e.submit(double_bad_reader)
|
||||
else:
|
||||
e.submit(happy_writer)
|
||||
|
||||
self.assertEqual(10, len([a for a in activated if a == 'w']))
|
||||
|
||||
def test_double_reader_writer(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
activated = collections.deque()
|
||||
active = threading_utils.Event()
|
||||
|
||||
def double_reader():
|
||||
with lock.read_lock():
|
||||
active.set()
|
||||
while not lock.has_pending_writers:
|
||||
time.sleep(0.001)
|
||||
with lock.read_lock():
|
||||
activated.append(lock.owner)
|
||||
|
||||
def happy_writer():
|
||||
with lock.write_lock():
|
||||
activated.append(lock.owner)
|
||||
|
||||
reader = threading_utils.daemon_thread(double_reader)
|
||||
reader.start()
|
||||
self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT))
|
||||
|
||||
writer = threading_utils.daemon_thread(happy_writer)
|
||||
writer.start()
|
||||
|
||||
reader.join()
|
||||
writer.join()
|
||||
self.assertEqual(2, len(activated))
|
||||
self.assertEqual(['r', 'w'], list(activated))
|
||||
|
||||
def test_reader_chaotic(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
activated = collections.deque()
|
||||
|
||||
def chaotic_reader(blow_up):
|
||||
with lock.read_lock():
|
||||
if blow_up:
|
||||
raise RuntimeError("Broken")
|
||||
else:
|
||||
activated.append(lock.owner)
|
||||
|
||||
def happy_writer():
|
||||
with lock.write_lock():
|
||||
activated.append(lock.owner)
|
||||
|
||||
with futures.ThreadPoolExecutor(max_workers=20) as e:
|
||||
for i in range(0, 20):
|
||||
if i % 2 == 0:
|
||||
e.submit(chaotic_reader, blow_up=bool(i % 4 == 0))
|
||||
else:
|
||||
e.submit(happy_writer)
|
||||
|
||||
writers = [a for a in activated if a == 'w']
|
||||
readers = [a for a in activated if a == 'r']
|
||||
self.assertEqual(10, len(writers))
|
||||
self.assertEqual(5, len(readers))
|
||||
|
||||
def test_writer_chaotic(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
activated = collections.deque()
|
||||
|
||||
def chaotic_writer(blow_up):
|
||||
with lock.write_lock():
|
||||
if blow_up:
|
||||
raise RuntimeError("Broken")
|
||||
else:
|
||||
activated.append(lock.owner)
|
||||
|
||||
def happy_reader():
|
||||
with lock.read_lock():
|
||||
activated.append(lock.owner)
|
||||
|
||||
with futures.ThreadPoolExecutor(max_workers=20) as e:
|
||||
for i in range(0, 20):
|
||||
if i % 2 == 0:
|
||||
e.submit(chaotic_writer, blow_up=bool(i % 4 == 0))
|
||||
else:
|
||||
e.submit(happy_reader)
|
||||
|
||||
writers = [a for a in activated if a == 'w']
|
||||
readers = [a for a in activated if a == 'r']
|
||||
self.assertEqual(5, len(writers))
|
||||
self.assertEqual(10, len(readers))
|
||||
|
||||
def test_single_reader_writer(self):
|
||||
results = []
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
with lock.read_lock():
|
||||
self.assertTrue(lock.is_reader())
|
||||
self.assertEqual(0, len(results))
|
||||
with lock.write_lock():
|
||||
results.append(1)
|
||||
self.assertTrue(lock.is_writer())
|
||||
with lock.read_lock():
|
||||
self.assertTrue(lock.is_reader())
|
||||
self.assertEqual(1, len(results))
|
||||
self.assertFalse(lock.is_reader())
|
||||
self.assertFalse(lock.is_writer())
|
||||
|
||||
def test_reader_to_writer(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
|
||||
def writer_func():
|
||||
with lock.write_lock():
|
||||
pass
|
||||
|
||||
with lock.read_lock():
|
||||
self.assertRaises(RuntimeError, writer_func)
|
||||
self.assertFalse(lock.is_writer())
|
||||
|
||||
self.assertFalse(lock.is_reader())
|
||||
self.assertFalse(lock.is_writer())
|
||||
|
||||
def test_writer_to_reader(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
|
||||
def reader_func():
|
||||
with lock.read_lock():
|
||||
self.assertTrue(lock.is_writer())
|
||||
self.assertTrue(lock.is_reader())
|
||||
|
||||
with lock.write_lock():
|
||||
self.assertIsNone(reader_func())
|
||||
self.assertFalse(lock.is_reader())
|
||||
|
||||
self.assertFalse(lock.is_reader())
|
||||
self.assertFalse(lock.is_writer())
|
||||
|
||||
def test_double_writer(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
with lock.write_lock():
|
||||
self.assertFalse(lock.is_reader())
|
||||
self.assertTrue(lock.is_writer())
|
||||
with lock.write_lock():
|
||||
self.assertTrue(lock.is_writer())
|
||||
self.assertTrue(lock.is_writer())
|
||||
|
||||
self.assertFalse(lock.is_reader())
|
||||
self.assertFalse(lock.is_writer())
|
||||
|
||||
def test_double_reader(self):
|
||||
lock = lock_utils.ReaderWriterLock()
|
||||
with lock.read_lock():
|
||||
self.assertTrue(lock.is_reader())
|
||||
self.assertFalse(lock.is_writer())
|
||||
with lock.read_lock():
|
||||
self.assertTrue(lock.is_reader())
|
||||
self.assertTrue(lock.is_reader())
|
||||
|
||||
self.assertFalse(lock.is_reader())
|
||||
self.assertFalse(lock.is_writer())
|
||||
|
||||
def test_multi_reader_multi_writer(self):
|
||||
writer_times, reader_times = _spawn_variation(10, 10)
|
||||
self.assertEqual(10, len(writer_times))
|
||||
self.assertEqual(10, len(reader_times))
|
||||
for (start, stop) in writer_times:
|
||||
self.assertEqual(0, _find_overlaps(reader_times, start, stop))
|
||||
self.assertEqual(1, _find_overlaps(writer_times, start, stop))
|
||||
for (start, stop) in reader_times:
|
||||
self.assertEqual(0, _find_overlaps(writer_times, start, stop))
|
||||
|
||||
def test_multi_reader_single_writer(self):
|
||||
writer_times, reader_times = _spawn_variation(9, 1)
|
||||
self.assertEqual(1, len(writer_times))
|
||||
self.assertEqual(9, len(reader_times))
|
||||
start, stop = writer_times[0]
|
||||
self.assertEqual(0, _find_overlaps(reader_times, start, stop))
|
||||
|
||||
def test_multi_writer(self):
|
||||
writer_times, reader_times = _spawn_variation(0, 10)
|
||||
self.assertEqual(10, len(writer_times))
|
||||
self.assertEqual(0, len(reader_times))
|
||||
for (start, stop) in writer_times:
|
||||
self.assertEqual(1, _find_overlaps(writer_times, start, stop))
|
||||
|
||||
@@ -19,23 +19,14 @@
|
||||
# pulls in oslo.cfg) and is reduced to only what taskflow currently wants to
|
||||
# use from that code.
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
import errno
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_utils import importutils
|
||||
import six
|
||||
|
||||
from taskflow import logging
|
||||
from taskflow.utils import misc
|
||||
|
||||
# Used for the reader-writer lock get the right thread 'hack' (needed below).
|
||||
eventlet = importutils.try_import('eventlet')
|
||||
eventlet_patcher = importutils.try_import('eventlet.patcher')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@@ -96,203 +87,6 @@ def locked(*args, **kwargs):
|
||||
return decorator
|
||||
|
||||
|
||||
def read_locked(*args, **kwargs):
|
||||
"""Acquires & releases a read lock around call into decorated method.
|
||||
|
||||
NOTE(harlowja): if no attribute name is provided then by default the
|
||||
attribute named '_lock' is looked for (this attribute is expected to be
|
||||
the rw-lock object) in the instance object this decorator is attached to.
|
||||
"""
|
||||
|
||||
def decorator(f):
|
||||
attr_name = kwargs.get('lock', '_lock')
|
||||
|
||||
@six.wraps(f)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
rw_lock = getattr(self, attr_name)
|
||||
with rw_lock.read_lock():
|
||||
return f(self, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
# This is needed to handle when the decorator has args or the decorator
|
||||
# doesn't have args, python is rather weird here...
|
||||
if kwargs or not args:
|
||||
return decorator
|
||||
else:
|
||||
if len(args) == 1:
|
||||
return decorator(args[0])
|
||||
else:
|
||||
return decorator
|
||||
|
||||
|
||||
def write_locked(*args, **kwargs):
|
||||
"""Acquires & releases a write lock around call into decorated method.
|
||||
|
||||
NOTE(harlowja): if no attribute name is provided then by default the
|
||||
attribute named '_lock' is looked for (this attribute is expected to be
|
||||
the rw-lock object) in the instance object this decorator is attached to.
|
||||
"""
|
||||
|
||||
def decorator(f):
|
||||
attr_name = kwargs.get('lock', '_lock')
|
||||
|
||||
@six.wraps(f)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
rw_lock = getattr(self, attr_name)
|
||||
with rw_lock.write_lock():
|
||||
return f(self, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
# This is needed to handle when the decorator has args or the decorator
|
||||
# doesn't have args, python is rather weird here...
|
||||
if kwargs or not args:
|
||||
return decorator
|
||||
else:
|
||||
if len(args) == 1:
|
||||
return decorator(args[0])
|
||||
else:
|
||||
return decorator
|
||||
|
||||
|
||||
class ReaderWriterLock(object):
|
||||
"""A reader/writer lock.
|
||||
|
||||
This lock allows for simultaneous readers to exist but only one writer
|
||||
to exist for use-cases where it is useful to have such types of locks.
|
||||
|
||||
Currently a reader can not escalate its read lock to a write lock and
|
||||
a writer can not acquire a read lock while it is waiting on the write
|
||||
lock.
|
||||
|
||||
In the future these restrictions may be relaxed.
|
||||
|
||||
This can be eventually removed if http://bugs.python.org/issue8800 ever
|
||||
gets accepted into the python standard threading library...
|
||||
"""
|
||||
WRITER = 'w'
|
||||
READER = 'r'
|
||||
|
||||
@staticmethod
|
||||
def _fetch_current_thread_functor():
|
||||
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
|
||||
# or addressed we have to use complicated workaround to get a object
|
||||
# that will not be recycled; the usage of threading.current_thread()
|
||||
# doesn't appear to currently be monkey patched and therefore isn't
|
||||
# reliable to use (and breaks badly when used as all threads share
|
||||
# the same current_thread() object)...
|
||||
if eventlet is not None and eventlet_patcher is not None:
|
||||
if eventlet_patcher.is_monkey_patched('thread'):
|
||||
return lambda: eventlet.getcurrent()
|
||||
return lambda: threading.current_thread()
|
||||
|
||||
def __init__(self):
|
||||
self._writer = None
|
||||
self._pending_writers = collections.deque()
|
||||
self._readers = collections.deque()
|
||||
self._cond = threading.Condition()
|
||||
self._current_thread = self._fetch_current_thread_functor()
|
||||
|
||||
@property
|
||||
def has_pending_writers(self):
|
||||
"""Returns if there are writers waiting to become the *one* writer."""
|
||||
return bool(self._pending_writers)
|
||||
|
||||
def is_writer(self, check_pending=True):
|
||||
"""Returns if the caller is the active writer or a pending writer."""
|
||||
me = self._current_thread()
|
||||
if self._writer == me:
|
||||
return True
|
||||
if check_pending:
|
||||
return me in self._pending_writers
|
||||
else:
|
||||
return False
|
||||
|
||||
@property
|
||||
def owner(self):
|
||||
"""Returns whether the lock is locked by a writer or reader."""
|
||||
with self._cond:
|
||||
# Obtain the lock to ensure we get a accurate view of the actual
|
||||
# owner that isn't likely to change when we are reading it...
|
||||
if self._writer is not None:
|
||||
return self.WRITER
|
||||
if self._readers:
|
||||
return self.READER
|
||||
return None
|
||||
|
||||
def is_reader(self):
|
||||
"""Returns if the caller is one of the readers."""
|
||||
me = self._current_thread()
|
||||
return me in self._readers
|
||||
|
||||
@contextlib.contextmanager
|
||||
def read_lock(self):
|
||||
"""Context manager that grants a read lock.
|
||||
|
||||
Will wait until no active or pending writers.
|
||||
|
||||
Raises a RuntimeError if a pending writer tries to acquire
|
||||
a read lock.
|
||||
"""
|
||||
me = self._current_thread()
|
||||
if me in self._pending_writers:
|
||||
raise RuntimeError("Writer %s can not acquire a read lock"
|
||||
" while waiting for the write lock"
|
||||
% me)
|
||||
with self._cond:
|
||||
while True:
|
||||
# No active writer, or we are the writer;
|
||||
# we are good to become a reader.
|
||||
if self._writer is None or self._writer == me:
|
||||
self._readers.append(me)
|
||||
break
|
||||
# An active writer; guess we have to wait.
|
||||
self._cond.wait()
|
||||
try:
|
||||
yield self
|
||||
finally:
|
||||
# I am no longer a reader, remove *one* occurrence of myself.
|
||||
# If the current thread acquired two read locks, then it will
|
||||
# still have to remove that other read lock; this allows for
|
||||
# basic reentrancy to be possible.
|
||||
with self._cond:
|
||||
self._readers.remove(me)
|
||||
self._cond.notify_all()
|
||||
|
||||
@contextlib.contextmanager
|
||||
def write_lock(self):
|
||||
"""Context manager that grants a write lock.
|
||||
|
||||
Will wait until no active readers. Blocks readers after acquiring.
|
||||
|
||||
Raises a RuntimeError if an active reader attempts to acquire a lock.
|
||||
"""
|
||||
me = self._current_thread()
|
||||
if self.is_reader():
|
||||
raise RuntimeError("Reader %s to writer privilege"
|
||||
" escalation not allowed" % me)
|
||||
if self.is_writer(check_pending=False):
|
||||
# Already the writer; this allows for basic reentrancy.
|
||||
yield self
|
||||
else:
|
||||
with self._cond:
|
||||
self._pending_writers.append(me)
|
||||
while True:
|
||||
# No readers, and no active writer, am I next??
|
||||
if len(self._readers) == 0 and self._writer is None:
|
||||
if self._pending_writers[0] == me:
|
||||
self._writer = self._pending_writers.popleft()
|
||||
break
|
||||
self._cond.wait()
|
||||
try:
|
||||
yield self
|
||||
finally:
|
||||
with self._cond:
|
||||
self._writer = None
|
||||
self._cond.notify_all()
|
||||
|
||||
|
||||
class MultiLock(object):
|
||||
"""A class which attempts to obtain & release many locks at once.
|
||||
|
||||
@@ -411,107 +205,3 @@ class MultiLock(object):
|
||||
# At the end only clear it off, so that under partial failure we don't
|
||||
# lose any locks...
|
||||
self._lock_stacks.pop()
|
||||
|
||||
|
||||
class _InterProcessLock(object):
|
||||
"""An interprocess locking implementation.
|
||||
|
||||
This is a lock implementation which allows multiple locks, working around
|
||||
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
||||
not require any cleanup. Since the lock is always held on a file
|
||||
descriptor rather than outside of the process, the lock gets dropped
|
||||
automatically if the process crashes, even if __exit__ is not executed.
|
||||
|
||||
There are no guarantees regarding usage by multiple green threads in a
|
||||
single process here. This lock works only between processes.
|
||||
|
||||
Note these locks are released when the descriptor is closed, so it's not
|
||||
safe to close the file descriptor while another green thread holds the
|
||||
lock. Just opening and closing the lock file can break synchronisation,
|
||||
so lock files must be accessed only using this abstraction.
|
||||
"""
|
||||
|
||||
def __init__(self, name):
|
||||
self.lockfile = None
|
||||
self.fname = name
|
||||
|
||||
def acquire(self):
|
||||
basedir = os.path.dirname(self.fname)
|
||||
|
||||
if not os.path.exists(basedir):
|
||||
misc.ensure_tree(basedir)
|
||||
LOG.debug('Created lock path: %s', basedir)
|
||||
|
||||
self.lockfile = open(self.fname, 'w')
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Using non-blocking locks since green threads are not
|
||||
# patched to deal with blocking locking calls.
|
||||
# Also upon reading the MSDN docs for locking(), it seems
|
||||
# to have a laughable 10 attempts "blocking" mechanism.
|
||||
self.trylock()
|
||||
LOG.debug('Got file lock "%s"', self.fname)
|
||||
return True
|
||||
except IOError as e:
|
||||
if e.errno in (errno.EACCES, errno.EAGAIN):
|
||||
# external locks synchronise things like iptables
|
||||
# updates - give it some time to prevent busy spinning
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
raise threading.ThreadError("Unable to acquire lock on"
|
||||
" `%(filename)s` due to"
|
||||
" %(exception)s" %
|
||||
{
|
||||
'filename': self.fname,
|
||||
'exception': e,
|
||||
})
|
||||
|
||||
def __enter__(self):
|
||||
self.acquire()
|
||||
return self
|
||||
|
||||
def release(self):
|
||||
try:
|
||||
self.unlock()
|
||||
self.lockfile.close()
|
||||
LOG.debug('Released file lock "%s"', self.fname)
|
||||
except IOError:
|
||||
LOG.exception("Could not release the acquired lock `%s`",
|
||||
self.fname)
|
||||
|
||||
def __exit__(self, exc_type, exc_val, exc_tb):
|
||||
self.release()
|
||||
|
||||
def exists(self):
|
||||
return os.path.exists(self.fname)
|
||||
|
||||
def trylock(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def unlock(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class _WindowsLock(_InterProcessLock):
|
||||
def trylock(self):
|
||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
|
||||
|
||||
def unlock(self):
|
||||
msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
|
||||
|
||||
|
||||
class _PosixLock(_InterProcessLock):
|
||||
def trylock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
||||
def unlock(self):
|
||||
fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
|
||||
|
||||
|
||||
if os.name == 'nt':
|
||||
import msvcrt
|
||||
InterProcessLock = _WindowsLock
|
||||
else:
|
||||
import fcntl
|
||||
InterProcessLock = _PosixLock
|
||||
|
||||
Reference in New Issue
Block a user