Add shared filelock

Currently each driver in tooz only support exclusive lock, it is
inefficient in some scenarios which only need shared lock.

This patch adds shared lock in file driver.

Change-Id: I55a528977e74b27da736f421d2eee28c7aae8aa3
Implements: blueprint support-shared-lock
Co-Authored-By: Julien Danjou <julien@danjou.info>
This commit is contained in:
zhangsong 2017-03-15 12:51:27 +08:00
parent f624e16f62
commit e6dc88b2a5
11 changed files with 112 additions and 20 deletions

View File

@ -19,6 +19,7 @@ from __future__ import absolute_import
import consul
from oslo_utils import encodeutils
import tooz
from tooz import _retry
from tooz import coordination
from tooz import locking
@ -35,7 +36,9 @@ class ConsulLock(locking.Lock):
self._client = client
self.acquired = False
def acquire(self, blocking=True):
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
@_retry.retry(stop_max_delay=blocking)
def _acquire():

View File

@ -108,7 +108,10 @@ class EtcdLock(locking.Lock):
return reply.get('errorCode') is None
@fasteners.locked
def acquire(self, blocking=True):
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
blocking, timeout = utils.convert_blocking(blocking)
if timeout is not None:
watch = timeutils.StopWatch(duration=timeout)

View File

@ -48,6 +48,8 @@ class _Barrier(object):
def __init__(self):
self.cond = threading.Condition()
self.owner = None
self.shared = False
self.ref = 0
@contextlib.contextmanager
@ -109,11 +111,12 @@ class FileLock(locking.Lock):
self._lock = fasteners.InterProcessLock(path)
self._barrier = barrier
self._member_id = member_id
self.ref = 0
def is_still_owner(self):
return self.acquired
def acquire(self, blocking=True):
def acquire(self, blocking=True, shared=False):
blocking, timeout = utils.convert_blocking(blocking)
watch = timeutils.StopWatch(duration=timeout)
watch.start()
@ -121,11 +124,16 @@ class FileLock(locking.Lock):
# Make the shared barrier ours first.
with self._barrier.cond:
while self._barrier.owner is not None:
if (shared and self._barrier.shared):
break
if not blocking or watch.expired():
return False
self._barrier.cond.wait(watch.leftover(return_none=True))
self._barrier.owner = (threading.current_thread().ident,
os.getpid(), self._member_id)
self._barrier.shared = shared
self._barrier.ref += 1
self.ref += 1
# Ok at this point we are now working in a thread safe manner,
# and now we can try to get the actual lock...
@ -144,6 +152,8 @@ class FileLock(locking.Lock):
# Release the barrier to let someone else have a go at it...
with self._barrier.cond:
self._barrier.owner = None
self._barrier.ref = 0
self._barrier.shared = False
self._barrier.cond.notify_all()
self.acquired = gotten
@ -153,10 +163,14 @@ class FileLock(locking.Lock):
if not self.acquired:
return False
with self._barrier.cond:
self.acquired = False
self._barrier.owner = None
self._lock.release()
self._barrier.cond.notify_all()
self._barrier.ref -= 1
self.ref -= 1
if not self.ref:
self.acquired = False
if not self._barrier.ref:
self._barrier.owner = None
self._lock.release()
self._barrier.cond.notify_all()
return True
def __del__(self):

View File

