Add lock breaking
Lock breaking (which breaks the lock forcefully) is useful for testing other features in tooz that need to handle a lock being broken (or owned by someone else) after they were initially acquired so adding it will be quite useful going forward (for tooz and users of tooz). Co-Authored-By: Clint Byrum <clint@fewbar.com> Change-Id: Icd17f57045c5642d81bb643a846ceb4fb6859f0e
This commit is contained in:
@@ -35,6 +35,11 @@ def _translate_failures(func):
|
||||
def wrapper(*args, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except ValueError as e:
|
||||
# Typically json decoding failed for some reason.
|
||||
coordination.raise_with_cause(coordination.ToozError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
cause=e)
|
||||
except requests.exceptions.RequestException as e:
|
||||
coordination.raise_with_cause(coordination.ToozConnectionError,
|
||||
encodeutils.exception_to_unicode(e),
|
||||
@@ -87,6 +92,12 @@ class EtcdLock(locking.Lock):
|
||||
self.lock = None
|
||||
self.ttl = ttl
|
||||
self._lock_url = lock_url
|
||||
self._node = None
|
||||
|
||||
@_translate_failures
|
||||
def break_(self):
|
||||
reply = self.client.delete(self._lock_url, make_url=False)
|
||||
return reply.get('errorCode') is None
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
blocking, timeout = utils.convert_blocking(blocking)
|
||||
@@ -110,6 +121,7 @@ class EtcdLock(locking.Lock):
|
||||
|
||||
# We got the lock!
|
||||
if reply.get("errorCode") is None:
|
||||
self._node = reply['node']
|
||||
self.coord._acquired_locks.append(self)
|
||||
return True
|
||||
|
||||
@@ -131,10 +143,17 @@ class EtcdLock(locking.Lock):
|
||||
@_translate_failures
|
||||
def release(self):
|
||||
if self in self.coord._acquired_locks:
|
||||
reply = self.client.delete(self._lock_url, make_url=False)
|
||||
if reply.get("errorCode") is None:
|
||||
lock_url = self._lock_url
|
||||
lock_url += "?prevIndex=%s" % self._node['modifiedIndex']
|
||||
reply = self.client.delete(lock_url, make_url=False)
|
||||
errorcode = reply.get("errorCode")
|
||||
if errorcode is None:
|
||||
self.coord._acquired_locks.remove(self)
|
||||
self._node = None
|
||||
return True
|
||||
else:
|
||||
LOG.warn("Unable to release '%s' due to %d, %s",
|
||||
self.name, errorcode, reply.get('message'))
|
||||
return False
|
||||
|
||||
@_translate_failures
|
||||
|
||||
@@ -64,6 +64,15 @@ class IPCLock(locking.Lock):
|
||||
self.key = ftok(name, self._LOCK_PROJECT)
|
||||
self._lock = None
|
||||
|
||||
def break_(self):
|
||||
try:
|
||||
lock = sysv_ipc.Semaphore(key=self.key)
|
||||
lock.remove()
|
||||
except sysv_ipc.ExistentialError:
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
if (blocking is not True and
|
||||
sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED is False):
|
||||
@@ -112,8 +121,12 @@ class IPCLock(locking.Lock):
|
||||
|
||||
def release(self):
|
||||
if self._lock is not None:
|
||||
self._lock.remove()
|
||||
self._lock = None
|
||||
try:
|
||||
self._lock.remove()
|
||||
except sysv_ipc.ExistentialError:
|
||||
return False
|
||||
finally:
|
||||
self._lock = None
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
@@ -103,6 +103,10 @@ class MemcachedLock(locking.Lock):
|
||||
self.acquired = gotten = _acquire()
|
||||
return gotten
|
||||
|
||||
@_translate_failures
|
||||
def break_(self):
|
||||
return bool(self.coord.client.delete(self.name, noreply=False))
|
||||
|
||||
@_translate_failures
|
||||
def release(self):
|
||||
if not self.acquired:
|
||||
@@ -133,11 +137,19 @@ class MemcachedLock(locking.Lock):
|
||||
# id and then do the delete and bail out if the session id is not
|
||||
# as expected but memcache doesn't seem to have any equivalent
|
||||
# capability.
|
||||
if (self in self.coord._acquired_locks and
|
||||
self.coord.client.delete(self.name, noreply=False)):
|
||||
self.coord._acquired_locks.remove(self)
|
||||
return True
|
||||
return False
|
||||
if self not in self.coord._acquired_locks:
|
||||
return False
|
||||
# Do a ghetto test to see what the value is... (see above note),
|
||||
# and how this really can't be done safely with memcache due to
|
||||
# it being done in the client side (non-atomic).
|
||||
value = self.coord.client.get(self.name)
|
||||
if value != self.coord._member_id:
|
||||
return False
|
||||
else:
|
||||
was_deleted = self.coord.client.delete(self.name, noreply=False)
|
||||
if was_deleted:
|
||||
self.coord._acquired_locks.remove(self)
|
||||
return was_deleted
|
||||
|
||||
@_translate_failures
|
||||
def heartbeat(self):
|
||||
|
||||
@@ -74,6 +74,10 @@ class RedisLock(locking.Lock):
|
||||
owner_tok = self._client.get(self.name)
|
||||
return owner_tok == lock_tok
|
||||
|
||||
def break_(self):
|
||||
with _translate_failures():
|
||||
return bool(self._client.delete(self.name))
|
||||
|
||||
def acquire(self, blocking=True):
|
||||
blocking, timeout = utils.convert_blocking(blocking)
|
||||
with _translate_failures():
|
||||
|
||||
@@ -79,6 +79,22 @@ class Lock(object):
|
||||
:rtype: bool
|
||||
"""
|
||||
|
||||
def break_(self):
|
||||
"""Forcefully release the lock.
|
||||
|
||||
This is mostly used for testing purposes, to simulate an out of
|
||||
band operation that breaks the lock. Backends may allow waiters to
|
||||
acquire immediately if a lock is broken, or they should raise an
|
||||
exception. Releasing should be successful for objects that believe
|
||||
they hold the lock but do not have the lock anymore. However,
|
||||
they should be careful not to re-break the lock by releasing it,
|
||||
since they may not be the holder anymore.
|
||||
|
||||
:returns: returns true if forcefully broken (false if not)
|
||||
or raises ``NotImplemented`` if not implemented.
|
||||
"""
|
||||
raise tooz.NotImplemented
|
||||
|
||||
@abc.abstractmethod
|
||||
def acquire(self, blocking=True):
|
||||
"""Attempts to acquire the lock.
|
||||
@@ -89,5 +105,4 @@ class Lock(object):
|
||||
number of seconds.
|
||||
:returns: returns true if acquired (false if not)
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
|
||||
@@ -105,6 +105,27 @@ class TestAPI(testscenarios.TestWithScenarios,
|
||||
all_group_ids = self._coord.get_groups().get()
|
||||
self.assertTrue(self.group_id in all_group_ids)
|
||||
|
||||
def test_get_lock_release_broken(self):
|
||||
name = self._get_random_uuid()
|
||||
memberid2 = self._get_random_uuid()
|
||||
coord2 = tooz.coordination.get_coordinator(self.url,
|
||||
memberid2)
|
||||
coord2.start()
|
||||
lock1 = self._coord.get_lock(name)
|
||||
lock2 = coord2.get_lock(name)
|
||||
self.assertTrue(lock1.acquire(blocking=False))
|
||||
self.assertFalse(lock2.acquire(blocking=False))
|
||||
self.assertTrue(lock2.break_())
|
||||
self.assertTrue(lock2.acquire(blocking=False))
|
||||
self.assertFalse(lock1.release())
|
||||
# Assert lock is not accidentally broken now
|
||||
memberid3 = self._get_random_uuid()
|
||||
coord3 = tooz.coordination.get_coordinator(self.url,
|
||||
memberid3)
|
||||
coord3.start()
|
||||
lock3 = coord3.get_lock(name)
|
||||
self.assertFalse(lock3.acquire(blocking=False))
|
||||
|
||||
def test_create_group_already_exist(self):
|
||||
self._coord.create_group(self.group_id).get()
|
||||
create_group = self._coord.create_group(self.group_id)
|
||||
|
||||
Reference in New Issue
Block a user