Merge "Integrate the ping listener into the filter system."
This commit is contained in:
@@ -289,7 +289,9 @@ from sqlalchemy.interfaces import PoolListener
|
||||
import sqlalchemy.orm
|
||||
from sqlalchemy.pool import NullPool, StaticPool
|
||||
from sqlalchemy.sql.expression import literal_column
|
||||
from sqlalchemy.sql.expression import select
|
||||
|
||||
from oslo.db import exception
|
||||
from oslo.db.openstack.common.gettextutils import _LW
|
||||
from oslo.db.openstack.common import timeutils
|
||||
from oslo.db import options
|
||||
@@ -335,34 +337,23 @@ def _thread_yield(dbapi_con, con_record):
|
||||
time.sleep(0)
|
||||
|
||||
|
||||
def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
|
||||
"""Ensures that MySQL, PostgreSQL or DB2 connections are alive.
|
||||
def _begin_ping_listener(connection):
|
||||
"""Ping the server at transaction begin and transparently reconnect
|
||||
if a disconnect exception occurs.
|
||||
|
||||
Borrowed from:
|
||||
http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f
|
||||
"""
|
||||
cursor = dbapi_conn.cursor()
|
||||
try:
|
||||
ping_sql = 'select 1'
|
||||
if engine.name == 'ibm_db_sa':
|
||||
# DB2 requires a table expression
|
||||
ping_sql = 'select 1 from (values (1)) AS t1'
|
||||
cursor.execute(ping_sql)
|
||||
except Exception as ex:
|
||||
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
|
||||
msg = _LW('Database server has gone away: %s') % ex
|
||||
LOG.warning(msg)
|
||||
|
||||
# if the database server has gone away, all connections in the pool
|
||||
# have become invalid and we can safely close all of them here,
|
||||
# rather than waste time on checking of every single connection
|
||||
engine.dispose()
|
||||
|
||||
# this will be handled by SQLAlchemy and will force it to create
|
||||
# a new connection and retry the original action
|
||||
raise sqla_exc.DisconnectionError(msg)
|
||||
else:
|
||||
raise
|
||||
# run a SELECT 1. use a core select() so that
|
||||
# any details like that needed by Oracle, DB2 etc. are handled.
|
||||
connection.scalar(select([1]))
|
||||
except exception.DBConnectionError:
|
||||
# catch DBConnectionError, which is raised by the filter
|
||||
# system.
|
||||
# disconnect detected. The connection is now
|
||||
# "invalid", but the pool should be ready to return
|
||||
# new connections assuming they are good now.
|
||||
# run the select again to re-validate the Connection.
|
||||
connection.scalar(select([1]))
|
||||
|
||||
|
||||
def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
|
||||
@@ -482,12 +473,9 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
|
||||
if thread_checkin:
|
||||
sqlalchemy.event.listen(engine, 'checkin', _thread_yield)
|
||||
|
||||
if engine.name in ('ibm_db_sa', 'mysql', 'postgresql'):
|
||||
ping_callback = functools.partial(_ping_listener, engine)
|
||||
sqlalchemy.event.listen(engine, 'checkout', ping_callback)
|
||||
if engine.name == 'mysql':
|
||||
if mysql_sql_mode is not None:
|
||||
_mysql_set_mode_callback(engine, mysql_sql_mode)
|
||||
if engine.name == 'mysql':
|
||||
if mysql_sql_mode is not None:
|
||||
_mysql_set_mode_callback(engine, mysql_sql_mode)
|
||||
elif 'sqlite' in connection_dict.drivername:
|
||||
if not sqlite_synchronous:
|
||||
sqlalchemy.event.listen(engine, 'connect',
|
||||
@@ -522,6 +510,10 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
|
||||
|
||||
# register alternate exception handler
|
||||
exc_filters.register_engine(engine)
|
||||
|
||||
# register on begin handler
|
||||
sqlalchemy.event.listen(engine, "begin", _begin_ping_listener)
|
||||
|
||||
return engine
|
||||
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@
|
||||
"""Test exception filters applied to engines."""
|
||||
|
||||
import contextlib
|
||||
import itertools
|
||||
|
||||
import mock
|
||||
import six
|
||||
@@ -59,6 +60,16 @@ class TestsExceptionFilter(test_base.DbTestCase):
|
||||
|
||||
"""
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _dbapi_fixture(self, dialect_name):
|
||||
engine = self.engine
|
||||
with contextlib.nested(
|
||||
mock.patch.object(engine.dialect.dbapi, "Error",
|
||||
self.Error),
|
||||
mock.patch.object(engine.dialect, "name", dialect_name),
|
||||
):
|
||||
yield
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _fixture(self, dialect_name, exception, is_disconnect=False):
|
||||
|
||||
@@ -451,3 +462,61 @@ class IntegrationTest(test_base.DbTestCase):
|
||||
self.Foo.counter == sqla.func.imfake(123))
|
||||
matched = self.assertRaises(sqla.exc.OperationalError, q.all)
|
||||
self.assertTrue("no such function" in str(matched))
|
||||
|
||||
|
||||
class TestDBDisconnected(TestsExceptionFilter):
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _fixture(self, dialect_name, exception, num_disconnects):
|
||||
engine = self.engine
|
||||
|
||||
real_do_execute = engine.dialect.do_execute
|
||||
counter = itertools.count(1)
|
||||
|
||||
def fake_do_execute(self, *arg, **kw):
|
||||
if next(counter) > num_disconnects:
|
||||
return real_do_execute(self, *arg, **kw)
|
||||
else:
|
||||
raise exception
|
||||
|
||||
with self._dbapi_fixture(dialect_name):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(engine.dialect,
|
||||
"do_execute", fake_do_execute),
|
||||
mock.patch.object(engine.dialect, "is_disconnect",
|
||||
mock.Mock(return_value=True))
|
||||
):
|
||||
yield
|
||||
|
||||
def _test_ping_listener_disconnected(self, dialect_name, exc_obj):
|
||||
with self._fixture(dialect_name, exc_obj, 1):
|
||||
conn = self.engine.connect()
|
||||
with conn.begin():
|
||||
self.assertEqual(conn.scalar(sqla.select([1])), 1)
|
||||
self.assertFalse(conn.closed)
|
||||
self.assertFalse(conn.invalidated)
|
||||
self.assertTrue(conn.in_transaction())
|
||||
|
||||
with self._fixture(dialect_name, exc_obj, 2):
|
||||
conn = self.engine.connect()
|
||||
self.assertRaises(
|
||||
exception.DBConnectionError,
|
||||
conn.begin
|
||||
)
|
||||
self.assertFalse(conn.closed)
|
||||
self.assertFalse(conn.in_transaction())
|
||||
self.assertTrue(conn.invalidated)
|
||||
|
||||
def test_mysql_ping_listener_disconnected(self):
|
||||
for code in [2006, 2013, 2014, 2045, 2055]:
|
||||
self._test_ping_listener_disconnected(
|
||||
"mysql",
|
||||
self.OperationalError('%d MySQL server has gone away' % code)
|
||||
)
|
||||
|
||||
def test_db2_ping_listener_disconnected(self):
|
||||
self._test_ping_listener_disconnected(
|
||||
"ibm_db_sa",
|
||||
self.OperationalError(
|
||||
'SQL30081N: DB2 Server connection is no longer active')
|
||||
)
|
||||
|
||||
@@ -17,9 +17,7 @@
|
||||
|
||||
"""Unit tests for SQLAlchemy specific code."""
|
||||
import logging
|
||||
from oslo.config import cfg
|
||||
|
||||
import _mysql_exceptions
|
||||
import fixtures
|
||||
import mock
|
||||
from oslotest import base as oslo_test
|
||||
@@ -28,6 +26,7 @@ from sqlalchemy import Column, MetaData, Table
|
||||
from sqlalchemy import Integer, String
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.db import exception
|
||||
from oslo.db import options as db_options
|
||||
from oslo.db.sqlalchemy import models
|
||||
@@ -127,53 +126,6 @@ class FakeDB2Engine(object):
|
||||
pass
|
||||
|
||||
|
||||
class TestDBDisconnected(oslo_test.BaseTestCase):
|
||||
|
||||
def _test_ping_listener_disconnected(self, connection):
|
||||
engine_args = {
|
||||
'pool_recycle': 3600,
|
||||
'echo': False,
|
||||
'convert_unicode': True}
|
||||
|
||||
engine = sqlalchemy.create_engine(connection, **engine_args)
|
||||
with mock.patch.object(engine, 'dispose') as dispose_mock:
|
||||
self.assertRaises(sqlalchemy.exc.DisconnectionError,
|
||||
session._ping_listener, engine,
|
||||
FakeDBAPIConnection(), FakeConnectionRec(),
|
||||
FakeConnectionProxy())
|
||||
dispose_mock.assert_called_once_with()
|
||||
|
||||
def test_mysql_ping_listener_disconnected(self):
|
||||
def fake_execute(sql):
|
||||
raise _mysql_exceptions.OperationalError(self.mysql_error,
|
||||
('MySQL server has '
|
||||
'gone away'))
|
||||
with mock.patch.object(FakeCursor, 'execute',
|
||||
side_effect=fake_execute):
|
||||
connection = 'mysql://root:password@fakehost/fakedb?charset=utf8'
|
||||
for code in [2006, 2013, 2014, 2045, 2055]:
|
||||
self.mysql_error = code
|
||||
self._test_ping_listener_disconnected(connection)
|
||||
|
||||
def test_db2_ping_listener_disconnected(self):
|
||||
|
||||
def fake_execute(sql):
|
||||
raise OperationalError('SQL30081N: DB2 Server '
|
||||
'connection is no longer active')
|
||||
with mock.patch.object(FakeCursor, 'execute',
|
||||
side_effect=fake_execute):
|
||||
# TODO(dperaza): Need a fake engine for db2 since ibm_db_sa is not
|
||||
# in global requirements. Change this code to use real IBM db2
|
||||
# engine as soon as ibm_db_sa is included in global-requirements
|
||||
# under openstack/requirements project.
|
||||
fake_create_engine = lambda *args, **kargs: FakeDB2Engine()
|
||||
with mock.patch.object(sqlalchemy, 'create_engine',
|
||||
side_effect=fake_create_engine):
|
||||
connection = ('ibm_db_sa://db2inst1:openstack@fakehost:50000'
|
||||
'/fakedab')
|
||||
self._test_ping_listener_disconnected(connection)
|
||||
|
||||
|
||||
class MySQLModeTestCase(test_base.MySQLOpportunisticTestCase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
|
||||
Reference in New Issue
Block a user