From bbeaa2e3a0b6da26af08f5e6734767e04c187c5b Mon Sep 17 00:00:00 2001 From: Alexis Lee Date: Mon, 18 Apr 2016 21:07:00 +0100 Subject: [PATCH] patcher: patch existing threading locks; Thanks to Alexis Lee In projects which dynamically determine whether to activate eventlet, it can be hard not to import a low level module like logging before eventlet. When logging is imported it initialises a threading.RLock which it uses to protect the logging configuration. If two greenthreads attempt to claim this lock, the second one will block the /native/ thread not just itself. As green systems usually only have one native thread, this will freeze the whole system. Search the GC for unsafe RLocks and replace their internal Lock with a safe one while monkey-patching. The tests pass, but were they to fail, the test process would never return. To deal with this, I've added a test dependency on subprocess32 which is a backport of the stdlib subprocess module from Python3. This offers a timeout option on Popen#communicate, which I've arbitrarily set at 30 seconds. --- eventlet/patcher.py | 56 +++++++++++++++++++ tests/__init__.py | 12 +++- .../isolated/patcher_existing_locks_early.py | 28 ++++++++++ tests/isolated/patcher_existing_locks_late.py | 28 ++++++++++ .../isolated/patcher_existing_locks_locked.py | 42 ++++++++++++++ tests/patcher_test.py | 12 ++++ tox.ini | 1 + 7 files changed, 177 insertions(+), 2 deletions(-) create mode 100644 tests/isolated/patcher_existing_locks_early.py create mode 100644 tests/isolated/patcher_existing_locks_late.py create mode 100644 tests/isolated/patcher_existing_locks_locked.py diff --git a/eventlet/patcher.py b/eventlet/patcher.py index 3a9804e..115fd9b 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -250,6 +250,9 @@ def monkey_patch(**on): on.setdefault(modname, False) on.setdefault(modname, default_on) + if on['thread'] and not already_patched.get('thread'): + _green_existing_locks() + modules_to_patch = [] for name, modules_function in [ ('os', _green_os_modules), @@ -320,6 +323,59 @@ def is_monkey_patched(module): getattr(module, '__name__', None) in already_patched +def _green_existing_locks(): + """Make locks created before monkey-patching safe. + + RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it + blocks the native thread. We need to replace these with green Locks. + + This was originally noticed in the stdlib logging module.""" + import gc + import threading + import eventlet.green.thread + lock_type = type(threading.Lock()) + rlock_type = type(threading.RLock()) + if sys.version_info[0] >= 3: + pyrlock_type = type(threading._PyRLock()) + # We're monkey-patching so there can't be any greenlets yet, ergo our thread + # ID is the only valid owner possible. + tid = eventlet.green.thread.get_ident() + for obj in gc.get_objects(): + if isinstance(obj, rlock_type): + if (sys.version_info[0] == 2 and + isinstance(obj._RLock__block, lock_type)): + _fix_py2_rlock(obj, tid) + elif (sys.version_info[0] >= 3 and + not isinstance(obj, pyrlock_type)): + _fix_py3_rlock(obj) + + +def _fix_py2_rlock(rlock, tid): + import eventlet.green.threading + old = rlock._RLock__block + new = eventlet.green.threading.Lock() + rlock._RLock__block = new + if old.locked(): + new.acquire() + rlock._RLock__owner = tid + + +def _fix_py3_rlock(old): + import gc + import threading + new = threading._PyRLock() + while old._is_owned(): + old.release() + new.acquire() + if old._is_owned(): + new.acquire() + gc.collect() + for ref in gc.get_referrers(old): + for k, v in vars(ref): + if v == old: + setattr(ref, k, new) + + def _green_os_modules(): from eventlet.green import os return [('os', os)] diff --git a/tests/__init__.py b/tests/__init__.py index 0c37cdd..62bae83 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,7 +12,10 @@ try: except ImportError: resource = None import signal -import subprocess +try: + import subprocess32 as subprocess # py2 +except ImportError: + import subprocess # py3 import sys import unittest import warnings @@ -320,7 +323,12 @@ def run_python(path, env=None, args=None): stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) - output, _ = p.communicate() + try: + output, _ = p.communicate(timeout=30) + except subprocess.TimeoutExpired: + p.kill() + output, _ = p.communicate(timeout=30) + return "{0}\nFAIL - timed out".format(output) return output diff --git a/tests/isolated/patcher_existing_locks_early.py b/tests/isolated/patcher_existing_locks_early.py new file mode 100644 index 0000000..7b486b8 --- /dev/null +++ b/tests/isolated/patcher_existing_locks_early.py @@ -0,0 +1,28 @@ +__test__ = False + + +def aaa(lock, e1, e2): + e1.set() + with lock: + e2.wait() + + +def bbb(lock, e1, e2): + e1.wait() + e2.set() + with lock: + pass + + +if __name__ == '__main__': + import threading + test_lock = threading.RLock() + import eventlet + eventlet.monkey_patch() + + e1, e2 = threading.Event(), threading.Event() + a = eventlet.spawn(aaa, test_lock, e1, e2) + b = eventlet.spawn(bbb, test_lock, e1, e2) + a.wait() + b.wait() + print('pass') diff --git a/tests/isolated/patcher_existing_locks_late.py b/tests/isolated/patcher_existing_locks_late.py new file mode 100644 index 0000000..9924c23 --- /dev/null +++ b/tests/isolated/patcher_existing_locks_late.py @@ -0,0 +1,28 @@ +__test__ = False + + +def aaa(lock, e1, e2): + e1.set() + with lock: + e2.wait() + + +def bbb(lock, e1, e2): + e1.wait() + e2.set() + with lock: + pass + + +if __name__ == '__main__': + import threading + import eventlet + eventlet.monkey_patch() + test_lock = threading.RLock() + + e1, e2 = threading.Event(), threading.Event() + a = eventlet.spawn(aaa, test_lock, e1, e2) + b = eventlet.spawn(bbb, test_lock, e1, e2) + a.wait() + b.wait() + print('pass') diff --git a/tests/isolated/patcher_existing_locks_locked.py b/tests/isolated/patcher_existing_locks_locked.py new file mode 100644 index 0000000..df7d833 --- /dev/null +++ b/tests/isolated/patcher_existing_locks_locked.py @@ -0,0 +1,42 @@ +__test__ = False + + +def take(lock, sync1, sync2): + sync2.acquire() + sync1.release() + with lock: + sync2.release() + + +if __name__ == '__main__': + import sys + import threading + lock = threading.RLock() + lock.acquire() + import eventlet + eventlet.monkey_patch() + + lock.release() + try: + lock.release() + except RuntimeError as e: + assert e.args == ('cannot release un-acquired lock',) + lock.acquire() + + sync1 = threading.Lock() + sync2 = threading.Lock() + sync1.acquire() + eventlet.spawn(take, lock, sync1, sync2) + # Ensure sync2 has been taken + with sync1: + pass + + # an RLock should be reentrant + lock.acquire() + lock.release() + lock.release() + # To acquire sync2, 'take' must have acquired lock, which has been locked + # until now + sync2.acquire() + + print('pass') diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 7ec12b1..0e3c0cf 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -498,6 +498,18 @@ t2.join() self.assertEqual(lines[1], "True", lines[1]) +def test_patcher_existing_locks_early(): + tests.run_isolated('patcher_existing_locks_early.py') + + +def test_patcher_existing_locks_late(): + tests.run_isolated('patcher_existing_locks_late.py') + + +def test_patcher_existing_locks_locked(): + tests.run_isolated('patcher_existing_locks_locked.py') + + def test_importlib_lock(): tests.run_isolated('patcher_importlib_lock.py') diff --git a/tox.ini b/tox.ini index 3094c71..0c86dbd 100644 --- a/tox.ini +++ b/tox.ini @@ -43,6 +43,7 @@ basepython = deps = nose==1.3.1 setuptools==5.4.1 + py{26,27}: subprocess32==3.2.7 py27-dns: dnspython==1.12.0 py{26,27}-{selects,poll,epolls}: MySQL-python==1.2.5 py{34,py}-dns: dnspython3==1.12.0