Fix deadlock when logging from a tpool thread.

The object server runs certain IO-intensive methods outside the main
pthread for performance. If one of those methods tries to log, this can
cause a crash that eventually leads to an object server with hundreds
or thousands of greenthreads, all deadlocked.

The short version of the story is that logging.SysLogHandler has a
mutex which Eventlet monkey-patches. However, the monkey-patched mutex
sometimes breaks if used across different pthreads, and it breaks in
such a way that it is still considered held. After that happens, any
attempt to emit a log message blocks the calling greenthread forever.

The fix is to use a mutex that works across different greenlets and
across different pthreads. This patch introduces such a lock based on
an anonymous pipe.

Change-Id: I57decefaf5bbed57b97a62d0df8518b112917480
Closes-Bug: 1710328
This commit is contained in:
Samuel Merritt
2017-08-14 10:41:31 -07:00
parent bf09a06708
commit 6d160797fc
2 changed files with 307 additions and 7 deletions

View File

@@ -50,9 +50,12 @@ import stat
import datetime
import eventlet
import eventlet.debug
import eventlet.greenthread
import eventlet.semaphore
from eventlet import GreenPool, sleep, Timeout, tpool
from eventlet.green import socket, threading
from eventlet.hubs import trampoline
import eventlet.queue
import netifaces
import codecs
@@ -1894,17 +1897,18 @@ def get_logger(conf, name=None, log_to_console=False, log_route=None,
if udp_host:
udp_port = int(conf.get('log_udp_port',
logging.handlers.SYSLOG_UDP_PORT))
handler = SysLogHandler(address=(udp_host, udp_port),
facility=facility)
handler = ThreadSafeSysLogHandler(address=(udp_host, udp_port),
facility=facility)
else:
log_address = conf.get('log_address', '/dev/log')
try:
handler = SysLogHandler(address=log_address, facility=facility)
handler = ThreadSafeSysLogHandler(address=log_address,
facility=facility)
except socket.error as e:
# Either /dev/log isn't a UNIX socket or it does not exist at all
if e.errno not in [errno.ENOTSOCK, errno.ENOENT]:
raise
handler = SysLogHandler(facility=facility)
handler = ThreadSafeSysLogHandler(facility=facility)
handler.setFormatter(formatter)
logger.addHandler(handler)
get_logger.handler4logger[logger] = handler
@@ -4304,3 +4308,123 @@ def replace_partition_in_path(path, part_power):
path_components[-4] = "%d" % part
return os.sep.join(path_components)
class PipeMutex(object):
"""
Mutex using a pipe. Works across both greenlets and real threads, even
at the same time.
"""
def __init__(self):
self.rfd, self.wfd = os.pipe()
# You can't create a pipe in non-blocking mode; you must set it
# later.
rflags = fcntl.fcntl(self.rfd, fcntl.F_GETFL)
fcntl.fcntl(self.rfd, fcntl.F_SETFL, rflags | os.O_NONBLOCK)
os.write(self.wfd, b'-') # start unlocked
self.owner = None
self.recursion_depth = 0
# Usually, it's an error to have multiple greenthreads all waiting
# to read the same file descriptor. It's often a sign of inadequate
# concurrency control; for example, if you have two greenthreads
# trying to use the same memcache connection, they'll end up writing
# interleaved garbage to the socket or stealing part of each others'
# responses.
#
# In this case, we have multiple greenthreads waiting on the same
# file descriptor by design. This lets greenthreads in real thread A
# wait with greenthreads in real thread B for the same mutex.
# Therefore, we must turn off eventlet's multiple-reader detection.
#
# It would be better to turn off multiple-reader detection for only
# our calls to trampoline(), but eventlet does not support that.
eventlet.debug.hub_prevent_multiple_readers(False)
def acquire(self, blocking=True):
"""
Acquire the mutex.
If called with blocking=False, returns True if the mutex was
acquired and False if it wasn't. Otherwise, blocks until the mutex
is acquired and returns True.
This lock is recursive; the same greenthread may acquire it as many
times as it wants to, though it must then release it that many times
too.
"""
current_greenthread_id = id(eventlet.greenthread.getcurrent())
if self.owner == current_greenthread_id:
self.recursion_depth += 1
return True
while True:
try:
# If there is a byte available, this will read it and remove
# it from the pipe. If not, this will raise OSError with
# errno=EAGAIN.
os.read(self.rfd, 1)
self.owner = current_greenthread_id
return True
except OSError as err:
if err.errno != errno.EAGAIN:
raise
if not blocking:
return False
# Tell eventlet to suspend the current greenthread until
# self.rfd becomes readable. This will happen when someone
# else writes to self.wfd.
trampoline(self.rfd, read=True)
def release(self):
"""
Release the mutex.
"""
current_greenthread_id = id(eventlet.greenthread.getcurrent())
if self.owner != current_greenthread_id:
raise RuntimeError("cannot release un-acquired lock")
if self.recursion_depth > 0:
self.recursion_depth -= 1
return
self.owner = None
os.write(self.wfd, b'X')
def close(self):
"""
Close the mutex. This releases its file descriptors.
You can't use a mutex after it's been closed.
"""
if self.wfd is not None:
os.close(self.rfd)
self.rfd = None
os.close(self.wfd)
self.wfd = None
self.owner = None
self.recursion_depth = 0
def __del__(self):
# We need this so we don't leak file descriptors. Otherwise, if you
# call get_logger() and don't explicitly dispose of it by calling
# logger.logger.handlers[0].lock.close() [1], the pipe file
# descriptors are leaked.
#
# This only really comes up in tests. Swift processes tend to call
# get_logger() once and then hang on to it until they exit, but the
# test suite calls get_logger() a lot.
#
# [1] and that's a completely ridiculous thing to expect callers to
# do, so nobody does it and that's okay.
self.close()
class ThreadSafeSysLogHandler(SysLogHandler):
def createLock(self):
self.lock = PipeMutex()

View File

@@ -21,7 +21,9 @@ import ctypes
import contextlib
import errno
import eventlet
import eventlet.debug
import eventlet.event
import eventlet.patcher
import functools
import grp
import logging
@@ -1489,7 +1491,7 @@ class TestUtils(unittest.TestCase):
'test1\ntest3\ntest4\ntest6\n')
def test_get_logger_sysloghandler_plumbing(self):
orig_sysloghandler = utils.SysLogHandler
orig_sysloghandler = utils.ThreadSafeSysLogHandler
syslog_handler_args = []
def syslog_handler_catcher(*args, **kwargs):
@@ -1500,7 +1502,7 @@ class TestUtils(unittest.TestCase):
syslog_handler_catcher.LOG_LOCAL3 = orig_sysloghandler.LOG_LOCAL3
try:
utils.SysLogHandler = syslog_handler_catcher
utils.ThreadSafeSysLogHandler = syslog_handler_catcher
utils.get_logger({
'log_facility': 'LOG_LOCAL3',
}, 'server', log_route='server')
@@ -1550,7 +1552,7 @@ class TestUtils(unittest.TestCase):
'facility': orig_sysloghandler.LOG_LOCAL0})],
syslog_handler_args)
finally:
utils.SysLogHandler = orig_sysloghandler
utils.ThreadSafeSysLogHandler = orig_sysloghandler
@reset_logger_state
def test_clean_logger_exception(self):
@@ -6252,5 +6254,179 @@ name = %s
reload_storage_policies()
self.assertIsNotNone(POLICIES.get_by_name(self.policy_name))
class TestPipeMutex(unittest.TestCase):
def setUp(self):
self.mutex = utils.PipeMutex()
def tearDown(self):
self.mutex.close()
def test_nonblocking(self):
evt_lock1 = eventlet.event.Event()
evt_lock2 = eventlet.event.Event()
evt_unlock = eventlet.event.Event()
def get_the_lock():
self.mutex.acquire()
evt_lock1.send('got the lock')
evt_lock2.wait()
self.mutex.release()
evt_unlock.send('released the lock')
eventlet.spawn(get_the_lock)
evt_lock1.wait() # Now, the other greenthread has the lock.
self.assertFalse(self.mutex.acquire(blocking=False))
evt_lock2.send('please release the lock')
evt_unlock.wait() # The other greenthread has released the lock.
self.assertTrue(self.mutex.acquire(blocking=False))
def test_recursive(self):
self.assertTrue(self.mutex.acquire(blocking=False))
self.assertTrue(self.mutex.acquire(blocking=False))
def try_acquire_lock():
return self.mutex.acquire(blocking=False)
self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
self.mutex.release()
self.assertFalse(eventlet.spawn(try_acquire_lock).wait())
self.mutex.release()
self.assertTrue(eventlet.spawn(try_acquire_lock).wait())
def test_release_without_acquire(self):
self.assertRaises(RuntimeError, self.mutex.release)
def test_too_many_releases(self):
self.mutex.acquire()
self.mutex.release()
self.assertRaises(RuntimeError, self.mutex.release)
def test_wrong_releaser(self):
self.mutex.acquire()
self.assertRaises(RuntimeError,
eventlet.spawn(self.mutex.release).wait)
def test_blocking(self):
evt = eventlet.event.Event()
sequence = []
def coro1():
eventlet.sleep(0) # let coro2 go
self.mutex.acquire()
sequence.append('coro1 acquire')
evt.send('go')
self.mutex.release()
sequence.append('coro1 release')
def coro2():
evt.wait() # wait for coro1 to start us
self.mutex.acquire()
sequence.append('coro2 acquire')
self.mutex.release()
sequence.append('coro2 release')
c1 = eventlet.spawn(coro1)
c2 = eventlet.spawn(coro2)
c1.wait()
c2.wait()
self.assertEqual(sequence, [
'coro1 acquire',
'coro1 release',
'coro2 acquire',
'coro2 release'])
def test_blocking_tpool(self):
# Note: this test's success isn't a guarantee that the mutex is
# working. However, this test's failure means that the mutex is
# definitely broken.
sequence = []
def do_stuff():
n = 10
while n > 0:
self.mutex.acquire()
sequence.append("<")
eventlet.sleep(0.0001)
sequence.append(">")
self.mutex.release()
n -= 1
greenthread1 = eventlet.spawn(do_stuff)
greenthread2 = eventlet.spawn(do_stuff)
real_thread1 = eventlet.patcher.original('threading').Thread(
target=do_stuff)
real_thread1.start()
real_thread2 = eventlet.patcher.original('threading').Thread(
target=do_stuff)
real_thread2.start()
greenthread1.wait()
greenthread2.wait()
real_thread1.join()
real_thread2.join()
self.assertEqual(''.join(sequence), "<>" * 40)
def test_blocking_preserves_ownership(self):
pthread1_event = eventlet.patcher.original('threading').Event()
pthread2_event1 = eventlet.patcher.original('threading').Event()
pthread2_event2 = eventlet.patcher.original('threading').Event()
thread_id = []
owner = []
def pthread1():
thread_id.append(id(eventlet.greenthread.getcurrent()))
self.mutex.acquire()
owner.append(self.mutex.owner)
pthread2_event1.set()
orig_os_write = utils.os.write
def patched_os_write(*a, **kw):
try:
return orig_os_write(*a, **kw)
finally:
pthread1_event.wait()
with mock.patch.object(utils.os, 'write', patched_os_write):
self.mutex.release()
pthread2_event2.set()
def pthread2():
pthread2_event1.wait() # ensure pthread1 acquires lock first
thread_id.append(id(eventlet.greenthread.getcurrent()))
self.mutex.acquire()
pthread1_event.set()
pthread2_event2.wait()
owner.append(self.mutex.owner)
self.mutex.release()
real_thread1 = eventlet.patcher.original('threading').Thread(
target=pthread1)
real_thread1.start()
real_thread2 = eventlet.patcher.original('threading').Thread(
target=pthread2)
real_thread2.start()
real_thread1.join()
real_thread2.join()
self.assertEqual(thread_id, owner)
self.assertIsNone(self.mutex.owner)
@classmethod
def tearDownClass(cls):
# PipeMutex turns this off when you instantiate one
eventlet.debug.hub_prevent_multiple_readers(True)
if __name__ == '__main__':
unittest.main()