Make all locks operate the same when fetched from different coordinators
Instead of having one outlier lock class (file locks) which fetch the same lock object from different coordinators have each coordinator that is using the file driver return different locks and only share the absolute minimum of components to ensure this is done in a safe manner (due to lack of thread awareness in process based locks). Change-Id: I32d22230cb0165ccfbb39b6e97f12f906836b0e1
This commit is contained in:
parent
c08caaef07
commit
ba3c23cf42
@ -22,6 +22,7 @@ import logging
|
||||
import os
|
||||
import shutil
|
||||
import threading
|
||||
import weakref
|
||||
|
||||
from concurrent import futures
|
||||
|
||||
@ -37,6 +38,12 @@ from tooz import utils
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class _Barrier(object):
|
||||
def __init__(self):
|
||||
self.cond = threading.Condition()
|
||||
self.owner = None
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _translate_failures():
|
||||
try:
|
||||
@ -64,11 +71,12 @@ def _lock_me(lock):
|
||||
class FileLock(locking.Lock):
|
||||
"""A file based lock."""
|
||||
|
||||
def __init__(self, path):
|
||||
def __init__(self, path, barrier, member_id):
|
||||
super(FileLock, self).__init__(path)
|
||||
self.acquired = False
|
||||
self._lock = fasteners.InterProcessLock(path)
|
||||
self._cond = threading.Condition()
|
||||
self._barrier = barrier
|
||||
self._member_id = member_id
|
||||
|
||||
def is_still_owner(self):
|
||||
return self.acquired
|
||||
@ -77,33 +85,46 @@ class FileLock(locking.Lock):
|
||||
blocking, timeout = utils.convert_blocking(blocking)
|
||||
watch = timeutils.StopWatch(duration=timeout)
|
||||
watch.start()
|
||||
while True:
|
||||
with self._cond:
|
||||
if self.acquired and blocking:
|
||||
if watch.expired():
|
||||
|
||||
# Make the shared barrier ours first.
|
||||
with self._barrier.cond:
|
||||
while self._barrier.owner is not None:
|
||||
if not blocking or watch.expired():
|
||||
return False
|
||||
# If in the same process wait until we can attempt to
|
||||
# acquire it (aka, another thread should release it before
|
||||
# we can try to get it).
|
||||
self._cond.wait(watch.leftover(return_none=True))
|
||||
elif self.acquired and not blocking:
|
||||
return False
|
||||
else:
|
||||
# All the prior waits may have left less time to wait...
|
||||
timeout = watch.leftover(return_none=True)
|
||||
self.acquired = self._lock.acquire(blocking=blocking,
|
||||
timeout=timeout)
|
||||
return self.acquired
|
||||
self._barrier.cond.wait(watch.leftover(return_none=True))
|
||||
self._barrier.owner = (threading.current_thread().ident,
|
||||
os.getpid(), self._member_id)
|
||||
|
||||
# Ok at this point we are now working in a thread safe manner,
|
||||
# and now we can try to get the actual lock...
|
||||
gotten = False
|
||||
try:
|
||||
gotten = self._lock.acquire(
|
||||
blocking=blocking,
|
||||
# Since the barrier waiting may have
|
||||
# taken a long time, we have to use
|
||||
# the leftover (and not the original).
|
||||
timeout=watch.leftover(return_none=True))
|
||||
finally:
|
||||
# NOTE(harlowja): do this in a finally block to **ensure** that
|
||||
# we release the barrier if something bad happens...
|
||||
if not gotten:
|
||||
# Release the barrier to let someone else have a go at it...
|
||||
with self._barrier.cond:
|
||||
self._barrier.owner = None
|
||||
self._barrier.cond.notify_all()
|
||||
|
||||
self.acquired = gotten
|
||||
return gotten
|
||||
|
||||
def release(self):
|
||||
with self._cond:
|
||||
if self.acquired:
|
||||
self._lock.release()
|
||||
self.acquired = False
|
||||
self._cond.notify_all()
|
||||
return True
|
||||
else:
|
||||
if not self.acquired:
|
||||
return False
|
||||
with self._barrier.cond:
|
||||
self.acquired = False
|
||||
self._barrier.owner = None
|
||||
self._barrier.cond.notify_all()
|
||||
return True
|
||||
|
||||
def __del__(self):
|
||||
if self.acquired:
|
||||
@ -134,6 +155,13 @@ class FileDriver(coordination._RunWatchersMixin,
|
||||
"""This routine is used to hash a member (or group) id into a filesystem
|
||||
safe name that can be used for member lookup and group joining."""
|
||||
|
||||
_barriers = weakref.WeakValueDictionary()
|
||||
"""
|
||||
Barriers shared among all file driver locks, this is required
|
||||
since interprocess locking is not thread aware, so we must add the
|
||||
thread awareness on-top of it instead.
|
||||
"""
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
"""Initialize the file driver."""
|
||||
super(FileDriver, self).__init__()
|
||||
@ -142,21 +170,26 @@ class FileDriver(coordination._RunWatchersMixin,
|
||||
self._executor = utils.ProxyExecutor.build("File", options)
|
||||
self._group_dir = os.path.join(self._dir, 'groups')
|
||||
self._driver_lock_path = os.path.join(self._dir, '.driver_lock')
|
||||
self._driver_lock = locking.SharedWeakLockHelper(
|
||||
self._dir, FileLock, self._driver_lock_path)
|
||||
self._driver_lock = self._get_raw_lock(self._driver_lock_path,
|
||||
self._member_id)
|
||||
self._reserved_dirs = [self._dir, self._group_dir]
|
||||
self._reserved_paths = list(self._reserved_dirs)
|
||||
self._reserved_paths.append(self._driver_lock_path)
|
||||
self._joined_groups = set()
|
||||
self._safe_member_id = self._make_filesystem_safe(member_id)
|
||||
|
||||
@classmethod
|
||||
def _get_raw_lock(cls, path, member_id):
|
||||
lock_barrier = cls._barriers.setdefault(path, _Barrier())
|
||||
return FileLock(path, lock_barrier, member_id)
|
||||
|
||||
def get_lock(self, name):
|
||||
path = utils.safe_abs_path(self._dir, name.decode())
|
||||
if path in self._reserved_paths:
|
||||
raise ValueError("Unable to create a lock using"
|
||||
" reserved path '%s' for lock"
|
||||
" with name '%s'" % (path, name))
|
||||
return locking.SharedWeakLockHelper(self._dir, FileLock, path)
|
||||
return self._get_raw_lock(path, self._member_id)
|
||||
|
||||
@classmethod
|
||||
def _make_filesystem_safe(cls, item):
|
||||
|
@ -132,11 +132,10 @@ class MemcachedLock(locking.Lock):
|
||||
# id and then do the delete and bail out if the session id is not
|
||||
# as expected but memcache doesn't seem to have any equivalent
|
||||
# capability.
|
||||
if self.coord.client.delete(self.name, noreply=False):
|
||||
self.acquired = False
|
||||
if (self in self.coord._acquired_locks
|
||||
and self.coord.client.delete(self.name, noreply=False)):
|
||||
self.coord._acquired_locks.remove(self)
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
@_translate_failures
|
||||
|
@ -16,8 +16,6 @@
|
||||
import abc
|
||||
|
||||
import six
|
||||
import threading
|
||||
import weakref
|
||||
|
||||
import tooz
|
||||
from tooz import coordination
|
||||
@ -93,57 +91,3 @@ class Lock(object):
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
|
||||
|
||||
class SharedWeakLockHelper(Lock):
|
||||
"""Helper for lock that need to rely on a state in memory and
|
||||
be the same object across each coordinator.get_lock(...)
|
||||
"""
|
||||
|
||||
LOCKS_LOCK = threading.Lock()
|
||||
ACQUIRED_LOCKS = dict()
|
||||
RELEASED_LOCKS = weakref.WeakValueDictionary()
|
||||
|
||||
def __init__(self, namespace, lockclass, name, *args, **kwargs):
|
||||
super(SharedWeakLockHelper, self).__init__(name)
|
||||
self._lock_key = "%s:%s" % (namespace, name)
|
||||
self._newlock = lambda: lockclass(
|
||||
self.name, *args, **kwargs)
|
||||
|
||||
@property
|
||||
def lock(self):
|
||||
"""Access the underlying lock object.
|
||||
|
||||
For internal usage only.
|
||||
"""
|
||||
with self.LOCKS_LOCK:
|
||||
try:
|
||||
l = self.ACQUIRED_LOCKS[self._lock_key]
|
||||
except KeyError:
|
||||
l = self.RELEASED_LOCKS.setdefault(
|
||||
self._lock_key, self._newlock())
|
||||
return l
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
l = self.lock
|
||||
if l.acquire(blocking):
|
||||
with self.LOCKS_LOCK:
|
||||
self.RELEASED_LOCKS.pop(self._lock_key, None)
|
||||
self.ACQUIRED_LOCKS[self._lock_key] = l
|
||||
return True
|
||||
return False
|
||||
|
||||
def release(self):
|
||||
with self.LOCKS_LOCK:
|
||||
try:
|
||||
l = self.ACQUIRED_LOCKS.pop(self._lock_key)
|
||||
except KeyError:
|
||||
return False
|
||||
else:
|
||||
if l.release():
|
||||
self.RELEASED_LOCKS[self._lock_key] = l
|
||||
return True
|
||||
else:
|
||||
# Put it back...
|
||||
self.ACQUIRED_LOCKS[self._lock_key] = l
|
||||
return False
|
||||
|
@ -194,6 +194,16 @@ class TestAPI(testscenarios.TestWithScenarios,
|
||||
self.assertRaises(tooz.coordination.MemberNotJoined,
|
||||
leave_group.get)
|
||||
|
||||
def test_get_lock_twice_locked_one_released_two(self):
|
||||
name = self._get_random_uuid()
|
||||
lock1 = self._coord.get_lock(name)
|
||||
lock2 = self._coord.get_lock(name)
|
||||
self.assertTrue(lock1.acquire())
|
||||
self.assertFalse(lock2.acquire(blocking=False))
|
||||
self.assertFalse(lock2.release())
|
||||
self.assertTrue(lock1.release())
|
||||
self.assertFalse(lock2.release())
|
||||
|
||||
def test_get_members(self):
|
||||
group_id_test2 = self._get_random_uuid()
|
||||
member_id_test2 = self._get_random_uuid()
|
||||
|
Loading…
Reference in New Issue
Block a user