diff --git a/eventlet/patcher.py b/eventlet/patcher.py index 3a9804e..115fd9b 100644 --- a/eventlet/patcher.py +++ b/eventlet/patcher.py @@ -250,6 +250,9 @@ def monkey_patch(**on): on.setdefault(modname, False) on.setdefault(modname, default_on) + if on['thread'] and not already_patched.get('thread'): + _green_existing_locks() + modules_to_patch = [] for name, modules_function in [ ('os', _green_os_modules), @@ -320,6 +323,59 @@ def is_monkey_patched(module): getattr(module, '__name__', None) in already_patched +def _green_existing_locks(): + """Make locks created before monkey-patching safe. + + RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it + blocks the native thread. We need to replace these with green Locks. + + This was originally noticed in the stdlib logging module.""" + import gc + import threading + import eventlet.green.thread + lock_type = type(threading.Lock()) + rlock_type = type(threading.RLock()) + if sys.version_info[0] >= 3: + pyrlock_type = type(threading._PyRLock()) + # We're monkey-patching so there can't be any greenlets yet, ergo our thread + # ID is the only valid owner possible. + tid = eventlet.green.thread.get_ident() + for obj in gc.get_objects(): + if isinstance(obj, rlock_type): + if (sys.version_info[0] == 2 and + isinstance(obj._RLock__block, lock_type)): + _fix_py2_rlock(obj, tid) + elif (sys.version_info[0] >= 3 and + not isinstance(obj, pyrlock_type)): + _fix_py3_rlock(obj) + + +def _fix_py2_rlock(rlock, tid): + import eventlet.green.threading + old = rlock._RLock__block + new = eventlet.green.threading.Lock() + rlock._RLock__block = new + if old.locked(): + new.acquire() + rlock._RLock__owner = tid + + +def _fix_py3_rlock(old): + import gc + import threading + new = threading._PyRLock() + while old._is_owned(): + old.release() + new.acquire() + if old._is_owned(): + new.acquire() + gc.collect() + for ref in gc.get_referrers(old): + for k, v in vars(ref): + if v == old: + setattr(ref, k, new) + + def _green_os_modules(): from eventlet.green import os return [('os', os)] diff --git a/tests/__init__.py b/tests/__init__.py index 0c37cdd..62bae83 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,7 +12,10 @@ try: except ImportError: resource = None import signal -import subprocess +try: + import subprocess32 as subprocess # py2 +except ImportError: + import subprocess # py3 import sys import unittest import warnings @@ -320,7 +323,12 @@ def run_python(path, env=None, args=None): stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) - output, _ = p.communicate() + try: + output, _ = p.communicate(timeout=30) + except subprocess.TimeoutExpired: + p.kill() + output, _ = p.communicate(timeout=30) + return "{0}\nFAIL - timed out".format(output) return output diff --git a/tests/isolated/patcher_existing_locks_early.py b/tests/isolated/patcher_existing_locks_early.py new file mode 100644 index 0000000..7b486b8 --- /dev/null +++ b/tests/isolated/patcher_existing_locks_early.py @@ -0,0 +1,28 @@ +__test__ = False + + +def aaa(lock, e1, e2): + e1.set() + with lock: + e2.wait() + + +def bbb(lock, e1, e2): + e1.wait() + e2.set() + with lock: + pass + + +if __name__ == '__main__': + import threading + test_lock = threading.RLock() + import eventlet + eventlet.monkey_patch() + + e1, e2 = threading.Event(), threading.Event() + a = eventlet.spawn(aaa, test_lock, e1, e2) + b = eventlet.spawn(bbb, test_lock, e1, e2) + a.wait() + b.wait() + print('pass') diff --git a/tests/isolated/patcher_existing_locks_late.py b/tests/isolated/patcher_existing_locks_late.py new file mode 100644 index 0000000..9924c23 --- /dev/null +++ b/tests/isolated/patcher_existing_locks_late.py @@ -0,0 +1,28 @@ +__test__ = False + + +def aaa(lock, e1, e2): + e1.set() + with lock: + e2.wait() + + +def bbb(lock, e1, e2): + e1.wait() + e2.set() + with lock: + pass + + +if __name__ == '__main__': + import threading + import eventlet + eventlet.monkey_patch() + test_lock = threading.RLock() + + e1, e2 = threading.Event(), threading.Event() + a = eventlet.spawn(aaa, test_lock, e1, e2) + b = eventlet.spawn(bbb, test_lock, e1, e2) + a.wait() + b.wait() + print('pass') diff --git a/tests/isolated/patcher_existing_locks_locked.py b/tests/isolated/patcher_existing_locks_locked.py new file mode 100644 index 0000000..df7d833 --- /dev/null +++ b/tests/isolated/patcher_existing_locks_locked.py @@ -0,0 +1,42 @@ +__test__ = False + + +def take(lock, sync1, sync2): + sync2.acquire() + sync1.release() + with lock: + sync2.release() + + +if __name__ == '__main__': + import sys + import threading + lock = threading.RLock() + lock.acquire() + import eventlet + eventlet.monkey_patch() + + lock.release() + try: + lock.release() + except RuntimeError as e: + assert e.args == ('cannot release un-acquired lock',) + lock.acquire() + + sync1 = threading.Lock() + sync2 = threading.Lock() + sync1.acquire() + eventlet.spawn(take, lock, sync1, sync2) + # Ensure sync2 has been taken + with sync1: + pass + + # an RLock should be reentrant + lock.acquire() + lock.release() + lock.release() + # To acquire sync2, 'take' must have acquired lock, which has been locked + # until now + sync2.acquire() + + print('pass') diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 7ec12b1..0e3c0cf 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -498,6 +498,18 @@ t2.join() self.assertEqual(lines[1], "True", lines[1]) +def test_patcher_existing_locks_early(): + tests.run_isolated('patcher_existing_locks_early.py') + + +def test_patcher_existing_locks_late(): + tests.run_isolated('patcher_existing_locks_late.py') + + +def test_patcher_existing_locks_locked(): + tests.run_isolated('patcher_existing_locks_locked.py') + + def test_importlib_lock(): tests.run_isolated('patcher_importlib_lock.py') diff --git a/tox.ini b/tox.ini index 3094c71..0c86dbd 100644 --- a/tox.ini +++ b/tox.ini @@ -43,6 +43,7 @@ basepython = deps = nose==1.3.1 setuptools==5.4.1 + py{26,27}: subprocess32==3.2.7 py27-dns: dnspython==1.12.0 py{26,27}-{selects,poll,epolls}: MySQL-python==1.2.5 py{34,py}-dns: dnspython3==1.12.0