From 0186f4a6481fe6e8efb11e6c0ea6eb86d33e0b67 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Fri, 13 Dec 2024 13:48:52 -0500 Subject: [PATCH] A PipeMutex that works with asyncio eventlet hub. Change-Id: I20f9d2e01b95475371876d3a28d1869b8ad85994 --- .zuul.yaml | 11 +++ oslo_log/pipe_mutex.py | 207 ++++++++++++++++++++++++++++++++--------- test-requirements.txt | 2 +- tox.ini | 5 +- 4 files changed, 179 insertions(+), 46 deletions(-) diff --git a/.zuul.yaml b/.zuul.yaml index 5bdfee55..9e3e5069 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -46,6 +46,15 @@ vars: tox_environment: OSLO_LOG_TEST_EVENTLET: 1 + EVENTLET_HUB: epolls + +- job: + name: oslo.log-tox-py312-eventlet-patched-asyncio + parent: openstack-tox-py312 + vars: + tox_environment: + OSLO_LOG_TEST_EVENTLET: 1 + EVENTLET_HUB: asyncio - project: check: @@ -53,10 +62,12 @@ - oslo.log-src-grenade - oslo.log-jsonformatter - oslo.log-tox-py312-eventlet-patched + - oslo.log-tox-py312-eventlet-patched-asyncio gate: jobs: - oslo.log-jsonformatter - oslo.log-tox-py312-eventlet-patched + - oslo.log-tox-py312-eventlet-patched-asyncio templates: - check-requirements - lib-forward-testing-python3 diff --git a/oslo_log/pipe_mutex.py b/oslo_log/pipe_mutex.py index 3b3a09a4..2003ed09 100644 --- a/oslo_log/pipe_mutex.py +++ b/oslo_log/pipe_mutex.py @@ -13,53 +13,34 @@ # See the License for the specific language governing permissions and # limitations under the License. +import asyncio +from asyncio.exceptions import TimeoutError as AsyncioTimeoutError import errno import fcntl +import importlib.metadata import eventlet +import eventlet.asyncio import eventlet.debug import eventlet.greenthread import eventlet.hubs +import eventlet.hubs.asyncio +import eventlet.patcher # We want the blocking APIs, because we set file descriptors to non-blocking. os = eventlet.patcher.original("os") +# Used to communicate between real threads: +SimpleQueue = eventlet.patcher.original("queue").SimpleQueue +# Real OS-level threads: +threading = eventlet.patcher.original("threading") -class PipeMutex: - """Mutex using a pipe. +class _BaseMutex: + """Shared code for different mutex implementations.""" - Works across both greenlets and real threads, even at the same time. - - Class code copied from Swift's swift/common/utils.py - Related eventlet bug: https://github.com/eventlet/eventlet/issues/432 - """ def __init__(self): - self.rfd, self.wfd = os.pipe() - - # You can't create a pipe in non-blocking mode; you must set it - # later. - rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL) - fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK) - os.write(self.wfd, b'-') # start unlocked - self.owner = None - self.recursion_depth = 0 - # Usually, it's an error to have multiple greenthreads all waiting - # to read the same file descriptor. It's often a sign of inadequate - # concurrency control; for example, if you have two greenthreads - # trying to use the same memcache connection, they'll end up writing - # interleaved garbage to the socket or stealing part of each others' - # responses. - # - # In this case, we have multiple greenthreads waiting on the same - # file descriptor by design. This lets greenthreads in real thread A - # wait with greenthreads in real thread B for the same mutex. - # Therefore, we must turn off eventlet's multiple-reader detection. - # - # It would be better to turn off multiple-reader detection for only - # our calls to trampoline(), but eventlet does not support that. - eventlet.debug.hub_prevent_multiple_readers(False) def acquire(self, blocking=True): """Acquire the mutex. @@ -76,6 +57,67 @@ class PipeMutex: self.recursion_depth += 1 return True + return self._acquire_eventlet(blocking, current_greenthread_id) + + def release(self): + """Release the mutex.""" + current_greenthread_id = id(eventlet.greenthread.getcurrent()) + if self.owner != current_greenthread_id: + raise RuntimeError("cannot release un-acquired lock") + + if self.recursion_depth > 0: + self.recursion_depth -= 1 + return + + self.owner = None + self._release_eventlet() + + def close(self): + """Close the mutex. + + This releases its file descriptors. + You can't use a mutex after it's been closed. + """ + self.owner = None + self.recursion_depth = 0 + + +class _ReallyPipeMutex(_BaseMutex): + """Mutex using a pipe. + + Works across both greenlets and real threads, even at the same time. + + Class code copied from Swift's swift/common/utils.py + Related eventlet bug: https://github.com/eventlet/eventlet/issues/432 + """ + def __init__(self): + super().__init__() + + self.rfd, self.wfd = os.pipe() + + # You can't create a pipe in non-blocking mode; you must set it + # later. + rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL) + fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK) + os.write(self.wfd, b'-') # start unlocked + + # Usually, it's an error to have multiple greenthreads all waiting + # to read the same file descriptor. It's often a sign of inadequate + # concurrency control; for example, if you have two greenthreads + # trying to use the same memcache connection, they'll end up writing + # interleaved garbage to the socket or stealing part of each others' + # responses. + # + # In this case, we have multiple greenthreads waiting on the same + # file descriptor by design. This lets greenthreads in real thread A + # wait with greenthreads in real thread B for the same mutex. + # Therefore, we must turn off eventlet's multiple-reader detection. + # + # It would be better to turn off multiple-reader detection for only + # our calls to trampoline(), but eventlet does not support that. + eventlet.debug.hub_prevent_multiple_readers(False) + + def _acquire_eventlet(self, blocking, current_greenthread_id): while True: try: # If there is a byte available, this will read it and remove @@ -96,17 +138,7 @@ class PipeMutex: # else writes to self.wfd. eventlet.hubs.trampoline(self.rfd, read=True) - def release(self): - """Release the mutex.""" - current_greenthread_id = id(eventlet.greenthread.getcurrent()) - if self.owner != current_greenthread_id: - raise RuntimeError("cannot release un-acquired lock") - - if self.recursion_depth > 0: - self.recursion_depth -= 1 - return - - self.owner = None + def _release_eventlet(self): os.write(self.wfd, b'X') def close(self): @@ -121,8 +153,7 @@ class PipeMutex: if self.rfd is not None: os.close(self.rfd) self.rfd = None - self.owner = None - self.recursion_depth = 0 + super().close() def __del__(self): # We need this so we don't leak file descriptors. Otherwise, if you @@ -146,6 +177,94 @@ class PipeMutex: self.release() +class _AsyncioMutex(_BaseMutex): + """Alternative implementation of mutex for eventlet asyncio hub. + + When using the eventlet asyncio hub, multiple file descriptors as readers + aren't supported. So instead, we use two levels of locking: + + 1. A normal RLock, for locking across OS threads. + + 2. An ``asyncio.Lock`` for locking across greenlets within a single OS + thread (each OS thread running greenlets has its own asyncio loop) + """ + def __init__(self): + super().__init__() + self._asyncio_lock = asyncio.Lock() + self._threading_lock = threading.RLock() + + async def _asyncio_acquire(self, blocking, current_greenthread_id): + if blocking: + timeout = None + else: + timeout = 0.000001 + try: + await asyncio.wait_for( + self._asyncio_lock.acquire(), timeout=timeout + ) + except AsyncioTimeoutError: + return False + else: + self.owner = current_greenthread_id + return True + + def _acquire_eventlet(self, blocking, current_greenthread_id): + return eventlet.asyncio.spawn_for_awaitable( + self._asyncio_acquire(blocking, current_greenthread_id) + ).wait() + + def acquire(self, blocking=True): + # First, acquire the RLock: + rlock_acquired = self._threading_lock.acquire(blocking=False) + if not rlock_acquired and not blocking: + return False + # Simulate blocking while allowing other greenlets to run: + while not rlock_acquired: + rlock_acquired = self._threading_lock.acquire(blocking=False) + # The chance of hitting this path is usually quite low (it requires + # multiple _OS_ threads having a conflict) and sleeping for too + # short a time leads to very slow results in one of the tests, + # perhaps due to lots of context switching overhead. So compromise + # on a short but not too-short sleep. + eventlet.sleep(0.0001) + # Then, do the eventlet locking: + return super().acquire(blocking=blocking) + + # Preserve documentation, without copy/pasting: + acquire.__doc__ = _BaseMutex.acquire.__doc__ + + def _release_eventlet(self): + self._asyncio_lock.release() + + def release(self): + """Release the mutex.""" + # We release in reverse order from acquire(), first eventlet and then + # the RLock: + super().release() + self._threading_lock.release() + + def close(self): + """Close the mutex.""" + del self._asyncio_lock + del self._threading_lock + super().close() + + +_HUB = eventlet.hubs.get_hub() +if isinstance(_HUB, eventlet.hubs.asyncio.Hub): + major, minor, patch = map( + int, + importlib.metadata.version("eventlet").split(".")[:3] + ) + if (major, minor, patch) < (0, 38, 2): + raise RuntimeError( + "eventlet 0.38.2 or later is required when using asyncio hub" + ) + PipeMutex = _AsyncioMutex +else: + PipeMutex = _ReallyPipeMutex + + def pipe_createLock(self): """Replacement for logging.Handler.createLock method.""" self.lock = PipeMutex() diff --git a/test-requirements.txt b/test-requirements.txt index 919b8a8b..90716097 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -6,4 +6,4 @@ coverage>=4.5.1 # Apache-2.0 fixtures>=3.0.0 # Apache-2.0/BSD -eventlet>=0.30.1 # MIT +eventlet>=0.38.2 # MIT diff --git a/tox.ini b/tox.ini index 52820e4b..5cdeb19e 100644 --- a/tox.ini +++ b/tox.ini @@ -10,9 +10,12 @@ deps = -c{env:TOX_CONSTRAINTS_FILE:https://releases.openstack.org/constraints/upper/master} -r{toxinidir}/test-requirements.txt -r{toxinidir}/requirements.txt -passenv = OSLO_LOG_TEST_EVENTLET +passenv = + OSLO_LOG_TEST_EVENTLET + EVENTLET_HUB commands = find . -type f -name "*.pyc" -delete + python -c "import eventlet.hubs; print('Hub used:', eventlet.hubs.get_hub())" stestr run {posargs} stestr slowest