Merge "Replace lock_utils lock(s) with fasteners package"
This commit is contained in:
		@@ -16,6 +16,9 @@ six>=1.9.0
 | 
			
		||||
# Enum library made for <= python 3.3
 | 
			
		||||
enum34
 | 
			
		||||
 | 
			
		||||
# For reader/writer + interprocess locks.
 | 
			
		||||
fasteners>=0.5 # Apache-2.0
 | 
			
		||||
 | 
			
		||||
# Very nice graph library
 | 
			
		||||
networkx>=1.8
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -20,11 +20,11 @@ import errno
 | 
			
		||||
import os
 | 
			
		||||
import shutil
 | 
			
		||||
 | 
			
		||||
import fasteners
 | 
			
		||||
from oslo_serialization import jsonutils
 | 
			
		||||
 | 
			
		||||
from taskflow import exceptions as exc
 | 
			
		||||
from taskflow.persistence import path_based
 | 
			
		||||
from taskflow.utils import lock_utils
 | 
			
		||||
from taskflow.utils import misc
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -64,7 +64,7 @@ class DirBackend(path_based.PathBasedBackend):
 | 
			
		||||
        if not self._path:
 | 
			
		||||
            raise ValueError("Empty path is disallowed")
 | 
			
		||||
        self._path = os.path.abspath(self._path)
 | 
			
		||||
        self.lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        self.lock = fasteners.ReaderWriterLock()
 | 
			
		||||
 | 
			
		||||
    def get_connection(self):
 | 
			
		||||
        return Connection(self)
 | 
			
		||||
@@ -97,7 +97,7 @@ class Connection(path_based.PathBasedConnection):
 | 
			
		||||
    @contextlib.contextmanager
 | 
			
		||||
    def _path_lock(self, path):
 | 
			
		||||
        lockfile = self._join_path(path, 'lock')
 | 
			
		||||
        with lock_utils.InterProcessLock(lockfile) as lock:
 | 
			
		||||
        with fasteners.InterProcessLock(lockfile) as lock:
 | 
			
		||||
            with _storagefailure_wrapper():
 | 
			
		||||
                yield lock
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -20,12 +20,12 @@ import copy
 | 
			
		||||
import itertools
 | 
			
		||||
import posixpath as pp
 | 
			
		||||
 | 
			
		||||
import fasteners
 | 
			
		||||
import six
 | 
			
		||||
 | 
			
		||||
from taskflow import exceptions as exc
 | 
			
		||||
from taskflow.persistence import path_based
 | 
			
		||||
from taskflow.types import tree
 | 
			
		||||
from taskflow.utils import lock_utils
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class FakeInode(tree.Node):
 | 
			
		||||
@@ -264,7 +264,7 @@ class MemoryBackend(path_based.PathBasedBackend):
 | 
			
		||||
            self._path = pp.sep
 | 
			
		||||
        self.memory = FakeFilesystem(deep_copy=self._conf.get('deep_copy',
 | 
			
		||||
                                                              True))
 | 
			
		||||
        self.lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        self.lock = fasteners.ReaderWriterLock()
 | 
			
		||||
 | 
			
		||||
    def get_connection(self):
 | 
			
		||||
        return Connection(self)
 | 
			
		||||
 
 | 
			
		||||
@@ -16,6 +16,7 @@
 | 
			
		||||
 | 
			
		||||
import contextlib
 | 
			
		||||
 | 
			
		||||
import fasteners
 | 
			
		||||
from oslo_utils import reflection
 | 
			
		||||
from oslo_utils import uuidutils
 | 
			
		||||
import six
 | 
			
		||||
@@ -28,7 +29,6 @@ from taskflow import retry
 | 
			
		||||
from taskflow import states
 | 
			
		||||
from taskflow import task
 | 
			
		||||
from taskflow.types import failure
 | 
			
		||||
from taskflow.utils import lock_utils
 | 
			
		||||
from taskflow.utils import misc
 | 
			
		||||
 | 
			
		||||
LOG = logging.getLogger(__name__)
 | 
			
		||||