@ -73,7 +73,10 @@ class IPCLock(locking.Lock):
else:
return True
def acquire(self, blocking=True):
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
if (blocking is not True and
sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED is False):
raise tooz.NotImplemented("This system does not support"

View File

@ -83,7 +83,9 @@ class MemcachedLock(locking.Lock):
return False
return owner == self.coord._member_id
def acquire(self, blocking=True):
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
@_retry.retry(stop_max_delay=blocking)
@_translate_failures

View File

@ -37,7 +37,10 @@ class MySQLLock(locking.Lock):
self.acquired = False
self._conn = MySQLDriver.get_connection(parsed_url, options)
def acquire(self, blocking=True):
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
@_retry.retry(stop_max_delay=blocking)
def _lock():

View File

@ -104,7 +104,10 @@ class PostgresLock(locking.Lock):
else:
self.key = h.digest()[0:2]
def acquire(self, blocking=True):
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
@_retry.retry(stop_max_delay=blocking)
def _lock():

View File

@ -80,7 +80,9 @@ class RedisLock(locking.Lock):
with _translate_failures():
return bool(self._client.delete(self.name))
def acquire(self, blocking=True):
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
blocking, timeout = utils.convert_blocking(blocking)
with _translate_failures():
acquired = self._lock.acquire(

View File

@ -57,7 +57,9 @@ class ZooKeeperLock(locking.Lock):
"operation error: %s" % (e),
cause=e)
def acquire(self, blocking=True):
def acquire(self, blocking=True, shared=False):
if shared:
raise tooz.NotImplemented
blocking, timeout = utils.convert_blocking(blocking)
return self._lock.acquire(blocking=blocking,
timeout=timeout)

View File

@ -22,12 +22,13 @@ from tooz import coordination
class _LockProxy(object):
def __init__(self, lock, blocking=True):
def __init__(self, lock, *args, **kwargs):
self.lock = lock
self.blocking = blocking
self.args = args
self.kwargs = kwargs
def __enter__(self):
return self.lock.__enter__(self.blocking)
return self.lock.__enter__(*self.args, **self.kwargs)
def __exit__(self, exc_type, exc_val, exc_tb):
self.lock.__exit__(exc_type, exc_val, exc_tb)
@ -44,11 +45,11 @@ class Lock(object):
def name(self):
return self._name
def __call__(self, blocking=True):
return _LockProxy(self, blocking)
def __call__(self, *args, **kwargs):
return _LockProxy(self, *args, **kwargs)
def __enter__(self, blocking=True):
acquired = self.acquire(blocking)
def __enter__(self, *args, **kwargs):
acquired = self.acquire(*args, **kwargs)
if not acquired:
msg = u'Acquiring lock %s failed' % self.name
raise coordination.LockAcquireFailed(msg)

View File

@ -712,6 +712,62 @@ class TestAPI(tests.TestWithCoordinator):
with lock:
pass
def test_get_shared_lock(self):
lock = self._coord.get_lock(tests.get_random_uuid())
self.assertTrue(lock.acquire(shared=True))
self.assertTrue(lock.release())
with lock(shared=True):
pass
def test_get_shared_lock_locking_same_lock_twice(self):
lock = self._coord.get_lock(tests.get_random_uuid())
self.assertTrue(lock.acquire(shared=True))
self.assertTrue(lock.acquire(shared=True))
self.assertTrue(lock.release())
self.assertTrue(lock.release())
self.assertFalse(lock.release())
with lock(shared=True):
pass
def test_get_shared_lock_locking_two_lock(self):
name = tests.get_random_uuid()
lock1 = self._coord.get_lock(name)
coord = tooz.coordination.get_coordinator(
self.url, tests.get_random_uuid())
coord.start()
lock2 = coord.get_lock(name)
self.assertTrue(lock1.acquire(shared=True))
self.assertTrue(lock2.acquire(shared=True))
self.assertTrue(lock1.release())
self.assertTrue(lock2.release())
def test_get_lock_locking_shared_and_exclusive(self):
name = tests.get_random_uuid()
lock1 = self._coord.get_lock(name)
coord = tooz.coordination.get_coordinator(
self.url, tests.get_random_uuid())
coord.start()
lock2 = coord.get_lock(name)
self.assertTrue(lock1.acquire(shared=True))
self.assertFalse(lock2.acquire(blocking=False))
self.assertTrue(lock1.release())
self.assertFalse(lock2.release())
def test_get_lock_locking_exclusive_and_shared(self):
name = tests.get_random_uuid()
lock1 = self._coord.get_lock(name)
coord = tooz.coordination.get_coordinator(
self.url, tests.get_random_uuid())
coord.start()
lock2 = coord.get_lock(name)
self.assertTrue(lock1.acquire())
self.assertFalse(lock2.acquire(shared=True, blocking=False))
self.assertTrue(lock1.release())
self.assertFalse(lock2.release())
def test_get_lock_concurrency_locking_same_lock(self):
lock = self._coord.get_lock(tests.get_random_uuid())