patcher: patch existing threading locks; Thanks to Alexis Lee

In projects which dynamically determine whether to activate eventlet,
it can be hard not to import a low level module like logging before
eventlet. When logging is imported it initialises a threading.RLock
which it uses to protect the logging configuration. If two
greenthreads attempt to claim this lock, the second one will block the
/native/ thread not just itself. As green systems usually only have
one native thread, this will freeze the whole system.

Search the GC for unsafe RLocks and replace their internal Lock with a
safe one while monkey-patching.

The tests pass, but were they to fail, the test process would never
return. To deal with this, I've added a test dependency on
subprocess32 which is a backport of the stdlib subprocess module from
Python3. This offers a timeout option on Popen#communicate, which I've
arbitrarily set at 30 seconds.
This commit is contained in:
Alexis Lee
2016-04-18 21:07:00 +01:00
committed by Sergey Shepelev
parent 2cd5f1d9ae
commit bbeaa2e3a0
7 changed files with 177 additions and 2 deletions

View File

@@ -250,6 +250,9 @@ def monkey_patch(**on):
on.setdefault(modname, False)
on.setdefault(modname, default_on)
if on['thread'] and not already_patched.get('thread'):
_green_existing_locks()
modules_to_patch = []
for name, modules_function in [
('os', _green_os_modules),
@@ -320,6 +323,59 @@ def is_monkey_patched(module):
getattr(module, '__name__', None) in already_patched
def _green_existing_locks():
"""Make locks created before monkey-patching safe.
RLocks rely on a Lock and on Python 2, if an unpatched Lock blocks, it
blocks the native thread. We need to replace these with green Locks.
This was originally noticed in the stdlib logging module."""
import gc
import threading
import eventlet.green.thread
lock_type = type(threading.Lock())
rlock_type = type(threading.RLock())
if sys.version_info[0] >= 3:
pyrlock_type = type(threading._PyRLock())
# We're monkey-patching so there can't be any greenlets yet, ergo our thread
# ID is the only valid owner possible.
tid = eventlet.green.thread.get_ident()
for obj in gc.get_objects():
if isinstance(obj, rlock_type):
if (sys.version_info[0] == 2 and
isinstance(obj._RLock__block, lock_type)):
_fix_py2_rlock(obj, tid)
elif (sys.version_info[0] >= 3 and
not isinstance(obj, pyrlock_type)):
_fix_py3_rlock(obj)
def _fix_py2_rlock(rlock, tid):
import eventlet.green.threading
old = rlock._RLock__block
new = eventlet.green.threading.Lock()
rlock._RLock__block = new
if old.locked():
new.acquire()
rlock._RLock__owner = tid
def _fix_py3_rlock(old):
import gc
import threading
new = threading._PyRLock()
while old._is_owned():
old.release()
new.acquire()
if old._is_owned():
new.acquire()
gc.collect()
for ref in gc.get_referrers(old):
for k, v in vars(ref):
if v == old:
setattr(ref, k, new)
def _green_os_modules():
from eventlet.green import os
return [('os', os)]

View File

@@ -12,7 +12,10 @@ try:
except ImportError:
resource = None
import signal
import subprocess
try:
import subprocess32 as subprocess # py2
except ImportError:
import subprocess # py3
import sys
import unittest
import warnings
@@ -320,7 +323,12 @@ def run_python(path, env=None, args=None):
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
)
output, _ = p.communicate()
try:
output, _ = p.communicate(timeout=30)
except subprocess.TimeoutExpired:
p.kill()
output, _ = p.communicate(timeout=30)
return "{0}\nFAIL - timed out".format(output)
return output

View File

@@ -0,0 +1,28 @@
__test__ = False
def aaa(lock, e1, e2):
e1.set()
with lock:
e2.wait()
def bbb(lock, e1, e2):
e1.wait()
e2.set()
with lock:
pass
if __name__ == '__main__':
import threading
test_lock = threading.RLock()
import eventlet
eventlet.monkey_patch()
e1, e2 = threading.Event(), threading.Event()
a = eventlet.spawn(aaa, test_lock, e1, e2)
b = eventlet.spawn(bbb, test_lock, e1, e2)
a.wait()
b.wait()
print('pass')

View File

@@ -0,0 +1,28 @@
__test__ = False
def aaa(lock, e1, e2):
e1.set()
with lock:
e2.wait()
def bbb(lock, e1, e2):
e1.wait()
e2.set()
with lock:
pass
if __name__ == '__main__':
import threading
import eventlet
eventlet.monkey_patch()
test_lock = threading.RLock()
e1, e2 = threading.Event(), threading.Event()
a = eventlet.spawn(aaa, test_lock, e1, e2)
b = eventlet.spawn(bbb, test_lock, e1, e2)
a.wait()
b.wait()
print('pass')

View File

@@ -0,0 +1,42 @@
__test__ = False
def take(lock, sync1, sync2):
sync2.acquire()
sync1.release()
with lock:
sync2.release()
if __name__ == '__main__':
import sys
import threading
lock = threading.RLock()
lock.acquire()
import eventlet
eventlet.monkey_patch()
lock.release()
try:
lock.release()
except RuntimeError as e:
assert e.args == ('cannot release un-acquired lock',)
lock.acquire()
sync1 = threading.Lock()
sync2 = threading.Lock()
sync1.acquire()
eventlet.spawn(take, lock, sync1, sync2)
# Ensure sync2 has been taken
with sync1:
pass
# an RLock should be reentrant
lock.acquire()
lock.release()
lock.release()
# To acquire sync2, 'take' must have acquired lock, which has been locked
# until now
sync2.acquire()
print('pass')

View File

@@ -498,6 +498,18 @@ t2.join()
self.assertEqual(lines[1], "True", lines[1])
def test_patcher_existing_locks_early():
tests.run_isolated('patcher_existing_locks_early.py')
def test_patcher_existing_locks_late():
tests.run_isolated('patcher_existing_locks_late.py')
def test_patcher_existing_locks_locked():
tests.run_isolated('patcher_existing_locks_locked.py')
def test_importlib_lock():
tests.run_isolated('patcher_importlib_lock.py')

View File

@@ -43,6 +43,7 @@ basepython =
deps =
nose==1.3.1
setuptools==5.4.1
py{26,27}: subprocess32==3.2.7
py27-dns: dnspython==1.12.0
py{26,27}-{selects,poll,epolls}: MySQL-python==1.2.5
py{34,py}-dns: dnspython3==1.12.0