Merge "Ensure DBConnectionError is raised on failed revalidate"
This commit is contained in:
commit
67c6f02bb9
oslo_db
sqlalchemy
tests/sqlalchemy
@ -23,8 +23,6 @@ from oslo_db.sqlalchemy.compat import handle_error as _h_err
|
|||||||
# flake8 won't let me import handle_error directly
|
# flake8 won't let me import handle_error directly
|
||||||
engine_connect = _e_conn.engine_connect
|
engine_connect = _e_conn.engine_connect
|
||||||
handle_error = _h_err.handle_error
|
handle_error = _h_err.handle_error
|
||||||
handle_connect_context = _h_err.handle_connect_context
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
'engine_connect', 'handle_error',
|
'engine_connect', 'handle_error']
|
||||||
'handle_connect_context']
|
|
||||||
|
@ -16,10 +16,10 @@ http://docs.sqlalchemy.org/en/rel_0_9/core/events.html.
|
|||||||
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
import contextlib
|
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
import six
|
import six
|
||||||
|
from sqlalchemy.engine import base as engine_base
|
||||||
from sqlalchemy.engine import Engine
|
from sqlalchemy.engine import Engine
|
||||||
from sqlalchemy import event
|
from sqlalchemy import event
|
||||||
from sqlalchemy import exc as sqla_exc
|
from sqlalchemy import exc as sqla_exc
|
||||||
@ -39,117 +39,294 @@ def handle_error(engine, listener):
|
|||||||
if utils.sqla_100:
|
if utils.sqla_100:
|
||||||
event.listen(engine, "handle_error", listener)
|
event.listen(engine, "handle_error", listener)
|
||||||
return
|
return
|
||||||
elif utils.sqla_097:
|
|
||||||
# ctx.engine added per
|
|
||||||
# https://bitbucket.org/zzzeek/sqlalchemy/issue/3266/
|
|
||||||
def wrap_listener(ctx):
|
|
||||||
ctx.engine = ctx.connection.engine
|
|
||||||
return listener(ctx)
|
|
||||||
event.listen(engine, "handle_error", wrap_listener)
|
|
||||||
return
|
|
||||||
|
|
||||||
assert isinstance(engine, Engine), \
|
assert isinstance(engine, Engine), \
|
||||||
"engine argument must be an Engine instance, not a Connection"
|
"engine argument must be an Engine instance, not a Connection"
|
||||||
|
|
||||||
# use a Connection-wrapper class to wrap _handle_dbapi_exception.
|
if not utils.sqla_097:
|
||||||
if not getattr(engine._connection_cls,
|
_rework_handle_exception_for_events(engine)
|
||||||
'_oslo_handle_error_wrapper', False):
|
engine._oslo_handle_error_events.append(listener)
|
||||||
engine._oslo_handle_error_events = []
|
|
||||||
|
|
||||||
class Connection(engine._connection_cls):
|
_rework_connect_and_revalidate_for_events(engine)
|
||||||
_oslo_handle_error_wrapper = True
|
|
||||||
|
|
||||||
def _handle_dbapi_exception(self, e, statement, parameters,
|
if utils.sqla_097:
|
||||||
cursor, context):
|
# ctx.engine added per
|
||||||
|
# https://bitbucket.org/zzzeek/sqlalchemy/issue/3266/
|
||||||
|
def wrap_listener(ctx):
|
||||||
|
if isinstance(ctx, engine_base.ExceptionContextImpl):
|
||||||
|
ctx.engine = ctx.connection.engine
|
||||||
|
return listener(ctx)
|
||||||
|
event.listen(engine, "handle_error", wrap_listener)
|
||||||
|
|
||||||
|
|
||||||
|
def _rework_handle_exception_for_events(engine):
|
||||||
|
"""Patch the _handle_dbapi_error() system on Connection.
|
||||||
|
|
||||||
|
This allows the 0.9.7-style handle_error() event to be available on
|
||||||
|
the Connection object.
|
||||||
|
|
||||||
|
"""
|
||||||
|
engine._oslo_handle_error_events = []
|
||||||
|
|
||||||
|
class Connection(engine._connection_cls):
|
||||||
|
def _handle_dbapi_exception(self, e, statement, parameters,
|
||||||
|
cursor, context):
|
||||||
|
|
||||||
|
try:
|
||||||
|
super(Connection, self)._handle_dbapi_exception(
|
||||||
|
e, statement, parameters, cursor, context)
|
||||||
|
except Exception as reraised_exception:
|
||||||
|
# all versions:
|
||||||
|
# _handle_dbapi_exception reraises all DBAPI errors
|
||||||
|
# 0.8 and above:
|
||||||
|
# reraises all errors unconditionally
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
# 0.7.8:
|
||||||
|
# _handle_dbapi_exception does not unconditionally
|
||||||
|
# re-raise
|
||||||
|
reraised_exception = e
|
||||||
|
|
||||||
|
_oslo_handle_error_events = getattr(
|
||||||
|
self.engine,
|
||||||
|
'_oslo_handle_error_events',
|
||||||
|
False)
|
||||||
|
|
||||||
|
newraise = None
|
||||||
|
if _oslo_handle_error_events:
|
||||||
|
if isinstance(reraised_exception,
|
||||||
|
sqla_exc.StatementError):
|
||||||
|
sqlalchemy_exception = reraised_exception
|
||||||
|
original_exception = sqlalchemy_exception.orig
|
||||||
|
self._is_disconnect = is_disconnect = (
|
||||||
|
isinstance(sqlalchemy_exception,
|
||||||
|
sqla_exc.DBAPIError)
|
||||||
|
and sqlalchemy_exception.connection_invalidated)
|
||||||
|
else:
|
||||||
|
sqlalchemy_exception = None
|
||||||
|
original_exception = reraised_exception
|
||||||
|
is_disconnect = False
|
||||||
|
|
||||||
|
# new handle_error event
|
||||||
|
ctx = ExceptionContextImpl(
|
||||||
|
original_exception, sqlalchemy_exception,
|
||||||
|
self.engine, self, cursor, statement,
|
||||||
|
parameters, context, is_disconnect)
|
||||||
|
|
||||||
|
for fn in _oslo_handle_error_events:
|
||||||
|
try:
|
||||||
|
# handler returns an exception;
|
||||||
|
# call next handler in a chain
|
||||||
|
per_fn = fn(ctx)
|
||||||
|
if per_fn is not None:
|
||||||
|
ctx.chained_exception = newraise = per_fn
|
||||||
|
except Exception as _raised:
|
||||||
|
# handler raises an exception - stop processing
|
||||||
|
newraise = _raised
|
||||||
|
break
|
||||||
|
|
||||||
|
if sqlalchemy_exception and \
|
||||||
|
self._is_disconnect != ctx.is_disconnect:
|
||||||
|
|
||||||
|
if not ctx.is_disconnect:
|
||||||
|
raise NotImplementedError(
|
||||||
|
"Can't reset 'disconnect' status of exception "
|
||||||
|
"once it is set with this version of "
|
||||||
|
"SQLAlchemy")
|
||||||
|
|
||||||
|
sqlalchemy_exception.connection_invalidated = \
|
||||||
|
self._is_disconnect = ctx.is_disconnect
|
||||||
|
if self._is_disconnect:
|
||||||
|
self._do_disconnect(e)
|
||||||
|
|
||||||
|
if newraise:
|
||||||
|
six.reraise(type(newraise), newraise, sys.exc_info()[2])
|
||||||
|
else:
|
||||||
|
six.reraise(type(reraised_exception),
|
||||||
|
reraised_exception, sys.exc_info()[2])
|
||||||
|
|
||||||
|
def _do_disconnect(self, e):
|
||||||
|
del self._is_disconnect
|
||||||
|
if utils.sqla_094:
|
||||||
|
dbapi_conn_wrapper = self.connection
|
||||||
|
self.engine.pool._invalidate(dbapi_conn_wrapper, e)
|
||||||
|
self.invalidate(e)
|
||||||
|
else:
|
||||||
|
dbapi_conn_wrapper = self.connection
|
||||||
|
self.invalidate(e)
|
||||||
|
if not hasattr(dbapi_conn_wrapper, '_pool') or \
|
||||||
|
dbapi_conn_wrapper._pool is self.engine.pool:
|
||||||
|
self.engine.dispose()
|
||||||
|
|
||||||
|
engine._connection_cls = Connection
|
||||||
|
|
||||||
|
|
||||||
|
def _rework_connect_and_revalidate_for_events(engine):
|
||||||
|
"""Patch the _revalidate_connection() system on Connection.
|
||||||
|
|
||||||
|
This applies 1.0's _revalidate_connection() approach into an 0.9
|
||||||
|
version of SQLAlchemy, and consists of three steps:
|
||||||
|
|
||||||
|
1. wrap the pool._creator function, which in 0.9 has a local
|
||||||
|
call to sqlalchemy.exc.DBAPIError.instance(), so that this exception is
|
||||||
|
again unwrapped back to the original DBAPI-specific Error, then raise
|
||||||
|
that. This is essentially the same as if the dbapi.connect() isn't
|
||||||
|
wrapped in the first place, which is how SQLAlchemy 1.0 now functions.
|
||||||
|
|
||||||
|
2. patch the Engine object's raw_connection() method. In SQLAlchemy 1.0,
|
||||||
|
this is now where the error wrapping occurs when a pool connect attempt
|
||||||
|
is made. Here, when raw_connection() is called without a hosting
|
||||||
|
Connection, we send exception raises to
|
||||||
|
_handle_dbapi_exception_noconnection(), here copied from SQLAlchemy
|
||||||
|
1.0, which is an alternate version of Connection._handle_dbapi_exception()
|
||||||
|
tailored for an initial connect failure when there is no
|
||||||
|
Connection object being dealt with. This allows the error handler
|
||||||
|
events to be called.
|
||||||
|
|
||||||
|
3. patch the Connection class to follow 1.0's behavior for
|
||||||
|
_revalidate_connection(); here, the call to engine.raw_connection()
|
||||||
|
will pass the raised error to Connection._handle_dbapi_exception(),
|
||||||
|
again allowing error handler events to be called.
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
_orig_connect = engine.pool._creator
|
||||||
|
|
||||||
|
def connect():
|
||||||
|
try:
|
||||||
|
return _orig_connect()
|
||||||
|
except sqla_exc.DBAPIError as err:
|
||||||
|
original_exception = err.orig
|
||||||
|
raise original_exception
|
||||||
|
engine.pool._creator = connect
|
||||||
|
|
||||||
|
self = engine
|
||||||
|
|
||||||
|
def contextual_connect(close_with_result=False, **kwargs):
|
||||||
|
return self._connection_cls(
|
||||||
|
self,
|
||||||
|
self._wrap_pool_connect(self.pool.connect, None),
|
||||||
|
close_with_result=close_with_result,
|
||||||
|
**kwargs)
|
||||||
|
|
||||||
|
def _wrap_pool_connect(fn, connection):
|
||||||
|
dialect = self.dialect
|
||||||
|
try:
|
||||||
|
return fn()
|
||||||
|
except dialect.dbapi.Error as e:
|
||||||
|
if connection is None:
|
||||||
|
_handle_dbapi_exception_noconnection(
|
||||||
|
e, dialect, self)
|
||||||
|
else:
|
||||||
|
six.reraise(*sys.exc_info())
|
||||||
|
|
||||||
|
def raw_connection(_connection=None):
|
||||||
|
return self._wrap_pool_connect(
|
||||||
|
self.pool.unique_connection, _connection)
|
||||||
|
|
||||||
|
engine.contextual_connect = contextual_connect
|
||||||
|
engine._wrap_pool_connect = _wrap_pool_connect
|
||||||
|
engine.raw_connection = raw_connection
|
||||||
|
|
||||||
|
class Connection(engine._connection_cls):
|
||||||
|
|
||||||
|
@property
|
||||||
|
def connection(self):
|
||||||
|
"The underlying DB-API connection managed by this Connection."
|
||||||
|
try:
|
||||||
|
return self.__connection
|
||||||
|
except AttributeError:
|
||||||
try:
|
try:
|
||||||
super(Connection, self)._handle_dbapi_exception(
|
return self._revalidate_connection()
|
||||||
e, statement, parameters, cursor, context)
|
except Exception as e:
|
||||||
except Exception as reraised_exception:
|
self._handle_dbapi_exception(e, None, None, None, None)
|
||||||
# all versions:
|
|
||||||
# _handle_dbapi_exception reraises all DBAPI errors
|
|
||||||
# 0.8 and above:
|
|
||||||
# reraises all errors unconditionally
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
# 0.7.8:
|
|
||||||
# _handle_dbapi_exception does not unconditionally
|
|
||||||
# re-raise
|
|
||||||
reraised_exception = e
|
|
||||||
|
|
||||||
_oslo_handle_error_events = getattr(
|
def _handle_dbapi_exception(self,
|
||||||
self.engine,
|
e,
|
||||||
'_oslo_handle_error_events',
|
statement,
|
||||||
False)
|
parameters,
|
||||||
|
cursor,
|
||||||
|
context):
|
||||||
|
if self.invalidated:
|
||||||
|
# 0.9's _handle_dbapi_exception() can't handle
|
||||||
|
# a Connection that is invalidated already, meaning
|
||||||
|
# its "__connection" attribute is not set. So if we are
|
||||||
|
# in that case, call our "no connection" invalidator.
|
||||||
|
# this is fine as we are only supporting handle_error listeners
|
||||||
|
# that are applied at the engine level.
|
||||||
|
_handle_dbapi_exception_noconnection(
|
||||||
|
e, self.dialect, self.engine)
|
||||||
|
else:
|
||||||
|
super(Connection, self)._handle_dbapi_exception(
|
||||||
|
e, statement, parameters, cursor, context)
|
||||||
|
|
||||||
newraise = None
|
def _revalidate_connection(self):
|
||||||
if _oslo_handle_error_events:
|
if self._Connection__can_reconnect and self._Connection__invalid:
|
||||||
if isinstance(reraised_exception,
|
if self._Connection__transaction is not None:
|
||||||
sqla_exc.StatementError):
|
raise sqla_exc.InvalidRequestError(
|
||||||
sqlalchemy_exception = reraised_exception
|
"Can't reconnect until invalid "
|
||||||
original_exception = sqlalchemy_exception.orig
|
"transaction is rolled back")
|
||||||
self._is_disconnect = is_disconnect = (
|
self._Connection__connection = self.engine.raw_connection(
|
||||||
isinstance(sqlalchemy_exception,
|
_connection=self)
|
||||||
sqla_exc.DBAPIError)
|
self._Connection__invalid = False
|
||||||
and sqlalchemy_exception.connection_invalidated)
|
return self._Connection__connection
|
||||||
else:
|
raise sqla_exc.ResourceClosedError("This Connection is closed")
|
||||||
sqlalchemy_exception = None
|
|
||||||
original_exception = reraised_exception
|
|
||||||
is_disconnect = False
|
|
||||||
|
|
||||||
# new handle_error event
|
engine._connection_cls = Connection
|
||||||
ctx = ExceptionContextImpl(
|
|
||||||
original_exception, sqlalchemy_exception,
|
|
||||||
self.engine, self, cursor, statement,
|
|
||||||
parameters, context, is_disconnect)
|
|
||||||
|
|
||||||
for fn in _oslo_handle_error_events:
|
|
||||||
try:
|
|
||||||
# handler returns an exception;
|
|
||||||
# call next handler in a chain
|
|
||||||
per_fn = fn(ctx)
|
|
||||||
if per_fn is not None:
|
|
||||||
ctx.chained_exception = newraise = per_fn
|
|
||||||
except Exception as _raised:
|
|
||||||
# handler raises an exception - stop processing
|
|
||||||
newraise = _raised
|
|
||||||
break
|
|
||||||
|
|
||||||
if sqlalchemy_exception and \
|
def _handle_dbapi_exception_noconnection(e, dialect, engine):
|
||||||
self._is_disconnect != ctx.is_disconnect:
|
|
||||||
|
|
||||||
if not ctx.is_disconnect:
|
exc_info = sys.exc_info()
|
||||||
raise NotImplementedError(
|
|
||||||
"Can't reset 'disconnect' status of exception "
|
|
||||||
"once it is set with this version of "
|
|
||||||
"SQLAlchemy")
|
|
||||||
|
|
||||||
sqlalchemy_exception.connection_invalidated = \
|
is_disconnect = dialect.is_disconnect(e, None, None)
|
||||||
self._is_disconnect = ctx.is_disconnect
|
|
||||||
if self._is_disconnect:
|
|
||||||
self._do_disconnect(e)
|
|
||||||
|
|
||||||
if newraise:
|
should_wrap = isinstance(e, dialect.dbapi.Error)
|
||||||
six.reraise(type(newraise), newraise, sys.exc_info()[2])
|
|
||||||
else:
|
|
||||||
six.reraise(type(reraised_exception),
|
|
||||||
reraised_exception, sys.exc_info()[2])
|
|
||||||
|
|
||||||
def _do_disconnect(self, e):
|
if should_wrap:
|
||||||
del self._is_disconnect
|
sqlalchemy_exception = sqla_exc.DBAPIError.instance(
|
||||||
if utils.sqla_094:
|
None,
|
||||||
dbapi_conn_wrapper = self.connection
|
None,
|
||||||
self.engine.pool._invalidate(dbapi_conn_wrapper, e)
|
e,
|
||||||
self.invalidate(e)
|
dialect.dbapi.Error,
|
||||||
else:
|
connection_invalidated=is_disconnect)
|
||||||
dbapi_conn_wrapper = self.connection
|
else:
|
||||||
self.invalidate(e)
|
sqlalchemy_exception = None
|
||||||
if not hasattr(dbapi_conn_wrapper, '_pool') or \
|
|
||||||
dbapi_conn_wrapper._pool is self.engine.pool:
|
|
||||||
self.engine.dispose()
|
|
||||||
|
|
||||||
engine._connection_cls = Connection
|
newraise = None
|
||||||
engine._oslo_handle_error_events.append(listener)
|
|
||||||
|
ctx = ExceptionContextImpl(
|
||||||
|
e, sqlalchemy_exception, engine, None, None, None,
|
||||||
|
None, None, is_disconnect)
|
||||||
|
|
||||||
|
if hasattr(engine, '_oslo_handle_error_events'):
|
||||||
|
fns = engine._oslo_handle_error_events
|
||||||
|
else:
|
||||||
|
fns = engine.dispatch.handle_error
|
||||||
|
for fn in fns:
|
||||||
|
try:
|
||||||
|
# handler returns an exception;
|
||||||
|
# call next handler in a chain
|
||||||
|
per_fn = fn(ctx)
|
||||||
|
if per_fn is not None:
|
||||||
|
ctx.chained_exception = newraise = per_fn
|
||||||
|
except Exception as _raised:
|
||||||
|
# handler raises an exception - stop processing
|
||||||
|
newraise = _raised
|
||||||
|
break
|
||||||
|
|
||||||
|
if sqlalchemy_exception and \
|
||||||
|
is_disconnect != ctx.is_disconnect:
|
||||||
|
sqlalchemy_exception.connection_invalidated = \
|
||||||
|
is_disconnect = ctx.is_disconnect
|
||||||
|
|
||||||
|
if newraise:
|
||||||
|
six.reraise(type(newraise), newraise, exc_info[2])
|
||||||
|
elif should_wrap:
|
||||||
|
six.reraise(
|
||||||
|
type(sqlalchemy_exception), sqlalchemy_exception, exc_info[2])
|
||||||
|
else:
|
||||||
|
six.reraise(*exc_info)
|
||||||
|
|
||||||
|
|
||||||
class ExceptionContextImpl(object):
|
class ExceptionContextImpl(object):
|
||||||
@ -266,24 +443,3 @@ class ExceptionContextImpl(object):
|
|||||||
:meth:`.ConnectionEvents.handle_error` handler.
|
:meth:`.ConnectionEvents.handle_error` handler.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
@contextlib.contextmanager
|
|
||||||
def handle_connect_context(handler, engine):
|
|
||||||
"""Wrap connect() routines with a "handle error" context."""
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
except Exception as e:
|
|
||||||
if utils.sqla_100:
|
|
||||||
raise
|
|
||||||
|
|
||||||
if isinstance(e, sqla_exc.StatementError):
|
|
||||||
s_exc, orig = e, e.orig
|
|
||||||
else:
|
|
||||||
s_exc, orig = None, e
|
|
||||||
|
|
||||||
ctx = ExceptionContextImpl(
|
|
||||||
orig, s_exc, engine, None, None,
|
|
||||||
None, None, None, False
|
|
||||||
)
|
|
||||||
handler(ctx)
|
|
||||||
|
@ -346,14 +346,10 @@ def register_engine(engine):
|
|||||||
|
|
||||||
|
|
||||||
def handle_connect_error(engine):
|
def handle_connect_error(engine):
|
||||||
"""Handle connect error.
|
"""Connect to the engine, including handle_error handlers.
|
||||||
|
|
||||||
Provide a special context that will allow on-connect errors
|
The compat library now builds this into the engine.connect()
|
||||||
to be treated within the filtering context.
|
system as per SQLAlchemy 1.0's behavior.
|
||||||
|
|
||||||
This routine is dependent on SQLAlchemy version, as version 1.0.0
|
|
||||||
provides this functionality natively.
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
with compat.handle_connect_context(handler, engine):
|
return engine.connect()
|
||||||
return engine.connect()
|
|
||||||
|
@ -576,7 +576,7 @@ def _test_connection(engine, max_retries, retry_interval):
|
|||||||
de_ref = None
|
de_ref = None
|
||||||
for attempt in attempts:
|
for attempt in attempts:
|
||||||
try:
|
try:
|
||||||
return exc_filters.handle_connect_error(engine)
|
return engine.connect()
|
||||||
except exception.DBConnectionError as de:
|
except exception.DBConnectionError as de:
|
||||||
msg = _LW('SQL connection failed. %s attempts left.')
|
msg = _LW('SQL connection failed. %s attempts left.')
|
||||||
LOG.warning(msg, max_retries - attempt)
|
LOG.warning(msg, max_retries - attempt)
|
||||||
|
@ -94,13 +94,16 @@ class TestsExceptionFilter(_SQLAExceptionMatcher, oslo_test_base.BaseTestCase):
|
|||||||
self.engine.connect().close() # initialize
|
self.engine.connect().close() # initialize
|
||||||
|
|
||||||
@contextlib.contextmanager
|
@contextlib.contextmanager
|
||||||
def _dbapi_fixture(self, dialect_name):
|
def _dbapi_fixture(self, dialect_name, is_disconnect=False):
|
||||||
engine = self.engine
|
engine = self.engine
|
||||||
with test_utils.nested(
|
with test_utils.nested(
|
||||||
mock.patch.object(engine.dialect.dbapi,
|
mock.patch.object(engine.dialect.dbapi,
|
||||||
"Error",
|
"Error",
|
||||||
self.Error),
|
self.Error),
|
||||||
mock.patch.object(engine.dialect, "name", dialect_name),
|
mock.patch.object(engine.dialect, "name", dialect_name),
|
||||||
|
mock.patch.object(engine.dialect,
|
||||||
|
"is_disconnect",
|
||||||
|
lambda *args: is_disconnect)
|
||||||
):
|
):
|
||||||
yield
|
yield
|
||||||
|
|
||||||
@ -846,3 +849,94 @@ class TestDBConnectRetry(TestsExceptionFilter):
|
|||||||
self.OperationalError("blah blah -39981 blah blah"),
|
self.OperationalError("blah blah -39981 blah blah"),
|
||||||
2, 3
|
2, 3
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestDBConnectPingWrapping(TestsExceptionFilter):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(TestDBConnectPingWrapping, self).setUp()
|
||||||
|
compat.engine_connect(self.engine, session._connect_ping_listener)
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _fixture(
|
||||||
|
self, dialect_name, exception, good_conn_count,
|
||||||
|
is_disconnect=True):
|
||||||
|
engine = self.engine
|
||||||
|
|
||||||
|
# empty out the connection pool
|
||||||
|
engine.dispose()
|
||||||
|
|
||||||
|
connect_fn = engine.dialect.connect
|
||||||
|
real_do_execute = engine.dialect.do_execute
|
||||||
|
|
||||||
|
counter = itertools.count(1)
|
||||||
|
|
||||||
|
def cant_execute(*arg, **kw):
|
||||||
|
value = next(counter)
|
||||||
|
if value > good_conn_count:
|
||||||
|
raise exception
|
||||||
|
else:
|
||||||
|
return real_do_execute(*arg, **kw)
|
||||||
|
|
||||||
|
def cant_connect(*arg, **kw):
|
||||||
|
value = next(counter)
|
||||||
|
if value > good_conn_count:
|
||||||
|
raise exception
|
||||||
|
else:
|
||||||
|
return connect_fn(*arg, **kw)
|
||||||
|
|
||||||
|
with self._dbapi_fixture(dialect_name, is_disconnect=is_disconnect):
|
||||||
|
with mock.patch.object(engine.dialect, "connect", cant_connect):
|
||||||
|
with mock.patch.object(
|
||||||
|
engine.dialect, "do_execute", cant_execute):
|
||||||
|
yield
|
||||||
|
|
||||||
|
def _test_ping_listener_disconnected(
|
||||||
|
self, dialect_name, exc_obj, is_disconnect=True):
|
||||||
|
with self._fixture(dialect_name, exc_obj, 3, is_disconnect):
|
||||||
|
conn = self.engine.connect()
|
||||||
|
self.assertEqual(conn.scalar(sqla.select([1])), 1)
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
with self._fixture(dialect_name, exc_obj, 1, is_disconnect):
|
||||||
|
self.assertRaises(
|
||||||
|
exception.DBConnectionError,
|
||||||
|
self.engine.connect
|
||||||
|
)
|
||||||
|
self.assertRaises(
|
||||||
|
exception.DBConnectionError,
|
||||||
|
self.engine.connect
|
||||||
|
)
|
||||||
|
self.assertRaises(
|
||||||
|
exception.DBConnectionError,
|
||||||
|
self.engine.connect
|
||||||
|
)
|
||||||
|
|
||||||
|
with self._fixture(dialect_name, exc_obj, 1, is_disconnect):
|
||||||
|
self.assertRaises(
|
||||||
|
exception.DBConnectionError,
|
||||||
|
self.engine.contextual_connect
|
||||||
|
)
|
||||||
|
self.assertRaises(
|
||||||
|
exception.DBConnectionError,
|
||||||
|
self.engine.contextual_connect
|
||||||
|
)
|
||||||
|
self.assertRaises(
|
||||||
|
exception.DBConnectionError,
|
||||||
|
self.engine.contextual_connect
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_mysql_w_disconnect_flag(self):
|
||||||
|
for code in [2002, 2003, 2002]:
|
||||||
|
self._test_ping_listener_disconnected(
|
||||||
|
"mysql",
|
||||||
|
self.OperationalError('%d MySQL server has gone away' % code)
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_mysql_wo_disconnect_flag(self):
|
||||||
|
for code in [2002, 2003]:
|
||||||
|
self._test_ping_listener_disconnected(
|
||||||
|
"mysql",
|
||||||
|
self.OperationalError('%d MySQL server has gone away' % code),
|
||||||
|
is_disconnect=False
|
||||||
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user