Allow the lock delay to be provided
When a lock can't be acquired there is currently a hard coded delay (0.01) that is used before trying again, instead of having a hard coded delay we should allow this delay to be configured since having it set at a hard coded value can limit concurrency (if the delay is actually way to high) or cause to much contention (if the delay is actually way to low). This review adds on that logic and also uses the retrying library to perform the acquisition attempts (and associated failures when/if they occur); as well as shows logs after a given amount of time has elapsed with the logs being output at a given periodicity. Change-Id: Ideeefba1439ddd677c608d01becb4f6a0d4bc83d
This commit is contained in:
parent
3bda65ccbb
commit
46c836ee28
@ -28,6 +28,7 @@ import weakref
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.config import cfgfilter
|
||||
import retrying
|
||||
import six
|
||||
|
||||
from oslo_concurrency._i18n import _, _LE, _LI
|
||||
@ -64,6 +65,86 @@ def set_defaults(lock_path):
|
||||
cfg.set_defaults(_opts, lock_path=lock_path)
|
||||
|
||||
|
||||
class _Hourglass(object):
|
||||
"""A hourglass like periodic timer."""
|
||||
|
||||
def __init__(self, period):
|
||||
self._period = period
|
||||
self._last_flipped = None
|
||||
|
||||
def flip(self):
|
||||
"""Flips the hourglass.
|
||||
|
||||
The drain() method will now only return true until the period
|
||||
is reached again.
|
||||
"""
|
||||
self._last_flipped = time.time()
|
||||
|
||||
def drain(self):
|
||||
"""Drains the hourglass, returns True if period reached."""
|
||||
if self._last_flipped is None:
|
||||
return True
|
||||
else:
|
||||
elapsed = max(0, time.time() - self._last_flipped)
|
||||
return elapsed >= self._period
|
||||
|
||||
|
||||
def _lock_retry(delay, filename,
|
||||
# These parameters trigger logging to begin after a certain
|
||||
# amount of time has elapsed where the lock couldn't be
|
||||
# acquired (log statements will be emitted after that duration
|
||||
# at the provided periodicity).
|
||||
log_begins_after=1.0, log_periodicity=0.5):
|
||||
"""Retry logic that acquiring a lock will go through."""
|
||||
|
||||
# If this returns True, a retry attempt will occur (using the defined
|
||||
# retry policy we have requested the retrying library to apply), if it
|
||||
# returns False then the original exception will be re-raised (if it
|
||||
# raises a new or different exception the original exception will be
|
||||
# replaced with that one and raised).
|
||||
def retry_on_exception(e):
|
||||
if isinstance(e, IOError) and e.errno in (errno.EACCES, errno.EAGAIN):
|
||||
return True
|
||||
raise threading.ThreadError(_("Unable to acquire lock on"
|
||||
" `%(filename)s` due to"
|
||||
" %(exception)s") %
|
||||
{
|
||||
'filename': filename,
|
||||
'exception': e,
|
||||
})
|
||||
|
||||
# Logs all attempts (with information about how long we have been trying
|
||||
# to acquire the underlying lock...); after a threshold has been passed,
|
||||
# and only at a fixed rate...
|
||||
def never_stop(hg, attempt_number, delay_since_first_attempt_ms):
|
||||
delay_since_first_attempt = delay_since_first_attempt_ms / 1000.0
|
||||
if delay_since_first_attempt >= log_begins_after:
|
||||
if hg.drain():
|
||||
LOG.debug("Attempting to acquire %s (delayed %0.2f seconds)",
|
||||
filename, delay_since_first_attempt)
|
||||
hg.flip()
|
||||
return False
|
||||
|
||||
# The retrying library seems to prefer milliseconds for some reason; this
|
||||
# might be changed in (see: https://github.com/rholder/retrying/issues/6)
|
||||
# someday in the future...
|
||||
delay_ms = delay * 1000.0
|
||||
|
||||
def decorator(func):
|
||||
|
||||
@six.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
hg = _Hourglass(log_periodicity)
|
||||
r = retrying.Retrying(wait_fixed=delay_ms,
|
||||
retry_on_exception=retry_on_exception,
|
||||
stop_func=functools.partial(never_stop, hg))
|
||||
return r.call(func, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
class _FileLock(object):
|
||||
"""Lock implementation which allows multiple locks, working around
|
||||
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
|
||||
@ -87,9 +168,11 @@ class _FileLock(object):
|
||||
self.fname = name
|
||||
self.acquire_time = None
|
||||
|
||||
def acquire(self):
|
||||
basedir = os.path.dirname(self.fname)
|
||||
def acquire(self, delay=0.01):
|
||||
if delay < 0:
|
||||
raise ValueError("Delay must be greater than or equal to zero")
|
||||
|
||||
basedir = os.path.dirname(self.fname)
|
||||
if not os.path.exists(basedir):
|
||||
fileutils.ensure_tree(basedir)
|
||||
LOG.info(_LI('Created lock path: %s'), basedir)
|
||||
@ -98,32 +181,20 @@ class _FileLock(object):
|
||||
# the target file. This eliminates the possibility of an attacker
|
||||
# creating a symlink to an important file in our lock_path.
|
||||
self.lockfile = open(self.fname, 'a')
|
||||
|
||||
start_time = time.time()
|
||||
while True:
|
||||
try:
|
||||
# Using non-blocking locks since green threads are not
|
||||
# patched to deal with blocking locking calls.
|
||||
# Also upon reading the MSDN docs for locking(), it seems
|
||||
# to have a laughable 10 attempts "blocking" mechanism.
|
||||
self.trylock()
|
||||
self.acquire_time = time.time()
|
||||
LOG.debug('Acquired file lock "%s" after waiting %0.3fs',
|
||||
self.fname, (self.acquire_time - start_time))
|
||||
return True
|
||||
except IOError as e:
|
||||
if e.errno in (errno.EACCES, errno.EAGAIN):
|
||||
# external locks synchronise things like iptables
|
||||
# updates - give it some time to prevent busy spinning
|
||||
time.sleep(0.01)
|
||||
else:
|
||||
raise threading.ThreadError(_("Unable to acquire lock on"
|
||||
" `%(filename)s` due to"
|
||||
" %(exception)s") %
|
||||
{
|
||||
'filename': self.fname,
|
||||
'exception': e,
|
||||
})
|
||||
|
||||
# Using non-blocking locks (with retries) since green threads are not
|
||||
# patched to deal with blocking locking calls. Also upon reading the
|
||||
# MSDN docs for locking(), it seems to have a 'laughable' 10
|
||||
# attempts "blocking" mechanism.
|
||||
do_acquire = _lock_retry(delay=delay,
|
||||
filename=self.fname)(self.trylock)
|
||||
do_acquire()
|
||||
self.acquire_time = time.time()
|
||||
LOG.debug('Acquired file lock "%s" after waiting %0.3fs',
|
||||
self.fname, (self.acquire_time - start_time))
|
||||
|
||||
return True
|
||||
|
||||
def __enter__(self):
|
||||
self.acquire()
|
||||
@ -267,7 +338,7 @@ def internal_lock(name, semaphores=None):
|
||||
|
||||
@contextlib.contextmanager
|
||||
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
do_log=True, semaphores=None):
|
||||
do_log=True, semaphores=None, delay=0.01):
|
||||
"""Context based lock
|
||||
|
||||
This function yields a `threading.Semaphore` instance (if we don't use
|
||||
@ -294,6 +365,8 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
This ensures that threads inside the same application can not collide,
|
||||
due to the fact that external process locks are unaware of a processes
|
||||
active threads.
|
||||
|
||||
:param delay: Delay between acquisition attempts (in seconds).
|
||||
"""
|
||||
int_lock = internal_lock(name, semaphores=semaphores)
|
||||
with int_lock:
|
||||
@ -302,8 +375,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
try:
|
||||
if external and not CONF.oslo_concurrency.disable_process_locking:
|
||||
ext_lock = external_lock(name, lock_file_prefix, lock_path)
|
||||
with ext_lock:
|
||||
ext_lock.acquire(delay=delay)
|
||||
try:
|
||||
yield ext_lock
|
||||
finally:
|
||||
ext_lock.release()
|
||||
else:
|
||||
yield int_lock
|
||||
finally:
|
||||
@ -312,7 +388,7 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
|
||||
|
||||
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
semaphores=None):
|
||||
semaphores=None, delay=0.01):
|
||||
"""Synchronization decorator.
|
||||
|
||||
Decorating a method like so::
|
||||
@ -343,7 +419,7 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
t2 = None
|
||||
try:
|
||||
with lock(name, lock_file_prefix, external, lock_path,
|
||||
do_log=False, semaphores=semaphores):
|
||||
do_log=False, semaphores=semaphores, delay=delay):
|
||||
t2 = time.time()
|
||||
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
|
||||
'waited %(wait_secs)0.3fs',
|
||||
|
@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0
|
||||
oslo.utils>=1.0.0 # Apache-2.0
|
||||
posix_ipc
|
||||
six>=1.7.0
|
||||
retrying>=1.2.2,!=1.3.0 # Apache-2.0
|
||||
|
@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0
|
||||
oslo.utils>=1.0.0 # Apache-2.0
|
||||
posix_ipc
|
||||
six>=1.7.0
|
||||
retrying>=1.2.2,!=1.3.0 # Apache-2.0
|
||||
|
Loading…
Reference in New Issue
Block a user