diff --git a/tooz/drivers/etcd.py b/tooz/drivers/etcd.py index 2765ac7..1bd94ea 100644 --- a/tooz/drivers/etcd.py +++ b/tooz/drivers/etcd.py @@ -35,6 +35,11 @@ def _translate_failures(func): def wrapper(*args, **kwargs): try: return func(*args, **kwargs) + except ValueError as e: + # Typically json decoding failed for some reason. + coordination.raise_with_cause(coordination.ToozError, + encodeutils.exception_to_unicode(e), + cause=e) except requests.exceptions.RequestException as e: coordination.raise_with_cause(coordination.ToozConnectionError, encodeutils.exception_to_unicode(e), @@ -87,6 +92,12 @@ class EtcdLock(locking.Lock): self.lock = None self.ttl = ttl self._lock_url = lock_url + self._node = None + + @_translate_failures + def break_(self): + reply = self.client.delete(self._lock_url, make_url=False) + return reply.get('errorCode') is None def acquire(self, blocking=True): blocking, timeout = utils.convert_blocking(blocking) @@ -110,6 +121,7 @@ class EtcdLock(locking.Lock): # We got the lock! if reply.get("errorCode") is None: + self._node = reply['node'] self.coord._acquired_locks.append(self) return True @@ -131,10 +143,17 @@ class EtcdLock(locking.Lock): @_translate_failures def release(self): if self in self.coord._acquired_locks: - reply = self.client.delete(self._lock_url, make_url=False) - if reply.get("errorCode") is None: + lock_url = self._lock_url + lock_url += "?prevIndex=%s" % self._node['modifiedIndex'] + reply = self.client.delete(lock_url, make_url=False) + errorcode = reply.get("errorCode") + if errorcode is None: self.coord._acquired_locks.remove(self) + self._node = None return True + else: + LOG.warn("Unable to release '%s' due to %d, %s", + self.name, errorcode, reply.get('message')) return False @_translate_failures diff --git a/tooz/drivers/ipc.py b/tooz/drivers/ipc.py index fa2e139..2b18cca 100644 --- a/tooz/drivers/ipc.py +++ b/tooz/drivers/ipc.py @@ -64,6 +64,15 @@ class IPCLock(locking.Lock): self.key = ftok(name, self._LOCK_PROJECT) self._lock = None + def break_(self): + try: + lock = sysv_ipc.Semaphore(key=self.key) + lock.remove() + except sysv_ipc.ExistentialError: + return False + else: + return True + def acquire(self, blocking=True): if (blocking is not True and sysv_ipc.SEMAPHORE_TIMEOUT_SUPPORTED is False): @@ -112,8 +121,12 @@ class IPCLock(locking.Lock): def release(self): if self._lock is not None: - self._lock.remove() - self._lock = None + try: + self._lock.remove() + except sysv_ipc.ExistentialError: + return False + finally: + self._lock = None return True return False diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index beab3d8..04eb393 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -103,6 +103,10 @@ class MemcachedLock(locking.Lock): self.acquired = gotten = _acquire() return gotten + @_translate_failures + def break_(self): + return bool(self.coord.client.delete(self.name, noreply=False)) + @_translate_failures def release(self): if not self.acquired: @@ -133,11 +137,19 @@ class MemcachedLock(locking.Lock): # id and then do the delete and bail out if the session id is not # as expected but memcache doesn't seem to have any equivalent # capability. - if (self in self.coord._acquired_locks and - self.coord.client.delete(self.name, noreply=False)): - self.coord._acquired_locks.remove(self) - return True - return False + if self not in self.coord._acquired_locks: + return False + # Do a ghetto test to see what the value is... (see above note), + # and how this really can't be done safely with memcache due to + # it being done in the client side (non-atomic). + value = self.coord.client.get(self.name) + if value != self.coord._member_id: + return False + else: + was_deleted = self.coord.client.delete(self.name, noreply=False) + if was_deleted: + self.coord._acquired_locks.remove(self) + return was_deleted @_translate_failures def heartbeat(self): diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index 5e94ec5..269c74d 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -74,6 +74,10 @@ class RedisLock(locking.Lock): owner_tok = self._client.get(self.name) return owner_tok == lock_tok + def break_(self): + with _translate_failures(): + return bool(self._client.delete(self.name)) + def acquire(self, blocking=True): blocking, timeout = utils.convert_blocking(blocking) with _translate_failures(): diff --git a/tooz/locking.py b/tooz/locking.py index c611c44..51b4c09 100644 --- a/tooz/locking.py +++ b/tooz/locking.py @@ -79,6 +79,22 @@ class Lock(object): :rtype: bool """ + def break_(self): + """Forcefully release the lock. + + This is mostly used for testing purposes, to simulate an out of + band operation that breaks the lock. Backends may allow waiters to + acquire immediately if a lock is broken, or they should raise an + exception. Releasing should be successful for objects that believe + they hold the lock but do not have the lock anymore. However, + they should be careful not to re-break the lock by releasing it, + since they may not be the holder anymore. + + :returns: returns true if forcefully broken (false if not) + or raises ``NotImplemented`` if not implemented. + """ + raise tooz.NotImplemented + @abc.abstractmethod def acquire(self, blocking=True): """Attempts to acquire the lock. @@ -89,5 +105,4 @@ class Lock(object): number of seconds. :returns: returns true if acquired (false if not) :rtype: bool - """ diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index d302f51..4ef1108 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -105,6 +105,27 @@ class TestAPI(testscenarios.TestWithScenarios, all_group_ids = self._coord.get_groups().get() self.assertTrue(self.group_id in all_group_ids) + def test_get_lock_release_broken(self): + name = self._get_random_uuid() + memberid2 = self._get_random_uuid() + coord2 = tooz.coordination.get_coordinator(self.url, + memberid2) + coord2.start() + lock1 = self._coord.get_lock(name) + lock2 = coord2.get_lock(name) + self.assertTrue(lock1.acquire(blocking=False)) + self.assertFalse(lock2.acquire(blocking=False)) + self.assertTrue(lock2.break_()) + self.assertTrue(lock2.acquire(blocking=False)) + self.assertFalse(lock1.release()) + # Assert lock is not accidentally broken now + memberid3 = self._get_random_uuid() + coord3 = tooz.coordination.get_coordinator(self.url, + memberid3) + coord3.start() + lock3 = coord3.get_lock(name) + self.assertFalse(lock3.acquire(blocking=False)) + def test_create_group_already_exist(self): self._coord.create_group(self.group_id).get() create_group = self._coord.create_group(self.group_id)