diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py
index 7812db8..ae1c498 100644
--- a/oslo_concurrency/lockutils.py
+++ b/oslo_concurrency/lockutils.py
@@ -28,6 +28,7 @@ import weakref
 
 from oslo.config import cfg
 from oslo.config import cfgfilter
+import retrying
 import six
 
 from oslo_concurrency._i18n import _, _LE, _LI
@@ -64,6 +65,86 @@ def set_defaults(lock_path):
     cfg.set_defaults(_opts, lock_path=lock_path)
 
 
+class _Hourglass(object):
+    """An hourglass like periodic timer."""
+
+    def __init__(self, period):
+        self._period = period
+        self._last_flipped = None
+
+    def flip(self):
+        """Flips the hourglass.
+
+        The drain() method will now return False until the period has
+        elapsed again.
+        """
+        self._last_flipped = time.time()
+
+    def drain(self):
+        """Drains the hourglass, returns True if period reached."""
+        if self._last_flipped is None:
+            return True
+        else:
+            elapsed = max(0, time.time() - self._last_flipped)
+            return elapsed >= self._period
+
+
+def _lock_retry(delay, filename,
+                # These parameters trigger logging to begin after a certain
+                # amount of time has elapsed where the lock couldn't be
+                # acquired (log statements will be emitted after that duration
+                # at the provided periodicity).
+                log_begins_after=1.0, log_periodicity=0.5):
+    """Retry logic that acquiring a lock will go through."""
+
+    # If this returns True, a retry attempt will occur (using the defined
+    # retry policy we have requested the retrying library to apply), if it
+    # returns False then the original exception will be re-raised (if it
+    # raises a new or different exception the original exception will be
+    # replaced with that one and raised).
+    def retry_on_exception(e):
+        if isinstance(e, IOError) and e.errno in (errno.EACCES, errno.EAGAIN):
+            return True
+        raise threading.ThreadError(_("Unable to acquire lock on"
+                                      " `%(filename)s` due to"
+                                      " %(exception)s") %
+                                    {
+                                        'filename': filename,
+                                        'exception': e,
+                                    })
+
+    # Logs all attempts (with information about how long we have been trying
+    # to acquire the underlying lock...); after a threshold has been passed,
+    # and only at a fixed rate...
+    def never_stop(hg, attempt_number, delay_since_first_attempt_ms):
+        delay_since_first_attempt = delay_since_first_attempt_ms / 1000.0
+        if delay_since_first_attempt >= log_begins_after:
+            if hg.drain():
+                LOG.debug("Attempting to acquire %s (delayed %0.2f seconds)",
+                          filename, delay_since_first_attempt)
+                hg.flip()
+        return False
+
+    # The retrying library seems to prefer milliseconds for some reason; this
+    # might be changed in (see: https://github.com/rholder/retrying/issues/6)
+    # someday in the future...
+    delay_ms = delay * 1000.0
+
+    def decorator(func):
+
+        @six.wraps(func)
+        def wrapper(*args, **kwargs):
+            hg = _Hourglass(log_periodicity)
+            r = retrying.Retrying(wait_fixed=delay_ms,
+                                  retry_on_exception=retry_on_exception,
+                                  stop_func=functools.partial(never_stop, hg))
+            return r.call(func, *args, **kwargs)
+
+        return wrapper
+
+    return decorator
+
+
 class _FileLock(object):
     """Lock implementation which allows multiple locks, working around
     issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
@@ -87,9 +168,11 @@ class _FileLock(object):
         self.fname = name
         self.acquire_time = None
 
-    def acquire(self):
-        basedir = os.path.dirname(self.fname)
+    def acquire(self, delay=0.01):
+        if delay < 0:
+            raise ValueError("Delay must be greater than or equal to zero")
 
+        basedir = os.path.dirname(self.fname)
         if not os.path.exists(basedir):
             fileutils.ensure_tree(basedir)
             LOG.info(_LI('Created lock path: %s'), basedir)
@@ -98,31 +181,20 @@ class _FileLock(object):
         # the target file.  This eliminates the possibility of an attacker
         # creating a symlink to an important file in our lock_path.
         self.lockfile = open(self.fname, 'a')
         start_time = time.time()
-        while True:
-            try:
-                # Using non-blocking locks since green threads are not
-                # patched to deal with blocking locking calls.
-                # Also upon reading the MSDN docs for locking(), it seems
-                # to have a laughable 10 attempts "blocking" mechanism.
-                self.trylock()
-                self.acquire_time = time.time()
-                LOG.debug('Acquired file lock "%s" after waiting %0.3fs',
-                          self.fname, (self.acquire_time - start_time))
-                return True
-            except IOError as e:
-                if e.errno in (errno.EACCES, errno.EAGAIN):
-                    # external locks synchronise things like iptables
-                    # updates - give it some time to prevent busy spinning
-                    time.sleep(0.01)
-                else:
-                    raise threading.ThreadError(_("Unable to acquire lock on"
-                                                  " `%(filename)s` due to"
-                                                  " %(exception)s") %
-                                                {
-                                                    'filename': self.fname,
-                                                    'exception': e,
-                                                })
+
+        # Using non-blocking locks (with retries) since green threads are not
+        # patched to deal with blocking locking calls. Also upon reading the
+        # MSDN docs for locking(), it seems to have a 'laughable' 10
+        # attempts "blocking" mechanism.
+        do_acquire = _lock_retry(delay=delay,
+                                 filename=self.fname)(self.trylock)
+        do_acquire()
+        self.acquire_time = time.time()
+        LOG.debug('Acquired file lock "%s" after waiting %0.3fs',
+                  self.fname, (self.acquire_time - start_time))
+
+        return True
 
     def __enter__(self):
         self.acquire()
@@ -267,7 +339,7 @@ def internal_lock(name, semaphores=None):
 
 @contextlib.contextmanager
 def lock(name, lock_file_prefix=None, external=False, lock_path=None,
-         do_log=True, semaphores=None):
+         do_log=True, semaphores=None, delay=0.01):
     """Context based lock
 
     This function yields a `threading.Semaphore` instance (if we don't use
@@ -294,6 +366,8 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
         This ensures that threads inside the same application can not collide,
         due to the fact that external process locks are unaware of a processes
         active threads.
+
+    :param delay: Delay between acquisition attempts (in seconds).
     """
     int_lock = internal_lock(name, semaphores=semaphores)
     with int_lock:
@@ -302,8 +376,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
         try:
             if external and not CONF.oslo_concurrency.disable_process_locking:
                 ext_lock = external_lock(name, lock_file_prefix, lock_path)
-                with ext_lock:
+                ext_lock.acquire(delay=delay)
+                try:
                     yield ext_lock
+                finally:
+                    ext_lock.release()
             else:
                 yield int_lock
         finally:
@@ -312,7 +389,7 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
 
 
 def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
-                 semaphores=None):
+                 semaphores=None, delay=0.01):
     """Synchronization decorator.
 
     Decorating a method like so::
@@ -343,7 +420,7 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
             t2 = None
             try:
                 with lock(name, lock_file_prefix, external, lock_path,
-                          do_log=False, semaphores=semaphores):
+                          do_log=False, semaphores=semaphores, delay=delay):
                     t2 = time.time()
                     LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
                               'waited %(wait_secs)0.3fs',
diff --git a/requirements-py3.txt b/requirements-py3.txt
index b1a8722..a27b434 100644
--- a/requirements-py3.txt
+++ b/requirements-py3.txt
@@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0
 oslo.utils>=1.0.0 # Apache-2.0
 posix_ipc
 six>=1.7.0
+retrying>=1.2.2,!=1.3.0 # Apache-2.0
diff --git a/requirements.txt b/requirements.txt
index b1a8722..a27b434 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0
 oslo.utils>=1.0.0 # Apache-2.0
 posix_ipc
 six>=1.7.0
+retrying>=1.2.2,!=1.3.0 # Apache-2.0