Allow for providing a customized semaphore container
It can be undesirable to have a globally shared sempahore container, especially since oslo.concurrency can now be shared among many disjoint applications and libraries. When a single container is used it is now possible to have those disjoint applications/libraries collide on the same sempahore names. This is not a good pattern to continue with, so in order to move away from it allow a custom container to be provided (which defaults to the existing global one) so that users of oslo.concurrency may provide there own container if they so desire. Change-Id: I9aab42e21ba0f52997de3e7c9b0fea51db5c7289
This commit is contained in:
@@ -185,8 +185,42 @@ else:
|
||||
import fcntl
|
||||
InterProcessLock = _FcntlLock
|
||||
|
||||
_semaphores = weakref.WeakValueDictionary()
|
||||
_semaphores_lock = threading.Lock()
|
||||
|
||||
class Semaphores(object):
|
||||
"""A garbage collected container of semaphores.
|
||||
|
||||
This collection internally uses a weak value dictionary so that when a
|
||||
semaphore is no longer in use (by any threads) it will automatically be
|
||||
removed from this container by the garbage collector.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._semaphores = weakref.WeakValueDictionary()
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def get(self, name):
|
||||
"""Gets (or creates) a semaphore with a given name.
|
||||
|
||||
:param name: The semaphore name to get/create (used to associate
|
||||
previously created names with the same semaphore).
|
||||
|
||||
Returns an newly constructed semaphore (or an existing one if it was
|
||||
already created for the given name).
|
||||
"""
|
||||
with self._lock:
|
||||
try:
|
||||
return self._semaphores[name]
|
||||
except KeyError:
|
||||
sem = threading.Semaphore()
|
||||
self._semaphores[name] = sem
|
||||
return sem
|
||||
|
||||
def __len__(self):
|
||||
"""Returns how many semaphores exist at the current time."""
|
||||
return len(self._semaphores)
|
||||
|
||||
|
||||
_semaphores = Semaphores()
|
||||
|
||||
|
||||
def _get_lock_path(name, lock_file_prefix, lock_path=None):
|
||||
@@ -211,11 +245,12 @@ def external_lock(name, lock_file_prefix=None, lock_path=None):
|
||||
return InterProcessLock(lock_file_path)
|
||||
|
||||
|
||||
def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None):
|
||||
def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None,
|
||||
semaphores=None):
|
||||
"""Remove an external lock file when it's not used anymore
|
||||
This will be helpful when we have a lot of lock files
|
||||
"""
|
||||
with internal_lock(name):
|
||||
with internal_lock(name, semaphores=semaphores):
|
||||
lock_file_path = _get_lock_path(name, lock_file_prefix, lock_path)
|
||||
try:
|
||||
os.remove(lock_file_path)
|
||||
@@ -224,20 +259,15 @@ def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None):
|
||||
{'file': lock_file_path})
|
||||
|
||||
|
||||
def internal_lock(name):
|
||||
with _semaphores_lock:
|
||||
try:
|
||||
sem = _semaphores[name]
|
||||
except KeyError:
|
||||
sem = threading.Semaphore()
|
||||
_semaphores[name] = sem
|
||||
|
||||
return sem
|
||||
def internal_lock(name, semaphores=None):
|
||||
if semaphores is None:
|
||||
semaphores = _semaphores
|
||||
return semaphores.get(name)
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
do_log=True):
|
||||
do_log=True, semaphores=None):
|
||||
"""Context based lock
|
||||
|
||||
This function yields a `threading.Semaphore` instance (if we don't use
|
||||
@@ -259,8 +289,13 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
:param do_log: Whether to log acquire/release messages. This is primarily
|
||||
intended to reduce log message duplication when `lock` is used from the
|
||||
`synchronized` decorator.
|
||||
|
||||
:param semaphores: Container that provides semaphores to use when locking.
|
||||
This ensures that threads inside the same application can not collide,
|
||||
due to the fact that external process locks are unaware of a processes
|
||||
active threads.
|
||||
"""
|
||||
int_lock = internal_lock(name)
|
||||
int_lock = internal_lock(name, semaphores=semaphores)
|
||||
with int_lock:
|
||||
if do_log:
|
||||
LOG.debug('Acquired semaphore "%(lock)s"', {'lock': name})
|
||||
@@ -276,7 +311,8 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
LOG.debug('Releasing semaphore "%(lock)s"', {'lock': name})
|
||||
|
||||
|
||||
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
|
||||
semaphores=None):
|
||||
"""Synchronization decorator.
|
||||
|
||||
Decorating a method like so::
|
||||
@@ -307,7 +343,7 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None):
|
||||
t2 = None
|
||||
try:
|
||||
with lock(name, lock_file_prefix, external, lock_path,
|
||||
do_log=False):
|
||||
do_log=False, semaphores=semaphores):
|
||||
t2 = time.time()
|
||||
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
|
||||
'waited %(wait_secs)0.3fs',
|
||||
|
@@ -12,6 +12,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import errno
|
||||
import fcntl
|
||||
import multiprocessing
|
||||
@@ -93,6 +94,30 @@ class LockTestCase(test_base.BaseTestCase):
|
||||
except IOError:
|
||||
pass
|
||||
|
||||
def test_lock_internally_different_collections(self):
|
||||
s1 = lockutils.Semaphores()
|
||||
s2 = lockutils.Semaphores()
|
||||
trigger = threading.Event()
|
||||
who_ran = collections.deque()
|
||||
|
||||
def f(name, semaphores, pull_trigger):
|
||||
with lockutils.internal_lock('testing', semaphores=semaphores):
|
||||
if pull_trigger:
|
||||
trigger.set()
|
||||
else:
|
||||
trigger.wait()
|
||||
who_ran.append(name)
|
||||
|
||||
threads = [
|
||||
threading.Thread(target=f, args=(1, s1, True)),
|
||||
threading.Thread(target=f, args=(2, s2, False)),
|
||||
]
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
self.assertEqual([1, 2], sorted(who_ran))
|
||||
|
||||
def test_lock_internally(self):
|
||||
"""We can lock across multiple threads."""
|
||||
saved_sem_num = len(lockutils._semaphores)
|
||||
|
Reference in New Issue
Block a user