@@ -150,7 +150,7 @@ class Storage(object):
 | 
			
		||||
        self._flowdetail = flow_detail
 | 
			
		||||
        self._transients = {}
 | 
			
		||||
        self._injected_args = {}
 | 
			
		||||
        self._lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        self._lock = fasteners.ReaderWriterLock()
 | 
			
		||||
        self._ensure_matchers = [
 | 
			
		||||
            ((task.BaseTask,), (logbook.TaskDetail, 'Task')),
 | 
			
		||||
            ((retry.Retry,), (logbook.RetryDetail, 'Retry')),
 | 
			
		||||
@@ -308,46 +308,46 @@ class Storage(object):
 | 
			
		||||
        original_atom_detail.update(conn.update_atom_details(atom_detail))
 | 
			
		||||
        return original_atom_detail
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_atom_uuid(self, atom_name):
 | 
			
		||||
        """Gets an atoms uuid given a atoms name."""
 | 
			
		||||
        source, _clone = self._atomdetail_by_name(atom_name)
 | 
			
		||||
        return source.uuid
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def set_atom_state(self, atom_name, state):
 | 
			
		||||
        """Sets an atoms state."""
 | 
			
		||||
        source, clone = self._atomdetail_by_name(atom_name, clone=True)
 | 
			
		||||
        clone.state = state
 | 
			
		||||
        self._with_connection(self._save_atom_detail, source, clone)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_atom_state(self, atom_name):
 | 
			
		||||
        """Gets the state of an atom given an atoms name."""
 | 
			
		||||
        source, _clone = self._atomdetail_by_name(atom_name)
 | 
			
		||||
        return source.state
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def set_atom_intention(self, atom_name, intention):
 | 
			
		||||
        """Sets the intention of an atom given an atoms name."""
 | 
			
		||||
        source, clone = self._atomdetail_by_name(atom_name, clone=True)
 | 
			
		||||
        clone.intention = intention
 | 
			
		||||
        self._with_connection(self._save_atom_detail, source, clone)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_atom_intention(self, atom_name):
 | 
			
		||||
        """Gets the intention of an atom given an atoms name."""
 | 
			
		||||
        source, _clone = self._atomdetail_by_name(atom_name)
 | 
			
		||||
        return source.intention
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_atoms_states(self, atom_names):
 | 
			
		||||
        """Gets all atoms states given a set of names."""
 | 
			
		||||
        return dict((name, (self.get_atom_state(name),
 | 
			
		||||
                            self.get_atom_intention(name)))
 | 
			
		||||
                    for name in atom_names)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def _update_atom_metadata(self, atom_name, update_with,
 | 
			
		||||
                              expected_type=None):
 | 
			
		||||
        source, clone = self._atomdetail_by_name(atom_name,
 | 
			
		||||
@@ -391,7 +391,7 @@ class Storage(object):
 | 
			
		||||
        self._update_atom_metadata(task_name, update_with,
 | 
			
		||||
                                   expected_type=logbook.TaskDetail)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_task_progress(self, task_name):
 | 
			
		||||
        """Get the progress of a task given a tasks name.
 | 
			
		||||
 | 
			
		||||
@@ -405,7 +405,7 @@ class Storage(object):
 | 
			
		||||
        except KeyError:
 | 
			
		||||
            return 0.0
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_task_progress_details(self, task_name):
 | 
			
		||||
        """Get the progress details of a task given a tasks name.
 | 
			
		||||
 | 
			
		||||
@@ -437,7 +437,7 @@ class Storage(object):
 | 
			
		||||
                LOG.warning("Atom %s did not supply result "
 | 
			
		||||
                            "with index %r (name %s)", atom_name, index, name)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def save(self, atom_name, data, state=states.SUCCESS):
 | 
			
		||||
        """Put result for atom with id 'uuid' to storage."""
 | 
			
		||||
        source, clone = self._atomdetail_by_name(atom_name, clone=True)
 | 
			
		||||
@@ -451,7 +451,7 @@ class Storage(object):
 | 
			
		||||
        else:
 | 
			
		||||
            self._check_all_results_provided(result.name, data)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def save_retry_failure(self, retry_name, failed_atom_name, failure):
 | 
			
		||||
        """Save subflow failure to retry controller history."""
 | 
			
		||||
        source, clone = self._atomdetail_by_name(
 | 
			
		||||
@@ -467,7 +467,7 @@ class Storage(object):
 | 
			
		||||
                failures[failed_atom_name] = failure
 | 
			
		||||
                self._with_connection(self._save_atom_detail, source, clone)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def cleanup_retry_history(self, retry_name, state):
 | 
			
		||||
        """Cleanup history of retry atom with given name."""
 | 
			
		||||
        source, clone = self._atomdetail_by_name(
 | 
			
		||||
@@ -476,7 +476,7 @@ class Storage(object):
 | 
			
		||||
        clone.results = []
 | 
			
		||||
        self._with_connection(self._save_atom_detail, source, clone)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def _get(self, atom_name, only_last=False):
 | 
			
		||||
        source, _clone = self._atomdetail_by_name(atom_name)
 | 
			
		||||
        if source.failure is not None:
 | 
			
		||||
@@ -499,7 +499,7 @@ class Storage(object):
 | 
			
		||||
        """Gets the results for an atom with a given name from storage."""
 | 
			
		||||
        return self._get(atom_name)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_failures(self):
 | 
			
		||||
        """Get list of failures that happened with this flow.
 | 
			
		||||
 | 
			
		||||
@@ -511,7 +511,7 @@ class Storage(object):
 | 
			
		||||
        """Returns True if there are failed tasks in the storage."""
 | 
			
		||||
        return bool(self._failures)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def reset(self, atom_name, state=states.PENDING):
 | 
			
		||||
        """Reset atom with given name (if the atom is not in a given state)."""
 | 
			
		||||
        if atom_name == self.injector_name:
 | 
			
		||||
@@ -579,7 +579,7 @@ class Storage(object):
 | 
			
		||||
            else:
 | 
			
		||||
                save_persistent()
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def inject(self, pairs, transient=False):
 | 
			
		||||
        """Add values into storage.
 | 
			
		||||
 | 
			
		||||
@@ -656,7 +656,7 @@ class Storage(object):
 | 
			
		||||
                if provider not in entries:
 | 
			
		||||
                    entries.append(provider)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def fetch(self, name, many_handler=None):
 | 
			
		||||
        """Fetch a named result."""
 | 
			
		||||
        def _many_handler(values):
 | 
			
		||||
@@ -691,7 +691,7 @@ class Storage(object):
 | 
			
		||||
        else:
 | 
			
		||||
            return many_handler(values)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def fetch_unsatisfied_args(self, atom_name, args_mapping,
 | 
			
		||||
                               scope_walker=None, optional_args=None):
 | 
			
		||||
        """Fetch unsatisfied atom arguments using an atoms argument mapping.
 | 
			
		||||
@@ -777,7 +777,7 @@ class Storage(object):
 | 
			
		||||
                missing.discard(bound_name)
 | 
			
		||||
        return missing
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def fetch_all(self, many_handler=None):
 | 
			
		||||
        """Fetch all named results known so far."""
 | 
			
		||||
        def _many_handler(values):
 | 
			
		||||
@@ -794,7 +794,7 @@ class Storage(object):
 | 
			
		||||
                pass
 | 
			
		||||
        return results
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def fetch_mapped_args(self, args_mapping,
 | 
			
		||||
                          atom_name=None, scope_walker=None,
 | 
			
		||||
                          optional_args=None):
 | 
			
		||||
@@ -908,14 +908,14 @@ class Storage(object):
 | 
			
		||||
                            bound_name, name, value, provider)
 | 
			
		||||
        return mapped_args
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def set_flow_state(self, state):
 | 
			
		||||
        """Set flow details state and save it."""
 | 
			
		||||
        source, clone = self._fetch_flowdetail(clone=True)
 | 
			
		||||
        clone.state = state
 | 
			
		||||
        self._with_connection(self._save_flow_detail, source, clone)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.write_locked
 | 
			
		||||
    @fasteners.write_locked
 | 
			
		||||
    def update_flow_metadata(self, update_with):
 | 
			
		||||
        """Update flowdetails metadata and save it."""
 | 
			
		||||
        if update_with:
 | 
			
		||||
@@ -923,7 +923,7 @@ class Storage(object):
 | 
			
		||||
            clone.meta.update(update_with)
 | 
			
		||||
            self._with_connection(self._save_flow_detail, source, clone)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_flow_state(self):
 | 
			
		||||
        """Get state from flow details."""
 | 
			
		||||
        source = self._flowdetail
 | 
			
		||||
@@ -945,14 +945,14 @@ class Storage(object):
 | 
			
		||||
                failure = ad.failure
 | 
			
		||||
        return retry.History(ad.results, failure=failure)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_retry_history(self, retry_name):
 | 
			
		||||
        """Fetch a single retrys history."""
 | 
			
		||||
        source, _clone = self._atomdetail_by_name(
 | 
			
		||||
            retry_name, expected_type=logbook.RetryDetail)
 | 
			
		||||
        return self._translate_into_history(source)
 | 
			
		||||
 | 
			
		||||
    @lock_utils.read_locked
 | 
			
		||||
    @fasteners.read_locked
 | 
			
		||||
    def get_retry_histories(self):
 | 
			
		||||
        """Fetch all retrys histories."""
 | 
			
		||||
        histories = []
 | 
			
		||||
 
 | 
			
		||||
@@ -15,16 +15,11 @@
 | 
			
		||||
#    under the License.
 | 
			
		||||
 | 
			
		||||
import collections
 | 
			
		||||
import random
 | 
			
		||||
import threading
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from concurrent import futures
 | 
			
		||||
 | 
			
		||||
from taskflow import test
 | 
			
		||||
from taskflow.test import mock
 | 
			
		||||
from taskflow.tests import utils as test_utils
 | 
			
		||||
from taskflow.types import timing
 | 
			
		||||
from taskflow.utils import lock_utils
 | 
			
		||||
from taskflow.utils import misc
 | 
			
		||||
from taskflow.utils import threading_utils
 | 
			
		||||
@@ -50,51 +45,6 @@ def _find_overlaps(times, start, end):
 | 
			
		||||
    return overlaps
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def _spawn_variation(readers, writers, max_workers=None):
 | 
			
		||||
    start_stops = collections.deque()
 | 
			
		||||
    lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
 | 
			
		||||
    def read_func(ident):
 | 
			
		||||
        with lock.read_lock():
 | 
			
		||||
            # TODO(harlowja): sometime in the future use a monotonic clock here
 | 
			
		||||
            # to avoid problems that can be caused by ntpd resyncing the clock
 | 
			
		||||
            # while we are actively running.
 | 
			
		||||
            enter_time = now()
 | 
			
		||||
            time.sleep(WORK_TIMES[ident % len(WORK_TIMES)])
 | 
			
		||||
            exit_time = now()
 | 
			
		||||
            start_stops.append((lock.READER, enter_time, exit_time))
 | 
			
		||||
            time.sleep(NAPPY_TIME)
 | 
			
		||||
 | 
			
		||||
    def write_func(ident):
 | 
			
		||||
        with lock.write_lock():
 | 
			
		||||
            enter_time = now()
 | 
			
		||||
            time.sleep(WORK_TIMES[ident % len(WORK_TIMES)])
 | 
			
		||||
            exit_time = now()
 | 
			
		||||
            start_stops.append((lock.WRITER, enter_time, exit_time))
 | 
			
		||||
            time.sleep(NAPPY_TIME)
 | 
			
		||||
 | 
			
		||||
    if max_workers is None:
 | 
			
		||||
        max_workers = max(0, readers) + max(0, writers)
 | 
			
		||||
    if max_workers > 0:
 | 
			
		||||
        with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
 | 
			
		||||
            count = 0
 | 
			
		||||
            for _i in range(0, readers):
 | 
			
		||||
                e.submit(read_func, count)
 | 
			
		||||
                count += 1
 | 
			
		||||
            for _i in range(0, writers):
 | 
			
		||||
                e.submit(write_func, count)
 | 
			
		||||
                count += 1
 | 
			
		||||
 | 
			
		||||
    writer_times = []
 | 
			
		||||
    reader_times = []
 | 
			
		||||
    for (lock_type, start, stop) in list(start_stops):
 | 
			
		||||
        if lock_type == lock.WRITER:
 | 
			
		||||
            writer_times.append((start, stop))
 | 
			
		||||
        else:
 | 
			
		||||
            reader_times.append((start, stop))
 | 
			
		||||
    return (writer_times, reader_times)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MultilockTest(test.TestCase):
 | 
			
		||||
    THREAD_COUNT = 20
 | 
			
		||||
 | 
			
		||||
@@ -329,306 +279,3 @@ class MultilockTest(test.TestCase):
 | 
			
		||||
        lock2 = threading.Lock()
 | 
			
		||||
        n_lock = lock_utils.MultiLock((lock1, lock2))
 | 
			
		||||
        self.assertRaises(threading.ThreadError, n_lock.release)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ReadWriteLockTest(test.TestCase):
 | 
			
		||||
    THREAD_COUNT = 20
 | 
			
		||||
 | 
			
		||||
    def test_no_double_writers(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        watch = timing.StopWatch(duration=5)
 | 
			
		||||
        watch.start()
 | 
			
		||||
        dups = collections.deque()
 | 
			
		||||
        active = collections.deque()
 | 
			
		||||
 | 
			
		||||
        def acquire_check(me):
 | 
			
		||||
            with lock.write_lock():
 | 
			
		||||
                if len(active) >= 1:
 | 
			
		||||
                    dups.append(me)
 | 
			
		||||
                    dups.extend(active)
 | 
			
		||||
                active.append(me)
 | 
			
		||||
                try:
 | 
			
		||||
                    time.sleep(random.random() / 100)
 | 
			
		||||
                finally:
 | 
			
		||||
                    active.remove(me)
 | 
			
		||||
 | 
			
		||||
        def run():
 | 
			
		||||
            me = threading.current_thread()
 | 
			
		||||
            while not watch.expired():
 | 
			
		||||
                acquire_check(me)
 | 
			
		||||
 | 
			
		||||
        threads = []
 | 
			
		||||
        for i in range(0, self.THREAD_COUNT):
 | 
			
		||||
            t = threading_utils.daemon_thread(run)
 | 
			
		||||
            threads.append(t)
 | 
			
		||||
            t.start()
 | 
			
		||||
        while threads:
 | 
			
		||||
            t = threads.pop()
 | 
			
		||||
            t.join()
 | 
			
		||||
 | 
			
		||||
        self.assertEqual([], list(dups))
 | 
			
		||||
        self.assertEqual([], list(active))
 | 
			
		||||
 | 
			
		||||
    def test_no_concurrent_readers_writers(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        watch = timing.StopWatch(duration=5)
 | 
			
		||||
        watch.start()
 | 
			
		||||
        dups = collections.deque()
 | 
			
		||||
        active = collections.deque()
 | 
			
		||||
 | 
			
		||||
        def acquire_check(me, reader):
 | 
			
		||||
            if reader:
 | 
			
		||||
                lock_func = lock.read_lock
 | 
			
		||||
            else:
 | 
			
		||||
                lock_func = lock.write_lock
 | 
			
		||||
            with lock_func():
 | 
			
		||||
                if not reader:
 | 
			
		||||
                    # There should be no-one else currently active, if there
 | 
			
		||||
                    # is ensure we capture them so that we can later blow-up
 | 
			
		||||
                    # the test.
 | 
			
		||||
                    if len(active) >= 1:
 | 
			
		||||
                        dups.append(me)
 | 
			
		||||
                        dups.extend(active)
 | 
			
		||||
                active.append(me)
 | 
			
		||||
                try:
 | 
			
		||||
                    time.sleep(random.random() / 100)
 | 
			
		||||
                finally:
 | 
			
		||||
                    active.remove(me)
 | 
			
		||||
 | 
			
		||||
        def run():
 | 
			
		||||
            me = threading.current_thread()
 | 
			
		||||
            while not watch.expired():
 | 
			
		||||
                acquire_check(me, random.choice([True, False]))
 | 
			
		||||
 | 
			
		||||
        threads = []
 | 
			
		||||
        for i in range(0, self.THREAD_COUNT):
 | 
			
		||||
            t = threading_utils.daemon_thread(run)
 | 
			
		||||
            threads.append(t)
 | 
			
		||||
            t.start()
 | 
			
		||||
        while threads:
 | 
			
		||||
            t = threads.pop()
 | 
			
		||||
            t.join()
 | 
			
		||||
 | 
			
		||||
        self.assertEqual([], list(dups))
 | 
			
		||||
        self.assertEqual([], list(active))
 | 
			
		||||
 | 
			
		||||
    def test_writer_abort(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        self.assertFalse(lock.owner)
 | 
			
		||||
 | 
			
		||||
        def blow_up():
 | 
			
		||||
            with lock.write_lock():
 | 
			
		||||
                self.assertEqual(lock.WRITER, lock.owner)
 | 
			
		||||
                raise RuntimeError("Broken")
 | 
			
		||||
 | 
			
		||||
        self.assertRaises(RuntimeError, blow_up)
 | 
			
		||||
        self.assertFalse(lock.owner)
 | 
			
		||||
 | 
			
		||||
    def test_reader_abort(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        self.assertFalse(lock.owner)
 | 
			
		||||
 | 
			
		||||
        def blow_up():
 | 
			
		||||
            with lock.read_lock():
 | 
			
		||||
                self.assertEqual(lock.READER, lock.owner)
 | 
			
		||||
                raise RuntimeError("Broken")
 | 
			
		||||
 | 
			
		||||
        self.assertRaises(RuntimeError, blow_up)
 | 
			
		||||
        self.assertFalse(lock.owner)
 | 
			
		||||
 | 
			
		||||
    def test_double_reader_abort(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        activated = collections.deque()
 | 
			
		||||
 | 
			
		||||
        def double_bad_reader():
 | 
			
		||||
            with lock.read_lock():
 | 
			
		||||
                with lock.read_lock():
 | 
			
		||||
                    raise RuntimeError("Broken")
 | 
			
		||||
 | 
			
		||||
        def happy_writer():
 | 
			
		||||
            with lock.write_lock():
 | 
			
		||||
                activated.append(lock.owner)
 | 
			
		||||
 | 
			
		||||
        with futures.ThreadPoolExecutor(max_workers=20) as e:
 | 
			
		||||
            for i in range(0, 20):
 | 
			
		||||
                if i % 2 == 0:
 | 
			
		||||
                    e.submit(double_bad_reader)
 | 
			
		||||
                else:
 | 
			
		||||
                    e.submit(happy_writer)
 | 
			
		||||
 | 
			
		||||
        self.assertEqual(10, len([a for a in activated if a == 'w']))
 | 
			
		||||
 | 
			
		||||
    def test_double_reader_writer(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        activated = collections.deque()
 | 
			
		||||
        active = threading_utils.Event()
 | 
			
		||||
 | 
			
		||||
        def double_reader():
 | 
			
		||||
            with lock.read_lock():
 | 
			
		||||
                active.set()
 | 
			
		||||
                while not lock.has_pending_writers:
 | 
			
		||||
                    time.sleep(0.001)
 | 
			
		||||
                with lock.read_lock():
 | 
			
		||||
                    activated.append(lock.owner)
 | 
			
		||||
 | 
			
		||||
        def happy_writer():
 | 
			
		||||
            with lock.write_lock():
 | 
			
		||||
                activated.append(lock.owner)
 | 
			
		||||
 | 
			
		||||
        reader = threading_utils.daemon_thread(double_reader)
 | 
			
		||||
        reader.start()
 | 
			
		||||
        self.assertTrue(active.wait(test_utils.WAIT_TIMEOUT))
 | 
			
		||||
 | 
			
		||||
        writer = threading_utils.daemon_thread(happy_writer)
 | 
			
		||||
        writer.start()
 | 
			
		||||
 | 
			
		||||
        reader.join()
 | 
			
		||||
        writer.join()
 | 
			
		||||
        self.assertEqual(2, len(activated))
 | 
			
		||||
        self.assertEqual(['r', 'w'], list(activated))
 | 
			
		||||
 | 
			
		||||
    def test_reader_chaotic(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        activated = collections.deque()
 | 
			
		||||
 | 
			
		||||
        def chaotic_reader(blow_up):
 | 
			
		||||
            with lock.read_lock():
 | 
			
		||||
                if blow_up:
 | 
			
		||||
                    raise RuntimeError("Broken")
 | 
			
		||||
                else:
 | 
			
		||||
                    activated.append(lock.owner)
 | 
			
		||||
 | 
			
		||||
        def happy_writer():
 | 
			
		||||
            with lock.write_lock():
 | 
			
		||||
                activated.append(lock.owner)
 | 
			
		||||
 | 
			
		||||
        with futures.ThreadPoolExecutor(max_workers=20) as e:
 | 
			
		||||
            for i in range(0, 20):
 | 
			
		||||
                if i % 2 == 0:
 | 
			
		||||
                    e.submit(chaotic_reader, blow_up=bool(i % 4 == 0))
 | 
			
		||||
                else:
 | 
			
		||||
                    e.submit(happy_writer)
 | 
			
		||||
 | 
			
		||||
        writers = [a for a in activated if a == 'w']
 | 
			
		||||
        readers = [a for a in activated if a == 'r']
 | 
			
		||||
        self.assertEqual(10, len(writers))
 | 
			
		||||
        self.assertEqual(5, len(readers))
 | 
			
		||||
 | 
			
		||||
    def test_writer_chaotic(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        activated = collections.deque()
 | 
			
		||||
 | 
			
		||||
        def chaotic_writer(blow_up):
 | 
			
		||||
            with lock.write_lock():
 | 
			
		||||
                if blow_up:
 | 
			
		||||
                    raise RuntimeError("Broken")
 | 
			
		||||
                else:
 | 
			
		||||
                    activated.append(lock.owner)
 | 
			
		||||
 | 
			
		||||
        def happy_reader():
 | 
			
		||||
            with lock.read_lock():
 | 
			
		||||
                activated.append(lock.owner)
 | 
			
		||||
 | 
			
		||||
        with futures.ThreadPoolExecutor(max_workers=20) as e:
 | 
			
		||||
            for i in range(0, 20):
 | 
			
		||||
                if i % 2 == 0:
 | 
			
		||||
                    e.submit(chaotic_writer, blow_up=bool(i % 4 == 0))
 | 
			
		||||
                else:
 | 
			
		||||
                    e.submit(happy_reader)
 | 
			
		||||
 | 
			
		||||
        writers = [a for a in activated if a == 'w']
 | 
			
		||||
        readers = [a for a in activated if a == 'r']
 | 
			
		||||
        self.assertEqual(5, len(writers))
 | 
			
		||||
        self.assertEqual(10, len(readers))
 | 
			
		||||
 | 
			
		||||
    def test_single_reader_writer(self):
 | 
			
		||||
        results = []
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        with lock.read_lock():
 | 
			
		||||
            self.assertTrue(lock.is_reader())
 | 
			
		||||
            self.assertEqual(0, len(results))
 | 
			
		||||
        with lock.write_lock():
 | 
			
		||||
            results.append(1)
 | 
			
		||||
            self.assertTrue(lock.is_writer())
 | 
			
		||||
        with lock.read_lock():
 | 
			
		||||
            self.assertTrue(lock.is_reader())
 | 
			
		||||
            self.assertEqual(1, len(results))
 | 
			
		||||
        self.assertFalse(lock.is_reader())
 | 
			
		||||
        self.assertFalse(lock.is_writer())
 | 
			
		||||
 | 
			
		||||
    def test_reader_to_writer(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
 | 
			
		||||
        def writer_func():
 | 
			
		||||
            with lock.write_lock():
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
        with lock.read_lock():
 | 
			
		||||
            self.assertRaises(RuntimeError, writer_func)
 | 
			
		||||
            self.assertFalse(lock.is_writer())
 | 
			
		||||
 | 
			
		||||
        self.assertFalse(lock.is_reader())
 | 
			
		||||
        self.assertFalse(lock.is_writer())
 | 
			
		||||
 | 
			
		||||
    def test_writer_to_reader(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
 | 
			
		||||
        def reader_func():
 | 
			
		||||
            with lock.read_lock():
 | 
			
		||||
                self.assertTrue(lock.is_writer())
 | 
			
		||||
                self.assertTrue(lock.is_reader())
 | 
			
		||||
 | 
			
		||||
        with lock.write_lock():
 | 
			
		||||
            self.assertIsNone(reader_func())
 | 
			
		||||
            self.assertFalse(lock.is_reader())
 | 
			
		||||
 | 
			
		||||
        self.assertFalse(lock.is_reader())
 | 
			
		||||
        self.assertFalse(lock.is_writer())
 | 
			
		||||
 | 
			
		||||
    def test_double_writer(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        with lock.write_lock():
 | 
			
		||||
            self.assertFalse(lock.is_reader())
 | 
			
		||||
            self.assertTrue(lock.is_writer())
 | 
			
		||||
            with lock.write_lock():
 | 
			
		||||
                self.assertTrue(lock.is_writer())
 | 
			
		||||
            self.assertTrue(lock.is_writer())
 | 
			
		||||
 | 
			
		||||
        self.assertFalse(lock.is_reader())
 | 
			
		||||
        self.assertFalse(lock.is_writer())
 | 
			
		||||
 | 
			
		||||
    def test_double_reader(self):
 | 
			
		||||
        lock = lock_utils.ReaderWriterLock()
 | 
			
		||||
        with lock.read_lock():
 | 
			
		||||
            self.assertTrue(lock.is_reader())
 | 
			
		||||
            self.assertFalse(lock.is_writer())
 | 
			
		||||
            with lock.read_lock():
 | 
			
		||||
                self.assertTrue(lock.is_reader())
 | 
			
		||||
            self.assertTrue(lock.is_reader())
 | 
			
		||||
 | 
			
		||||
        self.assertFalse(lock.is_reader())
 | 
			
		||||
        self.assertFalse(lock.is_writer())
 | 
			
		||||
 | 
			
		||||
    def test_multi_reader_multi_writer(self):
 | 
			
		||||
        writer_times, reader_times = _spawn_variation(10, 10)
 | 
			
		||||
        self.assertEqual(10, len(writer_times))
 | 
			
		||||
        self.assertEqual(10, len(reader_times))
 | 
			
		||||
        for (start, stop) in writer_times:
 | 
			
		||||
            self.assertEqual(0, _find_overlaps(reader_times, start, stop))
 | 
			
		||||
            self.assertEqual(1, _find_overlaps(writer_times, start, stop))
 | 
			
		||||
        for (start, stop) in reader_times:
 | 
			
		||||
            self.assertEqual(0, _find_overlaps(writer_times, start, stop))
 | 
			
		||||
 | 
			
		||||
    def test_multi_reader_single_writer(self):
 | 
			
		||||
        writer_times, reader_times = _spawn_variation(9, 1)
 | 
			
		||||
        self.assertEqual(1, len(writer_times))
 | 
			
		||||
        self.assertEqual(9, len(reader_times))
 | 
			
		||||
        start, stop = writer_times[0]
 | 
			
		||||
        self.assertEqual(0, _find_overlaps(reader_times, start, stop))
 | 
			
		||||
 | 
			
		||||
    def test_multi_writer(self):
 | 
			
		||||
        writer_times, reader_times = _spawn_variation(0, 10)
 | 
			
		||||
        self.assertEqual(10, len(writer_times))
 | 
			
		||||
        self.assertEqual(0, len(reader_times))
 | 
			
		||||
        for (start, stop) in writer_times:
 | 
			
		||||
            self.assertEqual(1, _find_overlaps(writer_times, start, stop))
 | 
			
		||||
 
 | 
			
		||||
@@ -19,23 +19,14 @@
 | 
			
		||||
# pulls in oslo.cfg) and is reduced to only what taskflow currently wants to
 | 
			
		||||
# use from that code.
 | 
			
		||||
 | 
			
		||||
import collections
 | 
			
		||||
import contextlib
 | 
			
		||||
import errno
 | 
			
		||||
import os
 | 
			
		||||
import threading
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
from oslo_utils import importutils
 | 
			
		||||
import six
 | 
			
		||||
 | 
			
		||||
from taskflow import logging
 | 
			
		||||
from taskflow.utils import misc
 | 
			
		||||
 | 
			
		||||
# Used for the reader-writer lock get the right thread 'hack' (needed below).
 | 
			
		||||
eventlet = importutils.try_import('eventlet')
 | 
			
		||||
eventlet_patcher = importutils.try_import('eventlet.patcher')
 | 
			
		||||
 | 
			
		||||
LOG = logging.getLogger(__name__)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -96,203 +87,6 @@ def locked(*args, **kwargs):
 | 
			
		||||
            return decorator
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def read_locked(*args, **kwargs):
 | 
			
		||||
    """Acquires & releases a read lock around call into decorated method.
 | 
			
		||||
 | 
			
		||||
    NOTE(harlowja): if no attribute name is provided then by default the
 | 
			
		||||
    attribute named '_lock' is looked for (this attribute is expected to be
 | 
			
		||||
    the rw-lock object) in the instance object this decorator is attached to.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def decorator(f):
 | 
			
		||||
        attr_name = kwargs.get('lock', '_lock')
 | 
			
		||||
 | 
			
		||||
        @six.wraps(f)
 | 
			
		||||
        def wrapper(self, *args, **kwargs):
 | 
			
		||||
            rw_lock = getattr(self, attr_name)
 | 
			
		||||
            with rw_lock.read_lock():
 | 
			
		||||
                return f(self, *args, **kwargs)
 | 
			
		||||
 | 
			
		||||
        return wrapper
 | 
			
		||||
 | 
			
		||||
    # This is needed to handle when the decorator has args or the decorator
 | 
			
		||||
    # doesn't have args, python is rather weird here...
 | 
			
		||||
    if kwargs or not args:
 | 
			
		||||
        return decorator
 | 
			
		||||
    else:
 | 
			
		||||
        if len(args) == 1:
 | 
			
		||||
            return decorator(args[0])
 | 
			
		||||
        else:
 | 
			
		||||
            return decorator
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def write_locked(*args, **kwargs):
 | 
			
		||||
    """Acquires & releases a write lock around call into decorated method.
 | 
			
		||||
 | 
			
		||||
    NOTE(harlowja): if no attribute name is provided then by default the
 | 
			
		||||
    attribute named '_lock' is looked for (this attribute is expected to be
 | 
			
		||||
    the rw-lock object) in the instance object this decorator is attached to.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def decorator(f):
 | 
			
		||||
        attr_name = kwargs.get('lock', '_lock')
 | 
			
		||||
 | 
			
		||||
        @six.wraps(f)
 | 
			
		||||
        def wrapper(self, *args, **kwargs):
 | 
			
		||||
            rw_lock = getattr(self, attr_name)
 | 
			
		||||
            with rw_lock.write_lock():
 | 
			
		||||
                return f(self, *args, **kwargs)
 | 
			
		||||
 | 
			
		||||
        return wrapper
 | 
			
		||||
 | 
			
		||||
    # This is needed to handle when the decorator has args or the decorator
 | 
			
		||||
    # doesn't have args, python is rather weird here...
 | 
			
		||||
    if kwargs or not args:
 | 
			
		||||
        return decorator
 | 
			
		||||
    else:
 | 
			
		||||
        if len(args) == 1:
 | 
			
		||||
            return decorator(args[0])
 | 
			
		||||
        else:
 | 
			
		||||
            return decorator
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ReaderWriterLock(object):
 | 
			
		||||
    """A reader/writer lock.
 | 
			
		||||
 | 
			
		||||
    This lock allows for simultaneous readers to exist but only one writer
 | 
			
		||||
    to exist for use-cases where it is useful to have such types of locks.
 | 
			
		||||
 | 
			
		||||
    Currently a reader can not escalate its read lock to a write lock and
 | 
			
		||||
    a writer can not acquire a read lock while it is waiting on the write
 | 
			
		||||
    lock.
 | 
			
		||||
 | 
			
		||||
    In the future these restrictions may be relaxed.
 | 
			
		||||
 | 
			
		||||
    This can be eventually removed if http://bugs.python.org/issue8800 ever
 | 
			
		||||
    gets accepted into the python standard threading library...
 | 
			
		||||
    """
 | 
			
		||||
    WRITER = 'w'
 | 
			
		||||
    READER = 'r'
 | 
			
		||||
 | 
			
		||||
    @staticmethod
 | 
			
		||||
    def _fetch_current_thread_functor():
 | 
			
		||||
        # Until https://github.com/eventlet/eventlet/issues/172 is resolved
 | 
			
		||||
        # or addressed we have to use complicated workaround to get a object
 | 
			
		||||
        # that will not be recycled; the usage of threading.current_thread()
 | 
			
		||||
        # doesn't appear to currently be monkey patched and therefore isn't
 | 
			
		||||
        # reliable to use (and breaks badly when used as all threads share
 | 
			
		||||
        # the same current_thread() object)...
 | 
			
		||||
        if eventlet is not None and eventlet_patcher is not None:
 | 
			
		||||
            if eventlet_patcher.is_monkey_patched('thread'):
 | 
			
		||||
                return lambda: eventlet.getcurrent()
 | 
			
		||||
        return lambda: threading.current_thread()
 | 
			
		||||
 | 
			
		||||
    def __init__(self):
 | 
			
		||||
        self._writer = None
 | 
			
		||||
        self._pending_writers = collections.deque()
 | 
			
		||||
        self._readers = collections.deque()
 | 
			
		||||
        self._cond = threading.Condition()
 | 
			
		||||
        self._current_thread = self._fetch_current_thread_functor()
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def has_pending_writers(self):
 | 
			
		||||
        """Returns if there are writers waiting to become the *one* writer."""
 | 
			
		||||
        return bool(self._pending_writers)
 | 
			
		||||
 | 
			
		||||
    def is_writer(self, check_pending=True):
 | 
			
		||||
        """Returns if the caller is the active writer or a pending writer."""
 | 
			
		||||
        me = self._current_thread()
 | 
			
		||||
        if self._writer == me:
 | 
			
		||||
            return True
 | 
			
		||||
        if check_pending:
 | 
			
		||||
            return me in self._pending_writers
 | 
			
		||||
        else:
 | 
			
		||||
            return False
 | 
			
		||||
 | 
			
		||||
    @property
 | 
			
		||||
    def owner(self):
 | 
			
		||||
        """Returns whether the lock is locked by a writer or reader."""
 | 
			
		||||
        with self._cond:
 | 
			
		||||
            # Obtain the lock to ensure we get a accurate view of the actual
 | 
			
		||||
            # owner that isn't likely to change when we are reading it...
 | 
			
		||||
            if self._writer is not None:
 | 
			
		||||
                return self.WRITER
 | 
			
		||||
            if self._readers:
 | 
			
		||||
                return self.READER
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
    def is_reader(self):
 | 
			
		||||
        """Returns if the caller is one of the readers."""
 | 
			
		||||
        me = self._current_thread()
 | 
			
		||||
        return me in self._readers
 | 
			
		||||
 | 
			
		||||
    @contextlib.contextmanager
 | 
			
		||||
    def read_lock(self):
 | 
			
		||||
        """Context manager that grants a read lock.
 | 
			
		||||
 | 
			
		||||
        Will wait until no active or pending writers.
 | 
			
		||||
 | 
			
		||||
        Raises a RuntimeError if a pending writer tries to acquire
 | 
			
		||||
        a read lock.
 | 
			
		||||
        """
 | 
			
		||||
        me = self._current_thread()
 | 
			
		||||
        if me in self._pending_writers:
 | 
			
		||||
            raise RuntimeError("Writer %s can not acquire a read lock"
 | 
			
		||||
                               " while waiting for the write lock"
 | 
			
		||||
                               % me)
 | 
			
		||||
        with self._cond:
 | 
			
		||||
            while True:
 | 
			
		||||
                # No active writer, or we are the writer;
 | 
			
		||||
                # we are good to become a reader.
 | 
			
		||||
                if self._writer is None or self._writer == me:
 | 
			
		||||
                    self._readers.append(me)
 | 
			
		||||
                    break
 | 
			
		||||
                # An active writer; guess we have to wait.
 | 
			
		||||
                self._cond.wait()
 | 
			
		||||
        try:
 | 
			
		||||
            yield self
 | 
			
		||||
        finally:
 | 
			
		||||
            # I am no longer a reader, remove *one* occurrence of myself.
 | 
			
		||||
            # If the current thread acquired two read locks, then it will
 | 
			
		||||
            # still have to remove that other read lock; this allows for
 | 
			
		||||
            # basic reentrancy to be possible.
 | 
			
		||||
            with self._cond:
 | 
			
		||||
                self._readers.remove(me)
 | 
			
		||||
                self._cond.notify_all()
 | 
			
		||||
 | 
			
		||||
    @contextlib.contextmanager
 | 
			
		||||
    def write_lock(self):
 | 
			
		||||
        """Context manager that grants a write lock.
 | 
			
		||||
 | 
			
		||||
        Will wait until no active readers. Blocks readers after acquiring.
 | 
			
		||||
 | 
			
		||||
        Raises a RuntimeError if an active reader attempts to acquire a lock.
 | 
			
		||||
        """
 | 
			
		||||
        me = self._current_thread()
 | 
			
		||||
        if self.is_reader():
 | 
			
		||||
            raise RuntimeError("Reader %s to writer privilege"
 | 
			
		||||
                               " escalation not allowed" % me)
 | 
			
		||||
        if self.is_writer(check_pending=False):
 | 
			
		||||
            # Already the writer; this allows for basic reentrancy.
 | 
			
		||||
            yield self
 | 
			
		||||
        else:
 | 
			
		||||
            with self._cond:
 | 
			
		||||
                self._pending_writers.append(me)
 | 
			
		||||
                while True:
 | 
			
		||||
                    # No readers, and no active writer, am I next??
 | 
			
		||||
                    if len(self._readers) == 0 and self._writer is None:
 | 
			
		||||
                        if self._pending_writers[0] == me:
 | 
			
		||||
                            self._writer = self._pending_writers.popleft()
 | 
			
		||||
                            break
 | 
			
		||||
                    self._cond.wait()
 | 
			
		||||
            try:
 | 
			
		||||
                yield self
 | 
			
		||||
            finally:
 | 
			
		||||
                with self._cond:
 | 
			
		||||
                    self._writer = None
 | 
			
		||||
                    self._cond.notify_all()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class MultiLock(object):
 | 
			
		||||
    """A class which attempts to obtain & release many locks at once.
 | 
			
		||||
 | 
			
		||||
@@ -411,107 +205,3 @@ class MultiLock(object):
 | 
			
		||||
        # At the end only clear it off, so that under partial failure we don't
 | 
			
		||||
        # lose any locks...
 | 
			
		||||
        self._lock_stacks.pop()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class _InterProcessLock(object):
 | 
			
		||||
    """An interprocess locking implementation.
 | 
			
		||||
 | 
			
		||||
    This is a lock implementation which allows multiple locks, working around
 | 
			
		||||
    issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
 | 
			
		||||
    not require any cleanup. Since the lock is always held on a file
 | 
			
		||||
    descriptor rather than outside of the process, the lock gets dropped
 | 
			
		||||
    automatically if the process crashes, even if __exit__ is not executed.
 | 
			
		||||
 | 
			
		||||
    There are no guarantees regarding usage by multiple green threads in a
 | 
			
		||||
    single process here. This lock works only between processes.
 | 
			
		||||
 | 
			
		||||
    Note these locks are released when the descriptor is closed, so it's not
 | 
			
		||||
    safe to close the file descriptor while another green thread holds the
 | 
			
		||||
    lock. Just opening and closing the lock file can break synchronisation,
 | 
			
		||||
    so lock files must be accessed only using this abstraction.
 | 
			
		||||
    """
 | 
			
		||||
 | 
			
		||||
    def __init__(self, name):
 | 
			
		||||
        self.lockfile = None
 | 
			
		||||
        self.fname = name
 | 
			
		||||
 | 
			
		||||
    def acquire(self):
 | 
			
		||||
        basedir = os.path.dirname(self.fname)
 | 
			
		||||
 | 
			
		||||
        if not os.path.exists(basedir):
 | 
			
		||||
            misc.ensure_tree(basedir)
 | 
			
		||||
            LOG.debug('Created lock path: %s', basedir)
 | 
			
		||||
 | 
			
		||||
        self.lockfile = open(self.fname, 'w')
 | 
			
		||||
 | 
			
		||||
        while True:
 | 
			
		||||
            try:
 | 
			
		||||
                # Using non-blocking locks since green threads are not
 | 
			
		||||
                # patched to deal with blocking locking calls.
 | 
			
		||||
                # Also upon reading the MSDN docs for locking(), it seems
 | 
			
		||||
                # to have a laughable 10 attempts "blocking" mechanism.
 | 
			
		||||
                self.trylock()
 | 
			
		||||
                LOG.debug('Got file lock "%s"', self.fname)
 | 
			
		||||
                return True
 | 
			
		||||
            except IOError as e:
 | 
			
		||||
                if e.errno in (errno.EACCES, errno.EAGAIN):
 | 
			
		||||
                    # external locks synchronise things like iptables
 | 
			
		||||
                    # updates - give it some time to prevent busy spinning
 | 
			
		||||
                    time.sleep(0.01)
 | 
			
		||||
                else:
 | 
			
		||||
                    raise threading.ThreadError("Unable to acquire lock on"
 | 
			
		||||
                                                " `%(filename)s` due to"
 | 
			
		||||
                                                " %(exception)s" %
 | 
			
		||||
                                                {
 | 
			
		||||
                                                    'filename': self.fname,
 | 
			
		||||
                                                    'exception': e,
 | 
			
		||||
                                                })
 | 
			
		||||
 | 
			
		||||
    def __enter__(self):
 | 
			
		||||
        self.acquire()
 | 
			
		||||
        return self
 | 
			
		||||
 | 
			
		||||
    def release(self):
 | 
			
		||||
        try:
 | 
			
		||||
            self.unlock()
 | 
			
		||||
            self.lockfile.close()
 | 
			
		||||
            LOG.debug('Released file lock "%s"', self.fname)
 | 
			
		||||
        except IOError:
 | 
			
		||||
            LOG.exception("Could not release the acquired lock `%s`",
 | 
			
		||||
                          self.fname)
 | 
			
		||||
 | 
			
		||||
    def __exit__(self, exc_type, exc_val, exc_tb):
 | 
			
		||||
        self.release()
 | 
			
		||||
 | 
			
		||||
    def exists(self):
 | 
			
		||||
        return os.path.exists(self.fname)
 | 
			
		||||
 | 
			
		||||
    def trylock(self):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
    def unlock(self):
 | 
			
		||||
        raise NotImplementedError()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class _WindowsLock(_InterProcessLock):
 | 
			
		||||
    def trylock(self):
 | 
			
		||||
        msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_NBLCK, 1)
 | 
			
		||||
 | 
			
		||||
    def unlock(self):
 | 
			
		||||
        msvcrt.locking(self.lockfile.fileno(), msvcrt.LK_UNLCK, 1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class _PosixLock(_InterProcessLock):
 | 
			
		||||
    def trylock(self):
 | 
			
		||||
        fcntl.lockf(self.lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
 | 
			
		||||
 | 
			
		||||
    def unlock(self):
 | 
			
		||||
        fcntl.lockf(self.lockfile, fcntl.LOCK_UN)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
if os.name == 'nt':
 | 
			
		||||
    import msvcrt
 | 
			
		||||
    InterProcessLock = _WindowsLock
 | 
			
		||||
else:
 | 
			
		||||
    import fcntl
 | 
			
		||||
    InterProcessLock = _PosixLock
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user