Integrate the ping listener into the filter system.
Replace the use of the connection pool "checkout" event for "pinging" a connection with the use of the Connection.begin() event. This event corresponds to the start of a transaction, so will "ping" the connection for every transaction rather than just on checkout, and by running at the level of Connection rather than the pool, we have access to the whole range of services we need, including that we can emit a core "select([1])" that will work on all backends, we get the use of the handle_error() event, and we get the built in checking for "is disconnect" as well as the engine/pool being invalidated or disposed as is appropriate for the SQLAlchemy version in use. The begin() event is a safe place to recycle the connection from an invalid state back to a ready state. partially implement bp: use-events-for-error-wrapping Change-Id: Ic29e12f1288f084a5e727101686dd71b12a5787b
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
"""Test exception filters applied to engines."""
|
||||
|
||||
import contextlib
|
||||
import itertools
|
||||
|
||||
import mock
|
||||
import six
|
||||
@@ -56,6 +57,16 @@ class TestsExceptionFilter(test_base.DbTestCase):
|
||||
|
||||
"""
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _dbapi_fixture(self, dialect_name):
|
||||
engine = self.engine
|
||||
with contextlib.nested(
|
||||
mock.patch.object(engine.dialect.dbapi, "Error",
|
||||
self.Error),
|
||||
mock.patch.object(engine.dialect, "name", dialect_name),
|
||||
):
|
||||
yield
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _fixture(self, dialect_name, exception, is_disconnect=False):
|
||||
|
||||
@@ -440,3 +451,61 @@ class IntegrationTest(test_base.DbTestCase):
|
||||
self.Foo.counter == sqla.func.imfake(123))
|
||||
matched = self.assertRaises(sqla.exc.OperationalError, q.all)
|
||||
self.assertTrue("no such function" in str(matched))
|
||||
|
||||
|
||||
class TestDBDisconnected(TestsExceptionFilter):
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _fixture(self, dialect_name, exception, num_disconnects):
|
||||
engine = self.engine
|
||||
|
||||
real_do_execute = engine.dialect.do_execute
|
||||
counter = itertools.count(1)
|
||||
|
||||
def fake_do_execute(self, *arg, **kw):
|
||||
if next(counter) > num_disconnects:
|
||||
return real_do_execute(self, *arg, **kw)
|
||||
else:
|
||||
raise exception
|
||||
|
||||
with self._dbapi_fixture(dialect_name):
|
||||
with contextlib.nested(
|
||||
mock.patch.object(engine.dialect,
|
||||
"do_execute", fake_do_execute),
|
||||
mock.patch.object(engine.dialect, "is_disconnect",
|
||||
mock.Mock(return_value=True))
|
||||
):
|
||||
yield
|
||||
|
||||
def _test_ping_listener_disconnected(self, dialect_name, exc_obj):
|
||||
with self._fixture(dialect_name, exc_obj, 1):
|
||||
conn = self.engine.connect()
|
||||
with conn.begin():
|
||||
self.assertEqual(conn.scalar(sqla.select([1])), 1)
|
||||
self.assertFalse(conn.closed)
|
||||
self.assertFalse(conn.invalidated)
|
||||
self.assertTrue(conn.in_transaction())
|
||||
|
||||
with self._fixture(dialect_name, exc_obj, 2):
|
||||
conn = self.engine.connect()
|
||||
self.assertRaises(
|
||||
exception.DBConnectionError,
|
||||
conn.begin
|
||||
)
|
||||
self.assertFalse(conn.closed)
|
||||
self.assertFalse(conn.in_transaction())
|
||||
self.assertTrue(conn.invalidated)
|
||||
|
||||
def test_mysql_ping_listener_disconnected(self):
|
||||
for code in [2006, 2013, 2014, 2045, 2055]:
|
||||
self._test_ping_listener_disconnected(
|
||||
"mysql",
|
||||
self.OperationalError('%d MySQL server has gone away' % code)
|
||||
)
|
||||
|
||||
def test_db2_ping_listener_disconnected(self):
|
||||
self._test_ping_listener_disconnected(
|
||||
"ibm_db_sa",
|
||||
self.OperationalError(
|
||||
'SQL30081N: DB2 Server connection is no longer active')
|
||||
)
|
||||
|
||||
@@ -17,9 +17,7 @@
|
||||
|
||||
"""Unit tests for SQLAlchemy specific code."""
|
||||
import logging
|
||||
from oslo.config import cfg
|
||||
|
||||
import _mysql_exceptions
|
||||
import fixtures
|
||||
import mock
|
||||
from oslotest import base as oslo_test
|
||||
@@ -28,6 +26,7 @@ from sqlalchemy import Column, MetaData, Table
|
||||
from sqlalchemy import Integer, String
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
from oslo.config import cfg
|
||||
from oslo.db import exception
|
||||
from oslo.db import options as db_options
|
||||
from oslo.db.sqlalchemy import models
|
||||
@@ -127,53 +126,6 @@ class FakeDB2Engine(object):
|
||||
pass
|
||||
|
||||
|
||||
class TestDBDisconnected(oslo_test.BaseTestCase):
|
||||
|
||||
def _test_ping_listener_disconnected(self, connection):
|
||||
engine_args = {
|
||||
'pool_recycle': 3600,
|
||||
'echo': False,
|
||||
'convert_unicode': True}
|
||||
|
||||
engine = sqlalchemy.create_engine(connection, **engine_args)
|
||||
with mock.patch.object(engine, 'dispose') as dispose_mock:
|
||||
self.assertRaises(sqlalchemy.exc.DisconnectionError,
|
||||
session._ping_listener, engine,
|
||||
FakeDBAPIConnection(), FakeConnectionRec(),
|
||||
FakeConnectionProxy())
|
||||
dispose_mock.assert_called_once_with()
|
||||
|
||||
def test_mysql_ping_listener_disconnected(self):
|
||||
def fake_execute(sql):
|
||||
raise _mysql_exceptions.OperationalError(self.mysql_error,
|
||||
('MySQL server has '
|
||||
'gone away'))
|
||||
with mock.patch.object(FakeCursor, 'execute',
|
||||
side_effect=fake_execute):
|
||||
connection = 'mysql://root:password@fakehost/fakedb?charset=utf8'
|
||||
for code in [2006, 2013, 2014, 2045, 2055]:
|
||||
self.mysql_error = code
|
||||
self._test_ping_listener_disconnected(connection)
|
||||
|
||||
def test_db2_ping_listener_disconnected(self):
|
||||
|
||||
def fake_execute(sql):
|
||||
raise OperationalError('SQL30081N: DB2 Server '
|
||||
'connection is no longer active')
|
||||
with mock.patch.object(FakeCursor, 'execute',
|
||||
side_effect=fake_execute):
|
||||
# TODO(dperaza): Need a fake engine for db2 since ibm_db_sa is not
|
||||
# in global requirements. Change this code to use real IBM db2
|
||||
# engine as soon as ibm_db_sa is included in global-requirements
|
||||
# under openstack/requirements project.
|
||||
fake_create_engine = lambda *args, **kargs: FakeDB2Engine()
|
||||
with mock.patch.object(sqlalchemy, 'create_engine',
|
||||
side_effect=fake_create_engine):
|
||||
connection = ('ibm_db_sa://db2inst1:openstack@fakehost:50000'
|
||||
'/fakedab')
|
||||
self._test_ping_listener_disconnected(connection)
|
||||
|
||||
|
||||
class MySQLModeTestCase(test_base.MySQLOpportunisticTestCase):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
|
||||
Reference in New Issue
Block a user