Add process guards + invalidate to the connection pool
It's not safe for a database TCP connection to be shared to a child process, as this is a file descriptor which will maintain its state on both sides. Applications such as Cinder which spin up multiprocessing subprocesses at startup time are subject to race conditions as a result. Instead of requiring that engines be explicitly prepared within a child process, we can detect and accommodate this situation in the connection pool itself, by tracking the originating pid of a connection, and if it changes on checkout, by invalidating. Change-Id: If116f7b7140b3eba064d8147e5f637a05beb1cd8
This commit is contained in:
@@ -280,11 +280,13 @@ Efficient use of soft deletes:
|
||||
|
||||
import itertools
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
from sqlalchemy import exc
|
||||
import sqlalchemy.orm
|
||||
from sqlalchemy import pool
|
||||
from sqlalchemy.sql.expression import literal_column
|
||||
@@ -476,6 +478,8 @@ def _init_connection_args(url, engine_args, **kw):
|
||||
def _init_events(engine, thread_checkin=True, connection_trace=False, **kw):
|
||||
"""Set up event listeners for all database backends."""
|
||||
|
||||
_add_process_guards(engine)
|
||||
|
||||
if connection_trace:
|
||||
_add_trace_comments(engine)
|
||||
|
||||
@@ -609,6 +613,35 @@ def get_maker(engine, autocommit=True, expire_on_commit=False):
|
||||
query_cls=Query)
|
||||
|
||||
|
||||
def _add_process_guards(engine):
|
||||
"""Add multiprocessing guards.
|
||||
|
||||
Forces a connection to be reconnected if it is detected
|
||||
as having been shared to a sub-process.
|
||||
|
||||
"""
|
||||
|
||||
@sqlalchemy.event.listens_for(engine, "connect")
|
||||
def connect(dbapi_connection, connection_record):
|
||||
connection_record.info['pid'] = os.getpid()
|
||||
|
||||
@sqlalchemy.event.listens_for(engine, "checkout")
|
||||
def checkout(dbapi_connection, connection_record, connection_proxy):
|
||||
pid = os.getpid()
|
||||
if connection_record.info['pid'] != pid:
|
||||
LOG.debug(_LW(
|
||||
"Parent process %(orig)s forked (%(newproc)s) with an open "
|
||||
"database connection, "
|
||||
"which is being discarded and recreated."),
|
||||
{"newproc": pid, "orig": connection_record.info['pid']})
|
||||
connection_record.connection = connection_proxy.connection = None
|
||||
raise exc.DisconnectionError(
|
||||
"Connection record belongs to pid %s, "
|
||||
"attempting to check out in pid %s" %
|
||||
(connection_record.info['pid'], pid)
|
||||
)
|
||||
|
||||
|
||||
def _add_trace_comments(engine):
|
||||
"""Add trace comments.
|
||||
|
||||
|
||||
@@ -648,6 +648,34 @@ class CreateEngineTest(oslo_test.BaseTestCase):
|
||||
)
|
||||
|
||||
|
||||
class ProcessGuardTest(test_base.DbTestCase):
|
||||
def test_process_guard(self):
|
||||
self.engine.dispose()
|
||||
|
||||
def get_parent_pid():
|
||||
return 4
|
||||
|
||||
def get_child_pid():
|
||||
return 5
|
||||
|
||||
with mock.patch("os.getpid", get_parent_pid):
|
||||
with self.engine.connect() as conn:
|
||||
dbapi_id = id(conn.connection.connection)
|
||||
|
||||
with mock.patch("os.getpid", get_child_pid):
|
||||
with self.engine.connect() as conn:
|
||||
new_dbapi_id = id(conn.connection.connection)
|
||||
|
||||
self.assertNotEqual(dbapi_id, new_dbapi_id)
|
||||
|
||||
# ensure it doesn't trip again
|
||||
with mock.patch("os.getpid", get_child_pid):
|
||||
with self.engine.connect() as conn:
|
||||
newer_dbapi_id = id(conn.connection.connection)
|
||||
|
||||
self.assertEqual(new_dbapi_id, newer_dbapi_id)
|
||||
|
||||
|
||||
class PatchStacktraceTest(test_base.DbTestCase):
|
||||
|
||||
def test_trace(self):
|
||||
|
||||
Reference in New Issue
Block a user