Merge "Use a thread-identifier that can't easily be recycled"
This commit is contained in:
@@ -26,11 +26,15 @@ import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_utils import importutils
|
||||
import six
|
||||
|
||||
from taskflow import logging
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import threading_utils as tu
|
||||
|
||||
# Used for the reader-writer lock get the right thread 'hack' (needed below).
|
||||
eventlet = importutils.try_import('eventlet')
|
||||
eventlet_patcher = importutils.try_import('eventlet.patcher')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@@ -113,11 +117,25 @@ class ReaderWriterLock(object):
|
||||
WRITER = 'w'
|
||||
READER = 'r'
|
||||
|
||||
@staticmethod
|
||||
def _fetch_current_thread_functor():
|
||||
# Until https://github.com/eventlet/eventlet/issues/172 is resolved
|
||||
# or addressed we have to use complicated workaround to get a object
|
||||
# that will not be recycled; the usage of threading.current_thread()
|
||||
# doesn't appear to currently be monkey patched and therefore isn't
|
||||
# reliable to use (and breaks badly when used as all threads share
|
||||
# the same current_thread() object)...
|
||||
if eventlet is not None and eventlet_patcher is not None:
|
||||
if eventlet_patcher.is_monkey_patched('thread'):
|
||||
return lambda: eventlet.getcurrent()
|
||||
return lambda: threading.current_thread()
|
||||
|
||||
def __init__(self):
|
||||
self._writer = None
|
||||
self._pending_writers = collections.deque()
|
||||
self._readers = collections.deque()
|
||||
self._cond = threading.Condition()
|
||||
self._current_thread = self._fetch_current_thread_functor()
|
||||
|
||||
@property
|
||||
def has_pending_writers(self):
|
||||
@@ -132,7 +150,7 @@ class ReaderWriterLock(object):
|
||||
"""Returns if the caller is the active writer or a pending writer."""
|
||||
self._cond.acquire()
|
||||
try:
|
||||
me = tu.get_ident()
|
||||
me = self._current_thread()
|
||||
if self._writer is not None and self._writer == me:
|
||||
return True
|
||||
if check_pending:
|
||||
@@ -159,7 +177,7 @@ class ReaderWriterLock(object):
|
||||
"""Returns if the caller is one of the readers."""
|
||||
self._cond.acquire()
|
||||
try:
|
||||
return tu.get_ident() in self._readers
|
||||
return self._current_thread() in self._readers
|
||||
finally:
|
||||
self._cond.release()
|
||||
|
||||
@@ -172,7 +190,7 @@ class ReaderWriterLock(object):
|
||||
Raises a RuntimeError if an active or pending writer tries to acquire
|
||||
a read lock.
|
||||
"""
|
||||
me = tu.get_ident()
|
||||
me = self._current_thread()
|
||||
if self.is_writer():
|
||||
raise RuntimeError("Writer %s can not acquire a read lock"
|
||||
" while holding/waiting for the write lock"
|
||||
@@ -210,7 +228,7 @@ class ReaderWriterLock(object):
|
||||
|
||||
Raises a RuntimeError if an active reader attempts to acquire a lock.
|
||||
"""
|
||||
me = tu.get_ident()
|
||||
me = self._current_thread()
|
||||
if self.is_reader():
|
||||
raise RuntimeError("Reader %s to writer privilege"
|
||||
" escalation not allowed" % me)
|
||||
|
||||
Reference in New Issue
Block a user