Merge "redis: make sure we don't release and heartbeat a lock at the same time"
This commit is contained in:
@@ -20,6 +20,7 @@ import contextlib
|
|||||||
from distutils import version
|
from distutils import version
|
||||||
import logging
|
import logging
|
||||||
import string
|
import string
|
||||||
|
import threading
|
||||||
|
|
||||||
from concurrent import futures
|
from concurrent import futures
|
||||||
from oslo_utils import encodeutils
|
from oslo_utils import encodeutils
|
||||||
@@ -58,6 +59,9 @@ class RedisLock(locking.Lock):
|
|||||||
def __init__(self, coord, client, name, timeout):
|
def __init__(self, coord, client, name, timeout):
|
||||||
name = "%s_%s_lock" % (coord.namespace, six.text_type(name))
|
name = "%s_%s_lock" % (coord.namespace, six.text_type(name))
|
||||||
super(RedisLock, self).__init__(name)
|
super(RedisLock, self).__init__(name)
|
||||||
|
# NOTE(jd) Make sure we don't release and heartbeat at the same time by
|
||||||
|
# using a exclusive access lock (LP#1557593)
|
||||||
|
self._exclusive_access = threading.Lock()
|
||||||
self._lock = client.lock(name,
|
self._lock = client.lock(name,
|
||||||
timeout=timeout,
|
timeout=timeout,
|
||||||
thread_local=False)
|
thread_local=False)
|
||||||
@@ -86,20 +90,22 @@ class RedisLock(locking.Lock):
|
|||||||
return acquired
|
return acquired
|
||||||
|
|
||||||
def release(self):
|
def release(self):
|
||||||
if not self.acquired:
|
with self._exclusive_access:
|
||||||
return False
|
if not self.acquired:
|
||||||
with _translate_failures():
|
|
||||||
try:
|
|
||||||
self._lock.release()
|
|
||||||
except exceptions.LockError:
|
|
||||||
return False
|
return False
|
||||||
self._coord._acquired_locks.discard(self)
|
with _translate_failures():
|
||||||
return True
|
try:
|
||||||
|
self._lock.release()
|
||||||
|
except exceptions.LockError:
|
||||||
|
return False
|
||||||
|
self._coord._acquired_locks.discard(self)
|
||||||
|
return True
|
||||||
|
|
||||||
def heartbeat(self):
|
def heartbeat(self):
|
||||||
if self.acquired:
|
with self._exclusive_access:
|
||||||
with _translate_failures():
|
if self.acquired:
|
||||||
self._lock.extend(self._lock.timeout)
|
with _translate_failures():
|
||||||
|
self._lock.extend(self._lock.timeout)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def acquired(self):
|
def acquired(self):
|
||||||
|
|||||||
Reference in New Issue
Block a user