diff --git a/requirements.txt b/requirements.txt index b0e0eb22..0b45b8c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,9 @@ six>=1.9.0 # Enum library made for <= python 3.3 enum34 +# For reader/writer + interprocess locks. +fasteners>=0.5 # Apache-2.0 + # Very nice graph library networkx>=1.8 diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py index e71d5b9c..940b9c41 100644 --- a/taskflow/persistence/backends/impl_dir.py +++ b/taskflow/persistence/backends/impl_dir.py @@ -20,11 +20,11 @@ import errno import os import shutil +import fasteners from oslo_serialization import jsonutils from taskflow import exceptions as exc from taskflow.persistence import path_based -from taskflow.utils import lock_utils from taskflow.utils import misc @@ -64,7 +64,7 @@ class DirBackend(path_based.PathBasedBackend): if not self._path: raise ValueError("Empty path is disallowed") self._path = os.path.abspath(self._path) - self.lock = lock_utils.ReaderWriterLock() + self.lock = fasteners.ReaderWriterLock() def get_connection(self): return Connection(self) @@ -97,7 +97,7 @@ class Connection(path_based.PathBasedConnection): @contextlib.contextmanager def _path_lock(self, path): lockfile = self._join_path(path, 'lock') - with lock_utils.InterProcessLock(lockfile) as lock: + with fasteners.InterProcessLock(lockfile) as lock: with _storagefailure_wrapper(): yield lock diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 960ebe0c..0aac58eb 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -20,12 +20,12 @@ import copy import itertools import posixpath as pp +import fasteners import six from taskflow import exceptions as exc from taskflow.persistence import path_based from taskflow.types import tree -from taskflow.utils import lock_utils class FakeInode(tree.Node): @@ -264,7 +264,7 @@ class MemoryBackend(path_based.PathBasedBackend): self._path = pp.sep self.memory = FakeFilesystem(deep_copy=self._conf.get('deep_copy', True)) - self.lock = lock_utils.ReaderWriterLock() + self.lock = fasteners.ReaderWriterLock() def get_connection(self): return Connection(self) diff --git a/taskflow/storage.py b/taskflow/storage.py index d40c5870..7478cdd5 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -16,6 +16,7 @@ import contextlib +import fasteners from oslo_utils import reflection from oslo_utils import uuidutils import six @@ -28,7 +29,6 @@ from taskflow import retry from taskflow import states from taskflow import task from taskflow.types import failure -from taskflow.utils import lock_utils from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -150,7 +150,7 @@ class Storage(object): self._flowdetail = flow_detail self._transients = {} self._injected_args = {} - self._lock = lock_utils.ReaderWriterLock() + self._lock = fasteners.ReaderWriterLock() self._ensure_matchers = [ ((task.BaseTask,), (logbook.TaskDetail, 'Task')), ((retry.Retry,), (logbook.RetryDetail, 'Retry')), @@ -308,46 +308,46 @@ class Storage(object): original_atom_detail.update(conn.update_atom_details(atom_detail)) return original_atom_detail - @lock_utils.read_locked + @fasteners.read_locked def get_atom_uuid(self, atom_name): """Gets an atoms uuid given a atoms name.""" source, _clone = self._atomdetail_by_name(atom_name) return source.uuid - @lock_utils.write_locked + @fasteners.write_locked def set_atom_state(self, atom_name, state): """Sets an atoms state.""" source, clone = self._atomdetail_by_name(atom_name, clone=True) clone.state = state self._with_connection(self._save_atom_detail, source, clone) - @lock_utils.read_locked + @fasteners.read_locked def get_atom_state(self, atom_name): """Gets the state of an atom given an atoms name.""" source, _clone = self._atomdetail_by_name(atom_name) return source.state - @lock_utils.write_locked + @fasteners.write_locked def set_atom_intention(self, atom_name, intention): """Sets the intention of an atom given an atoms name.""" source, clone = self._atomdetail_by_name(atom_name, clone=True) clone.intention = intention self._with_connection(self._save_atom_detail, source, clone) - @lock_utils.read_locked + @fasteners.read_locked def get_atom_intention(self, atom_name): """Gets the intention of an atom given an atoms name.""" source, _clone = self._atomdetail_by_name(atom_name) return source.intention - @lock_utils.read_locked + @fasteners.read_locked def get_atoms_states(self, atom_names): """Gets all atoms states given a set of names.""" return dict((name, (self.get_atom_state(name), self.get_atom_intention(name))) for name in atom_names) - @lock_utils.write_locked + @fasteners.write_locked def _update_atom_metadata(self, atom_name, update_with, expected_type=None): source, clone = self._atomdetail_by_name(atom_name, @@ -391,7 +391,7 @@ class Storage(object): self._update_atom_metadata(task_name, update_with, expected_type=logbook.TaskDetail) - @lock_utils.read_locked + @fasteners.read_locked def get_task_progress(self, task_name): """Get the progress of a task given a tasks name. @@ -405,7 +405,7 @@ class Storage(object): except KeyError: return 0.0 - @lock_utils.read_locked + @fasteners.read_locked def get_task_progress_details(self, task_name): """Get the progress details of a task given a tasks name. @@ -437,7 +437,7 @@ class Storage(object): LOG.warning("Atom %s did not supply result " "with index %r (name %s)", atom_name, index, name) - @lock_utils.write_locked + @fasteners.write_locked def save(self, atom_name, data, state=states.SUCCESS): """Put result for atom with id 'uuid' to storage.""" source, clone = self._atomdetail_by_name(atom_name, clone=True) @@ -451,7 +451,7 @@ class Storage(object): else: self._check_all_results_provided(result.name, data) - @lock_utils.write_locked + @fasteners.write_locked def save_retry_failure(self, retry_name, failed_atom_name, failure): """Save subflow failure to retry controller history.""" source, clone = self._atomdetail_by_name( @@ -467,7 +467,7 @@ class Storage(object): failures[failed_atom_name] = failure self._with_connection(self._save_atom_detail, source, clone) - @lock_utils.write_locked + @fasteners.write_locked def cleanup_retry_history(self, retry_name, state): """Cleanup history of retry atom with given name.""" source, clone = self._atomdetail_by_name( @@ -476,7 +476,7 @@ class Storage(object): clone.results = [] self._with_connection(self._save_atom_detail, source, clone) - @lock_utils.read_locked + @fasteners.read_locked def _get(self, atom_name, only_last=False): source, _clone = self._atomdetail_by_name(atom_name) if source.failure is not None: @@ -499,7 +499,7 @@ class Storage(object): """Gets the results for an atom with a given name from storage.""" return self._get(atom_name) - @lock_utils.read_locked + @fasteners.read_locked def get_failures(self): """Get list of failures that happened with this flow. @@ -511,7 +511,7 @@ class Storage(object): """Returns True if there are failed tasks in the storage.""" return bool(self._failures) - @lock_utils.write_locked + @fasteners.write_locked def reset(self, atom_name, state=states.PENDING): """Reset atom with given name (if the atom is not in a given state).""" if atom_name == self.injector_name: @@ -579,7 +579,7 @@ class Storage(object): else: save_persistent() - @lock_utils.write_locked + @fasteners.write_locked def inject(self, pairs, transient=False): """Add values into storage. @@ -656,7 +656,7 @@ class Storage(object): if provider not in entries: entries.append(provider) - @lock_utils.read_locked + @fasteners.read_locked def fetch(self, name, many_handler=None): """Fetch a named result.""" def _many_handler(values): @@ -691,7 +691,7 @@ class Storage(object): else: return many_handler(values) - @lock_utils.read_locked + @fasteners.read_locked def fetch_unsatisfied_args(self, atom_name, args_mapping, scope_walker=None, optional_args=None): """Fetch unsatisfied atom arguments using an atoms argument mapping. @@ -777,7 +777,7 @@ class Storage(object): missing.discard(bound_name) return missing - @lock_utils.read_locked + @fasteners.read_locked def fetch_all(self, many_handler=None): """Fetch all named results known so far.""" def _many_handler(values): @@ -794,7 +794,7 @@ class Storage(object): pass return results - @lock_utils.read_locked + @fasteners.read_locked def fetch_mapped_args(self, args_mapping, atom_name=None, scope_walker=None, optional_args=None): @@ -908,14 +908,14 @@ class Storage(object): bound_name, name, value, provider) return mapped_args - @lock_utils.write_locked + @fasteners.write_locked def set_flow_state(self, state): """Set flow details state and save it.""" source, clone = self._fetch_flowdetail(clone=True) clone.state = state self._with_connection(self._save_flow_detail, source, clone) - @lock_utils.write_locked + @fasteners.write_locked def update_flow_metadata(self, update_with): """Update flowdetails metadata and save it.""" if update_with: @@ -923,7 +923,7 @@ class Storage(object): clone.meta.update(update_with) self._with_connection(self._save_flow_detail, source, clone) - @lock_utils.read_locked + @fasteners.read_locked def get_flow_state(self): """Get state from flow details.""" source = self._flowdetail @@ -945,14 +945,14 @@ class Storage(object): failure = ad.failure return retry.History(ad.results, failure=failure) - @lock_utils.read_locked + @fasteners.read_locked def get_retry_history(self, retry_name): """Fetch a single retrys history.""" source, _clone = self._atomdetail_by_name( retry_name, expected_type=logbook.RetryDetail) return self._translate_into_history(source) - @lock_utils.read_locked + @fasteners.read_locked def get_retry_histories(self): """Fetch all retrys histories.""" histories = [] diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py index 3d40ed24..0c8213e9 100644 --- a/taskflow/tests/unit/test_utils_lock_utils.py +++ b/taskflow/tests/unit/test_utils_lock_utils.py @@ -15,16 +15,11 @@ # under the License. import collections -import random import threading import time -from concurrent import futures - from taskflow import test from taskflow.test import mock -from taskflow.tests import utils as test_utils -from taskflow.types import timing from taskflow.utils import lock_utils from taskflow.utils import misc from taskflow.utils import threading_utils @@ -50,51 +45,6 @@ def _find_overlaps(times, start, end): return overlaps -def _spawn_variation(readers, writers, max_workers=None): - start_stops = collections.deque() - lock = lock_utils.ReaderWriterLock() - - def read_func(ident): - with lock.read_lock(): - # TODO(harlowja): sometime in the future use a monotonic clock here - # to avoid problems that can be caused by ntpd resyncing the clock - # while we are actively running. - enter_time = now() - time.sleep(WORK_TIMES[ident % len(WORK_TIMES)]) - exit_time = now() - start_stops.append((lock.READER, enter_time, exit_time)) - time.sleep(NAPPY_TIME) - - def write_func(ident): - with lock.write_lock(): - enter_time = now() - time.sleep(WORK_TIMES[ident % len(WORK_TIMES)]) - exit_time = now() - start_stops.append((lock.WRITER, enter_time, exit_time)) - time.sleep(NAPPY_TIME) - - if max_workers is None: - max_workers = max(0, readers) + max(0, writers) - if max_workers > 0: - with futures.ThreadPoolExecutor(max_workers=max_workers) as e: - count = 0 - for _i in range(0, readers): - e.submit(read_func, count) - count += 1 - for _i in range(0, writers): - e.submit(write_func, count) - count += 1 - - writer_times = [] - reader_times = [] - for (lock_type, start, stop) in list(start_stops): - if lock_type == lock.WRITER: - writer_times.append((start, stop)) - else: - reader_times.append((start, stop)) - return (writer_times, reader_times) - - class MultilockTest(test.TestCase): THREAD_COUNT = 20 @@ -329,306 +279,3 @@ class MultilockTest(test.TestCase): lock2 = threading.Lock() n_lock = lock_utils.MultiLock((lock1, lock2)) self.assertRaises(threading.ThreadError, n_lock.release) - - -class ReadWriteLockTest(test.TestCase): - THREAD_COUNT = 20 - - def test_no_double_writers(self): - lock = lock_utils.ReaderWriterLock() - watch = timing.StopWatch(duration=5) - watch.start() - dups = collections.deque() - active = collections.deque() - - def acquire_check(me): - with lock.write_lock(): - if len(active) >= 1: - dups.append(me) - dups.extend(active) - active.append(me) - try: - time.sleep(random.random() / 100) - finally: - active.remove(me) - - def run(): - me = threading.current_thread() - while not watch.expired(): - acquire_check(me) - - threads = [] - for i in range(0, self.THREAD_COUNT): - t = threading_utils.daemon_thread(run) - threads.append(t) - t.start() - while threads: - t = threads.pop() - t.join() - - self.assertEqual([], list(dups)) - self.assertEqual([], list(active)) - - def test_no_concurrent_readers_writers(self): - lock = lock_utils.ReaderWriterLock() - watch = timing.StopWatch(duration=5) - watch.start() - dups = collections.deque() - active = collections.deque() - - def acquire_check(me, reader): - if reader: - lock_func = lock.read_lock - else: - lock_func = lock.write_lock - with lock_func(): - if not reader: - # There should be no-one else currently active, if there - # is ensure we capture them so that we can later blow-up - # the test. - if len(active) >= 1: - dups.append(me) - dups.extend(active) - active.append(me) - try: - time.sleep(random.random() / 100) - finally: - active.remove(me) - - def run(): - me = threading.current_thread() - while not watch.expired(): - acquire_check(me, random.choice([True, False])) - - threads = [] - for i in range(0, self.THREAD_COUNT): - t = threading_utils.daemon_thread(run) - threads.append(t) - t.start() - while threads: - t = threads.pop() - t.join() - - self.assertEqual([], list(dups)) - self.assertEqual([], list(active)) - - def test_writer_abort(self): - lock = lock_utils.ReaderWriterLock() - self.assertFalse(lock.owner) - - def blow_up(): - with lock.write_lock(): - self.assertEqual(lock.WRITER, lock.owner) - raise RuntimeError("Broken") - - self.assertRaises(RuntimeError, blow_up) - self.assertFalse(lock.owner) - - def test_reader_abort(self): - lock = lock_utils.ReaderWriterLock() - self.assertFalse(lock.owner) - - def blow_up(): - with lock.read_lock(): - self.assertEqual(lock.READER, lock.owner) - raise RuntimeError("Broken") - - self.assertRaises(RuntimeError, blow_up) - self.assertFalse(lock.owner) - - def test_double_reader_abort(self): - lock = lock_utils.ReaderWriterLock() - activated = collections.deque() - - def double_bad_reader(): - with lock.read_lock(): - with lock.read_lock(): - raise RuntimeError("Broken") - - def happy_writer(): - with lock.write_lock(): - activated.append(lock.owner) - - with futures.ThreadPoolExecutor(max_workers=20) as e: - for i in range(0, 20): - if i % 2 == 0: - e.submit(double_bad_reader) - else: - e.submit(happy_writer) - - self.assertEqual(10, len([a for a in activated if a == 'w'])) - - def test_double_reader_writer(self): - lock = lock_utils.ReaderWriterLock() - activated = collections.deque() - active = threading_utils.Event() - - def double_reader(): - with lock.read_lock(): - active.set() - while not lock.has_pending_writers: - time.sleep(0.001) - with lock.read_lock(): - activated.append(lock.owner) - - def happy_writer(): - with lock.write_lock(): - activated.append(lock.owner) - - reader = threading_utils.daemon_thread(double_reader) - reader.start() - self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT)) - - writer = threading_utils.daemon_thread(happy_writer) - writer.start() - - reader.join() - writer.join() - self.assertEqual(2, len(activated)) - self.assertEqual(['r', 'w'], list(activated)) - - def test_reader_chaotic(self): - lock = lock_utils.ReaderWriterLock() - activated = collections.deque() - - def chaotic_reader(blow_up): - with lock.read_lock(): - if blow_up: - raise RuntimeError("Broken") - else: - activated.append(lock.owner) - - def happy_writer(): - with lock.write_lock(): - activated.append(lock.owner) - - with futures.ThreadPoolExecutor(max_workers=20) as e: - for i in range(0, 20): - if i % 2 == 0: - e.submit(chaotic_reader, blow_up=bool(i % 4 == 0)) - else: - e.submit(happy_writer) - - writers = [a for a in activated if a == 'w'] - readers = [a for a in activated if a == 'r'] - self.assertEqual(10, len(writers)) - self.assertEqual(5, len(readers)) - - def test_writer_chaotic(self): - lock = lock_utils.ReaderWriterLock() - activated = collections.deque() - - def chaotic_writer(blow_up): - with lock.write_lock(): - if blow_up: - raise RuntimeError("Broken") - else: - activated.append(lock.owner) - - def happy_reader(): - with lock.read_lock(): - activated.append(lock.owner) - - with futures.ThreadPoolExecutor(max_workers=20) as e: - for i in range(0, 20): - if i % 2 == 0: - e.submit(chaotic_writer, blow_up=bool(i % 4 == 0)) - else: - e.submit(happy_reader) - - writers = [a for a in activated if a == 'w'] - readers = [a for a in activated if a == 'r'] - self.assertEqual(5, len(writers)) - self.assertEqual(10, len(readers)) - - def test_single_reader_writer(self): - results = [] - lock = lock_utils.ReaderWriterLock() - with lock.read_lock(): - self.assertTrue(lock.is_reader()) - self.assertEqual(0, len(results)) - with lock.write_lock(): - results.append(1) - self.assertTrue(lock.is_writer()) - with lock.read_lock(): - self.assertTrue(lock.is_reader()) - self.assertEqual(1, len(results)) - self.assertFalse(lock.is_reader()) - self.assertFalse(lock.is_writer()) - - def test_reader_to_writer(self): - lock = lock_utils.ReaderWriterLock() - - def writer_func(): - with lock.write_lock(): - pass - - with lock.read_lock(): - self.assertRaises(RuntimeError, writer_func) - self.assertFalse(lock.is_writer()) - - self.assertFalse(lock.is_reader()) - self.assertFalse(lock.is_writer()) - - def test_writer_to_reader(self): - lock = lock_utils.ReaderWriterLock() - - def reader_func(): - with lock.read_lock(): - self.assertTrue(lock.is_writer()) - self.assertTrue(lock.is_reader()) - - with lock.write_lock(): - self.assertIsNone(reader_func()) - self.assertFalse(lock.is_reader()) - - self.assertFalse(lock.is_reader()) - self.assertFalse(lock.is_writer()) - - def test_double_writer(self): - lock = lock_utils.ReaderWriterLock() - with lock.write_lock(): - self.assertFalse(lock.is_reader()) - self.assertTrue(lock.is_writer()) - with lock.write_lock(): - self.assertTrue(lock.is_writer()) - self.assertTrue(lock.is_writer()) - - self.assertFalse(lock.is_reader()) - self.assertFalse(lock.is_writer()) - - def test_double_reader(self): - lock = lock_utils.ReaderWriterLock() - with lock.read_lock(): - self.assertTrue(lock.is_reader()) - self.assertFalse(lock.is_writer()) - with lock.read_lock(): - self.assertTrue(lock.is_reader()) - self.assertTrue(lock.is_reader()) - - self.assertFalse(lock.is_reader()) - self.assertFalse(lock.is_writer()) - - def test_multi_reader_multi_writer(self): - writer_times, reader_times = _spawn_variation(10, 10) - self.assertEqual(10, len(writer_times)) - self.assertEqual(10, len(reader_times)) - for (start, stop) in writer_times: - self.assertEqual(0, _find_overlaps(reader_times, start, stop)) - self.assertEqual(1, _find_overlaps(writer_times, start, stop)) - for (start, stop) in reader_times: - self.assertEqual(0, _find_overlaps(writer_times, start, stop)) - - def test_multi_reader_single_writer(self): - writer_times, reader_times = _spawn_variation(9, 1) - self.assertEqual(1, len(writer_times)) - self.assertEqual(9, len(reader_times)) - start, stop = writer_times[0] - self.assertEqual(0, _find_overlaps(reader_times, start, stop)) - - def test_multi_writer(self): - writer_times, reader_times = _spawn_variation(0, 10) - self.assertEqual(10, len(writer_times)) - self.assertEqual(0, len(reader_times)) - for (start, stop) in writer_times: - self.assertEqual(1, _find_overlaps(writer_times, start, stop)) diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py index 58945c0f..7b1b026f 100644 --- a/taskflow/utils/lock_utils.py +++ b/taskflow/utils/lock_utils.py @@ -19,23 +19,14 @@ # pulls in oslo.cfg) and is reduced to only what taskflow currently wants to # use from that code. -import collections import contextlib -import errno -import os import threading -import time -from oslo_utils import importutils import six from taskflow import logging from taskflow.utils import misc -# Used for the reader-writer lock get the right thread 'hack' (needed below). -eventlet = importutils.try_import('eventlet') -eventlet_patcher = importutils.try_import('eventlet.patcher') - LOG = logging.getLogger(__name__) @@ -96,203 +87,6 @@ def locked(*args, **kwargs): return decorator -def read_locked(*args, **kwargs): - """Acquires & releases a read lock around call into decorated method. - - NOTE(harlowja): if no attribute name is provided then by default the - attribute named '_lock' is looked for (this attribute is expected to be - the rw-lock object) in the instance object this decorator is attached to. - """ - - def decorator(f): - attr_name = kwargs.get('lock', '_lock') - - @six.wraps(f) - def wrapper(self, *args, **kwargs): - rw_lock = getattr(self, attr_name) - with rw_lock.read_lock(): - return f(self, *args, **kwargs) - - return wrapper - - # This is needed to handle when the decorator has args or the decorator - # doesn't have args, python is rather weird here... - if kwargs or not args: - return decorator - else: - if len(args) == 1: - return decorator(args[0]) - else: - return decorator - - -def write_locked(*args, **kwargs): - """Acquires & releases a write lock around call into decorated method. - - NOTE(harlowja): if no attribute name is provided then by default the - attribute named '_lock' is looked for (this attribute is expected to be - the rw-lock object) in the instance object this decorator is attached to. - """ - - def decorator(f): - attr_name = kwargs.get('lock', '_lock') - - @six.wraps(f) - def wrapper(self, *args, **kwargs): - rw_lock = getattr(self, attr_name) - with rw_lock.write_lock(): - return f(self, *args, **kwargs) - - return wrapper - - # This is needed to handle when the decorator has args or the decorator - # doesn't have args, python is rather weird here... - if kwargs or not args: - return decorator - else: - if len(args) == 1: - return decorator(args[0]) - else: - return decorator - - -class ReaderWriterLock(object): - """A reader/writer lock. - - This lock allows for simultaneous readers to exist but only one writer - to exist for use-cases where it is useful to have such types of locks. - - Currently a reader can not escalate its read lock to a write lock and - a writer can not acquire a read lock while it is waiting on the write - lock. - - In the future these restrictions may be relaxed. - - This can be eventually removed if http://bugs.python.org/issue8800 ever - gets accepted into the python standard threading library... - """ - WRITER = 'w' - READER = 'r' - - @staticmethod - def _fetch_current_thread_functor(): - # Until https://github.com/eventlet/eventlet/issues/172 is resolved - # or addressed we have to use complicated workaround to get a object - # that will not be recycled; the usage of threading.current_thread() - # doesn't appear to currently be monkey patched and therefore isn't - # reliable to use (and breaks badly when used as all threads share - # the same current_thread() object)... - if eventlet is not None and eventlet_patcher is not None: - if eventlet_patcher.is_monkey_patched('thread'): - return lambda: eventlet.getcurrent() - return lambda: threading.current_thread() - - def __init__(self): - self._writer = None - self._pending_writers = collections.deque() - self._readers = collections.deque() - self._cond = threading.Condition() - self._current_thread = self._fetch_current_thread_functor() - - @property - def has_pending_writers(self): - """Returns if there are writers waiting to become the *one* writer.""" - return bool(self._pending_writers) - - def is_writer(self, check_pending=True): - """Returns if the caller is the active writer or a pending writer.""" - me = self._current_thread() - if self._writer == me: - return True - if check_pending: - return me in self._pending_writers - else: - return False - - @property - def owner(self): - """Returns whether the lock is locked by a writer or reader.""" - with self._cond: - # Obtain the lock to ensure we get a accurate view of the actual - # owner that isn't likely to change when we are reading it... - if self._writer is not None: - return self.WRITER - if self._readers: - return self.READER - return None - - def is_reader(self): - """Returns if the caller is one of the readers.""" - me = self._current_thread() - return me in self._readers - - @contextlib.contextmanager - def read_lock(self): - """Context manager that grants a read lock. - - Will wait until no active or pending writers. - - Raises a RuntimeError if a pending writer tries to acquire - a read lock. - """ - me = self._current_thread() - if me in self._pending_writers: - raise RuntimeError("Writer %s can not acquire a read lock" - " while waiting for the write lock" - % me) - with self._cond: - while True: - # No active writer, or we are the writer; - # we are good to become a reader. - if self._writer is None or self._writer == me: - self._readers.append(me) - break - # An active writer; guess we have to wait. - self._cond.wait() - try: - yield self - finally: - # I am no longer a reader, remove *one* occurrence of myself. - # If the current thread acquired two read locks, then it will - # still have to remove that other read lock; this allows for - # basic reentrancy to be possible. - with self._cond: - self._readers.remove(me) - self._cond.notify_all() - - @contextlib.contextmanager - def write_lock(self): - """Context manager that grants a write lock. - - Will wait until no active readers. Blocks readers after acquiring. - - Raises a RuntimeError if an active reader attempts to acquire a lock. - """ - me = self._current_thread() - if self.is_reader(): - raise RuntimeError("Reader %s to writer privilege" - " escalation not allowed" % me) - if self.is_writer(check_pending=False): - # Already the writer; this allows for basic reentrancy. - yield self - else: - with self._cond: - self._pending_writers.append(me) - while True: - # No readers, and no active writer, am I next?? - if len(self._readers) == 0 and self._writer is None: - if self._pending_writers[0] == me: - self._writer = self._pending_writers.popleft() - break - self._cond.wait() - try: - yield self - finally: - with self._cond: - self._writer = None - self._cond.notify_all() - - class MultiLock(object): """A class which attempts to obtain & release many locks at once. @@ -411,107 +205,3 @@ class MultiLock(object): # At the end only clear it off, so that under partial failure we don't # lose any locks... self._lock_stacks.pop() - - -class _InterProcessLock(object): - """An interprocess locking implementation. - - This is a lock implementation which allows multiple locks, working around - issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does - not require any cleanup. Since the lock is always held on a file - descriptor rather than outside of the process, the lock gets dropped - automatically if the process crashes, even if __exit__ is not executed. - - There are no guarantees regarding usage by multiple green threads in a - single process here. This lock works only between processes. - - Note these locks are released when the descriptor is closed, so it's not - safe to close the file descriptor while another green thread holds the - lock. Just opening and closing the lock file can break synchronisation, - so lock files must be accessed only using this abstraction. - """ - - def __init__(self, name): - self.lockfile = None - self.fname = name - - def acquire(self): - basedir = os.path.dirname(self.fname) - - if not os.path.exists(basedir): - misc.ensure_tree(basedir) - LOG.debug('Created lock path: %s', basedir) - - self.lockfile = open(self.fname, 'w') - - while True: - try: - # Using non-blocking locks since green threads are not - # patched to deal with blocking locking calls. - # Also upon reading the MSDN docs for locking(), it seems - # to have a laughable 10 attempts "blocking" mechanism. - self.trylock() - LOG.debug('Got file lock "%s"', self.fname) - return True - except IOError as e: - if e.errno in (errno.EACCES, errno.EAGAIN): - # external locks synchronise things like iptables - # updates - give it some time to prevent busy spinning - time.sleep(0.01) - else: - raise threading.ThreadError("Unable to acquire lock on" - " `%(filename)s` due to" - " %(exception)s" % - { - 'filename': self.fname, - 'exception': e, - }) - - def __enter__(self): - self.acquire() - return self - - def release(self): - try: - self.unlock() - self.lockfile.close() - LOG.debug('Released file lock "%s"', self.fname) - except IOError: - LOG.exception("Could not release the acquired lock `%s`", - self.fname) - - def __exit__(self, exc_type, exc_val, exc_tb): - self.release() - - def exists(self): - return os.path.exists(self.fname) - - def trylock(self): - raise NotImplementedError() - - def unlock(self): - raise NotImplementedError() - - -class _WindowsLock(_InterProcessLock): - def trylock(self): - msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1) - - def unlock(self): - msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1) - - -class _PosixLock(_InterProcessLock): - def trylock(self): - fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) - - def unlock(self): - fcntl.lockf(self.lockfile, fcntl.LOCK_UN) - - -if os.name == 'nt': - import msvcrt - InterProcessLock = _WindowsLock -else: - import fcntl - InterProcessLock = _PosixLock