Merge "Make sure Lock.heartbeat() returns True/False"
This commit is contained in:
commit
03dbf70f70
@ -188,12 +188,14 @@ class EtcdLock(locking.Lock):
|
|||||||
poked = self.client.put(self._lock_url,
|
poked = self.client.put(self._lock_url,
|
||||||
data={"ttl": self.ttl,
|
data={"ttl": self.ttl,
|
||||||
"prevExist": "true"}, make_url=False)
|
"prevExist": "true"}, make_url=False)
|
||||||
errorcode = poked.get("errorCode")
|
|
||||||
if errorcode:
|
|
||||||
LOG.warning("Unable to heartbeat by updating key '%s' with "
|
|
||||||
"extended expiry of %s seconds: %d, %s", self.name,
|
|
||||||
self.ttl, errorcode, poked.get("message"))
|
|
||||||
self._node = poked['node']
|
self._node = poked['node']
|
||||||
|
errorcode = poked.get("errorCode")
|
||||||
|
if not errorcode:
|
||||||
|
return True
|
||||||
|
LOG.warning("Unable to heartbeat by updating key '%s' with "
|
||||||
|
"extended expiry of %s seconds: %d, %s", self.name,
|
||||||
|
self.ttl, errorcode, poked.get("message"))
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
class EtcdDriver(coordination.CoordinationDriver):
|
class EtcdDriver(coordination.CoordinationDriver):
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
|
import threading
|
||||||
|
|
||||||
import etcd3
|
import etcd3
|
||||||
from etcd3 import exceptions as etcd3_exc
|
from etcd3 import exceptions as etcd3_exc
|
||||||
@ -61,6 +62,7 @@ class Etcd3Lock(locking.Lock):
|
|||||||
super(Etcd3Lock, self).__init__(name)
|
super(Etcd3Lock, self).__init__(name)
|
||||||
self._coord = coord
|
self._coord = coord
|
||||||
self._lock = coord.client.lock(name.decode(), timeout)
|
self._lock = coord.client.lock(name.decode(), timeout)
|
||||||
|
self._exclusive_access = threading.Lock()
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def acquire(self, blocking=True, shared=False):
|
def acquire(self, blocking=True, shared=False):
|
||||||
@ -83,14 +85,19 @@ class Etcd3Lock(locking.Lock):
|
|||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def release(self):
|
def release(self):
|
||||||
if self.acquired and self._lock.release():
|
with self._exclusive_access:
|
||||||
self._coord._acquired_locks.discard(self)
|
if self.acquired and self._lock.release():
|
||||||
return True
|
self._coord._acquired_locks.discard(self)
|
||||||
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def heartbeat(self):
|
def heartbeat(self):
|
||||||
self._lock.refresh()
|
with self._exclusive_access:
|
||||||
|
if self.acquired:
|
||||||
|
self._lock.refresh()
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
class Etcd3Driver(coordination.CoordinationDriver):
|
class Etcd3Driver(coordination.CoordinationDriver):
|
||||||
|
@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
from __future__ import absolute_import
|
from __future__ import absolute_import
|
||||||
import base64
|
import base64
|
||||||
|
import threading
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
import etcd3gw
|
import etcd3gw
|
||||||
@ -68,6 +69,7 @@ class Etcd3Lock(locking.Lock):
|
|||||||
self._key_b64 = base64.b64encode(self._key).decode("ascii")
|
self._key_b64 = base64.b64encode(self._key).decode("ascii")
|
||||||
self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii")
|
self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii")
|
||||||
self._lease = self._coord.client.lease(self._timeout)
|
self._lease = self._coord.client.lease(self._timeout)
|
||||||
|
self._exclusive_access = threading.Lock()
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def acquire(self, blocking=True, shared=False):
|
def acquire(self, blocking=True, shared=False):
|
||||||
@ -126,11 +128,12 @@ class Etcd3Lock(locking.Lock):
|
|||||||
}]
|
}]
|
||||||
}
|
}
|
||||||
|
|
||||||
result = self._coord.client.transaction(txn)
|
with self._exclusive_access:
|
||||||
success = result.get('succeeded', False)
|
result = self._coord.client.transaction(txn)
|
||||||
if success:
|
success = result.get('succeeded', False)
|
||||||
self._coord._acquired_locks.remove(self)
|
if success:
|
||||||
return True
|
self._coord._acquired_locks.remove(self)
|
||||||
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
@ -140,9 +143,17 @@ class Etcd3Lock(locking.Lock):
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@property
|
||||||
|
def acquired(self):
|
||||||
|
return self in self._coord._acquired_locks
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def heartbeat(self):
|
def heartbeat(self):
|
||||||
self._lease.refresh()
|
with self._exclusive_access:
|
||||||
|
if self.acquired:
|
||||||
|
self._lease.refresh()
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
class Etcd3Driver(coordination.CoordinationDriver):
|
class Etcd3Driver(coordination.CoordinationDriver):
|
||||||
|
@ -165,10 +165,12 @@ class MemcachedLock(locking.Lock):
|
|||||||
poked = self.coord.client.touch(self.name,
|
poked = self.coord.client.touch(self.name,
|
||||||
expire=self.timeout,
|
expire=self.timeout,
|
||||||
noreply=False)
|
noreply=False)
|
||||||
if not poked:
|
if poked:
|
||||||
LOG.warning("Unable to heartbeat by updating key '%s' with "
|
return True
|
||||||
"extended expiry of %s seconds", self.name,
|
LOG.warning("Unable to heartbeat by updating key '%s' with "
|
||||||
self.timeout)
|
"extended expiry of %s seconds", self.name,
|
||||||
|
self.timeout)
|
||||||
|
return False
|
||||||
|
|
||||||
@_translate_failures
|
@_translate_failures
|
||||||
def get_owner(self):
|
def get_owner(self):
|
||||||
|
@ -109,6 +109,8 @@ class RedisLock(locking.Lock):
|
|||||||
if self.acquired:
|
if self.acquired:
|
||||||
with _translate_failures():
|
with _translate_failures():
|
||||||
self._lock.extend(self._lock.timeout)
|
self._lock.extend(self._lock.timeout)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def acquired(self):
|
def acquired(self):
|
||||||
|
@ -712,6 +712,12 @@ class TestAPI(tests.TestWithCoordinator):
|
|||||||
with lock:
|
with lock:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def test_heartbeat_lock_not_acquired(self):
|
||||||
|
lock = self._coord.get_lock(tests.get_random_uuid())
|
||||||
|
# Not all locks need heartbeat
|
||||||
|
if hasattr(lock, "heartbeat"):
|
||||||
|
self.assertFalse(lock.heartbeat())
|
||||||
|
|
||||||
def test_get_shared_lock(self):
|
def test_get_shared_lock(self):
|
||||||
lock = self._coord.get_lock(tests.get_random_uuid())
|
lock = self._coord.get_lock(tests.get_random_uuid())
|
||||||
self.assertTrue(lock.acquire(shared=True))
|
self.assertTrue(lock.acquire(shared=True))
|
||||||
|
Loading…
Reference in New Issue
Block a user