diff --git a/tooz/drivers/file.py b/tooz/drivers/file.py index 24fdffa9..605bbf7a 100644 --- a/tooz/drivers/file.py +++ b/tooz/drivers/file.py @@ -22,6 +22,7 @@ import logging import os import shutil import threading +import weakref from concurrent import futures @@ -37,6 +38,12 @@ from tooz import utils LOG = logging.getLogger(__name__) +class _Barrier(object): + def __init__(self): + self.cond = threading.Condition() + self.owner = None + + @contextlib.contextmanager def _translate_failures(): try: @@ -64,11 +71,12 @@ def _lock_me(lock): class FileLock(locking.Lock): """A file based lock.""" - def __init__(self, path): + def __init__(self, path, barrier, member_id): super(FileLock, self).__init__(path) self.acquired = False self._lock = fasteners.InterProcessLock(path) - self._cond = threading.Condition() + self._barrier = barrier + self._member_id = member_id def is_still_owner(self): return self.acquired @@ -77,33 +85,46 @@ class FileLock(locking.Lock): blocking, timeout = utils.convert_blocking(blocking) watch = timeutils.StopWatch(duration=timeout) watch.start() - while True: - with self._cond: - if self.acquired and blocking: - if watch.expired(): - return False - # If in the same process wait until we can attempt to - # acquire it (aka, another thread should release it before - # we can try to get it). - self._cond.wait(watch.leftover(return_none=True)) - elif self.acquired and not blocking: + + # Make the shared barrier ours first. + with self._barrier.cond: + while self._barrier.owner is not None: + if not blocking or watch.expired(): return False - else: - # All the prior waits may have left less time to wait... - timeout = watch.leftover(return_none=True) - self.acquired = self._lock.acquire(blocking=blocking, - timeout=timeout) - return self.acquired + self._barrier.cond.wait(watch.leftover(return_none=True)) + self._barrier.owner = (threading.current_thread().ident, + os.getpid(), self._member_id) + + # Ok at this point we are now working in a thread safe manner, + # and now we can try to get the actual lock... + gotten = False + try: + gotten = self._lock.acquire( + blocking=blocking, + # Since the barrier waiting may have + # taken a long time, we have to use + # the leftover (and not the original). + timeout=watch.leftover(return_none=True)) + finally: + # NOTE(harlowja): do this in a finally block to **ensure** that + # we release the barrier if something bad happens... + if not gotten: + # Release the barrier to let someone else have a go at it... + with self._barrier.cond: + self._barrier.owner = None + self._barrier.cond.notify_all() + + self.acquired = gotten + return gotten def release(self): - with self._cond: - if self.acquired: - self._lock.release() - self.acquired = False - self._cond.notify_all() - return True - else: - return False + if not self.acquired: + return False + with self._barrier.cond: + self.acquired = False + self._barrier.owner = None + self._barrier.cond.notify_all() + return True def __del__(self): if self.acquired: @@ -134,6 +155,13 @@ class FileDriver(coordination._RunWatchersMixin, """This routine is used to hash a member (or group) id into a filesystem safe name that can be used for member lookup and group joining.""" + _barriers = weakref.WeakValueDictionary() + """ + Barriers shared among all file driver locks, this is required + since interprocess locking is not thread aware, so we must add the + thread awareness on-top of it instead. + """ + def __init__(self, member_id, parsed_url, options): """Initialize the file driver.""" super(FileDriver, self).__init__() @@ -142,21 +170,26 @@ class FileDriver(coordination._RunWatchersMixin, self._executor = utils.ProxyExecutor.build("File", options) self._group_dir = os.path.join(self._dir, 'groups') self._driver_lock_path = os.path.join(self._dir, '.driver_lock') - self._driver_lock = locking.SharedWeakLockHelper( - self._dir, FileLock, self._driver_lock_path) + self._driver_lock = self._get_raw_lock(self._driver_lock_path, + self._member_id) self._reserved_dirs = [self._dir, self._group_dir] self._reserved_paths = list(self._reserved_dirs) self._reserved_paths.append(self._driver_lock_path) self._joined_groups = set() self._safe_member_id = self._make_filesystem_safe(member_id) + @classmethod + def _get_raw_lock(cls, path, member_id): + lock_barrier = cls._barriers.setdefault(path, _Barrier()) + return FileLock(path, lock_barrier, member_id) + def get_lock(self, name): path = utils.safe_abs_path(self._dir, name.decode()) if path in self._reserved_paths: raise ValueError("Unable to create a lock using" " reserved path '%s' for lock" " with name '%s'" % (path, name)) - return locking.SharedWeakLockHelper(self._dir, FileLock, path) + return self._get_raw_lock(path, self._member_id) @classmethod def _make_filesystem_safe(cls, item): diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index 1647185b..e865ae46 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -132,12 +132,11 @@ class MemcachedLock(locking.Lock): # id and then do the delete and bail out if the session id is not # as expected but memcache doesn't seem to have any equivalent # capability. - if self.coord.client.delete(self.name, noreply=False): - self.acquired = False + if (self in self.coord._acquired_locks + and self.coord.client.delete(self.name, noreply=False)): self.coord._acquired_locks.remove(self) return True - else: - return False + return False @_translate_failures def heartbeat(self): diff --git a/tooz/locking.py b/tooz/locking.py index cd15760f..c611c441 100644 --- a/tooz/locking.py +++ b/tooz/locking.py @@ -16,8 +16,6 @@ import abc import six -import threading -import weakref import tooz from tooz import coordination @@ -93,57 +91,3 @@ class Lock(object): :rtype: bool """ - - -class SharedWeakLockHelper(Lock): - """Helper for lock that need to rely on a state in memory and - be the same object across each coordinator.get_lock(...) - """ - - LOCKS_LOCK = threading.Lock() - ACQUIRED_LOCKS = dict() - RELEASED_LOCKS = weakref.WeakValueDictionary() - - def __init__(self, namespace, lockclass, name, *args, **kwargs): - super(SharedWeakLockHelper, self).__init__(name) - self._lock_key = "%s:%s" % (namespace, name) - self._newlock = lambda: lockclass( - self.name, *args, **kwargs) - - @property - def lock(self): - """Access the underlying lock object. - - For internal usage only. - """ - with self.LOCKS_LOCK: - try: - l = self.ACQUIRED_LOCKS[self._lock_key] - except KeyError: - l = self.RELEASED_LOCKS.setdefault( - self._lock_key, self._newlock()) - return l - - def acquire(self, blocking=True): - l = self.lock - if l.acquire(blocking): - with self.LOCKS_LOCK: - self.RELEASED_LOCKS.pop(self._lock_key, None) - self.ACQUIRED_LOCKS[self._lock_key] = l - return True - return False - - def release(self): - with self.LOCKS_LOCK: - try: - l = self.ACQUIRED_LOCKS.pop(self._lock_key) - except KeyError: - return False - else: - if l.release(): - self.RELEASED_LOCKS[self._lock_key] = l - return True - else: - # Put it back... - self.ACQUIRED_LOCKS[self._lock_key] = l - return False diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index 1225c2da..a1581ce8 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -194,6 +194,16 @@ class TestAPI(testscenarios.TestWithScenarios, self.assertRaises(tooz.coordination.MemberNotJoined, leave_group.get) + def test_get_lock_twice_locked_one_released_two(self): + name = self._get_random_uuid() + lock1 = self._coord.get_lock(name) + lock2 = self._coord.get_lock(name) + self.assertTrue(lock1.acquire()) + self.assertFalse(lock2.acquire(blocking=False)) + self.assertFalse(lock2.release()) + self.assertTrue(lock1.release()) + self.assertFalse(lock2.release()) + def test_get_members(self): group_id_test2 = self._get_random_uuid() member_id_test2 = self._get_random_uuid()