diff --git a/oslo_db/sqlalchemy/compat/__init__.py b/oslo_db/sqlalchemy/compat/__init__.py
index b49d5c41..3dc29ac0 100644
--- a/oslo_db/sqlalchemy/compat/__init__.py
+++ b/oslo_db/sqlalchemy/compat/__init__.py
@@ -23,8 +23,6 @@ from oslo_db.sqlalchemy.compat import handle_error as _h_err
 # flake8 won't let me import handle_error directly
 engine_connect = _e_conn.engine_connect
 handle_error = _h_err.handle_error
-handle_connect_context = _h_err.handle_connect_context
 
 __all__ = [
-    'engine_connect', 'handle_error',
-    'handle_connect_context']
+    'engine_connect', 'handle_error']
diff --git a/oslo_db/sqlalchemy/compat/handle_error.py b/oslo_db/sqlalchemy/compat/handle_error.py
index 7e476a0f..5169857a 100644
--- a/oslo_db/sqlalchemy/compat/handle_error.py
+++ b/oslo_db/sqlalchemy/compat/handle_error.py
@@ -16,10 +16,10 @@ http://docs.sqlalchemy.org/en/rel_0_9/core/events.html.
 
 
 """
-import contextlib
 import sys
 
 import six
+from sqlalchemy.engine import base as engine_base
 from sqlalchemy.engine import Engine
 from sqlalchemy import event
 from sqlalchemy import exc as sqla_exc
@@ -39,117 +39,294 @@ def handle_error(engine, listener):
     if utils.sqla_100:
         event.listen(engine, "handle_error", listener)
         return
-    elif utils.sqla_097:
-        # ctx.engine added per
-        # https://bitbucket.org/zzzeek/sqlalchemy/issue/3266/
-        def wrap_listener(ctx):
-            ctx.engine = ctx.connection.engine
-            return listener(ctx)
-        event.listen(engine, "handle_error", wrap_listener)
-        return
 
     assert isinstance(engine, Engine), \
         "engine argument must be an Engine instance, not a Connection"
 
-    # use a Connection-wrapper class to wrap _handle_dbapi_exception.
-    if not getattr(engine._connection_cls,
-                   '_oslo_handle_error_wrapper', False):
-        engine._oslo_handle_error_events = []
+    if not utils.sqla_097:
+        _rework_handle_exception_for_events(engine)
+        engine._oslo_handle_error_events.append(listener)
 
-        class Connection(engine._connection_cls):
-            _oslo_handle_error_wrapper = True
+    _rework_connect_and_revalidate_for_events(engine)
 
-            def _handle_dbapi_exception(self, e, statement, parameters,
-                                        cursor, context):
+    if utils.sqla_097:
+        # ctx.engine added per
+        # https://bitbucket.org/zzzeek/sqlalchemy/issue/3266/
+        def wrap_listener(ctx):
+            if isinstance(ctx, engine_base.ExceptionContextImpl):
+                ctx.engine = ctx.connection.engine
+            return listener(ctx)
+        event.listen(engine, "handle_error", wrap_listener)
 
+
+def _rework_handle_exception_for_events(engine):
+    """Patch the _handle_dbapi_error() system on Connection.
+
+    This allows the 0.9.7-style handle_error() event to be available on
+    the Connection object.
+
+    """
+    engine._oslo_handle_error_events = []
+
+    class Connection(engine._connection_cls):
+        def _handle_dbapi_exception(self, e, statement, parameters,
+                                    cursor, context):
+
+            try:
+                super(Connection, self)._handle_dbapi_exception(
+                    e, statement, parameters, cursor, context)
+            except Exception as reraised_exception:
+                # all versions:
+                #   _handle_dbapi_exception reraises all DBAPI errors
+                # 0.8 and above:
+                #   reraises all errors unconditionally
+                pass
+            else:
+                # 0.7.8:
+                #   _handle_dbapi_exception does not unconditionally
+                #   re-raise
+                reraised_exception = e
+
+            _oslo_handle_error_events = getattr(
+                self.engine,
+                '_oslo_handle_error_events',
+                False)
+
+            newraise = None
+            if _oslo_handle_error_events:
+                if isinstance(reraised_exception,
+                              sqla_exc.StatementError):
+                    sqlalchemy_exception = reraised_exception
+                    original_exception = sqlalchemy_exception.orig
+                    self._is_disconnect = is_disconnect = (
+                        isinstance(sqlalchemy_exception,
+                                   sqla_exc.DBAPIError)
+                        and sqlalchemy_exception.connection_invalidated)
+                else:
+                    sqlalchemy_exception = None
+                    original_exception = reraised_exception
+                    is_disconnect = False
+
+                # new handle_error event
+                ctx = ExceptionContextImpl(
+                    original_exception, sqlalchemy_exception,
+                    self.engine, self, cursor, statement,
+                    parameters, context, is_disconnect)
+
+                for fn in _oslo_handle_error_events:
+                    try:
+                        # handler returns an exception;
+                        # call next handler in a chain
+                        per_fn = fn(ctx)
+                        if per_fn is not None:
+                            ctx.chained_exception = newraise = per_fn
+                    except Exception as _raised:
+                        # handler raises an exception - stop processing
+                        newraise = _raised
+                        break
+
+                if sqlalchemy_exception and \
+                        self._is_disconnect != ctx.is_disconnect:
+
+                    if not ctx.is_disconnect:
+                        raise NotImplementedError(
+                            "Can't reset 'disconnect' status of exception "
+                            "once it is set with this version of "
+                            "SQLAlchemy")
+
+                    sqlalchemy_exception.connection_invalidated = \
+                        self._is_disconnect = ctx.is_disconnect
+                    if self._is_disconnect:
+                        self._do_disconnect(e)
+
+            if newraise:
+                six.reraise(type(newraise), newraise, sys.exc_info()[2])
+            else:
+                six.reraise(type(reraised_exception),
+                            reraised_exception, sys.exc_info()[2])
+
+        def _do_disconnect(self, e):
+            del self._is_disconnect
+            if utils.sqla_094:
+                dbapi_conn_wrapper = self.connection
+                self.engine.pool._invalidate(dbapi_conn_wrapper, e)
+                self.invalidate(e)
+            else:
+                dbapi_conn_wrapper = self.connection
+                self.invalidate(e)
+                if not hasattr(dbapi_conn_wrapper, '_pool') or \
+                        dbapi_conn_wrapper._pool is self.engine.pool:
+                    self.engine.dispose()
+
+    engine._connection_cls = Connection
+
+
+def _rework_connect_and_revalidate_for_events(engine):
+    """Patch the _revalidate_connection() system on Connection.
+
+    This applies 1.0's _revalidate_connection() approach into an 0.9
+    version of SQLAlchemy, and consists of three steps:
+
+    1. wrap the pool._creator function, which in 0.9 has a local
+    call to sqlalchemy.exc.DBAPIError.instance(), so that this exception is
+    again unwrapped back to the original DBAPI-specific Error, then raise
+    that.  This is essentially the same as if the dbapi.connect() isn't
+    wrapped in the first place, which is how SQLAlchemy 1.0 now functions.
+
+    2. patch the Engine object's raw_connection() method.  In SQLAlchemy 1.0,
+    this is now where the error wrapping occurs when a pool connect attempt
+    is made.  Here, when raw_connection() is called without a hosting
+    Connection, we send exception raises to
+    _handle_dbapi_exception_noconnection(), here copied from SQLAlchemy
+    1.0, which is an alternate version of Connection._handle_dbapi_exception()
+    tailored for an initial connect failure when there is no
+    Connection object being dealt with.  This allows the error handler
+    events to be called.
+
+    3. patch the Connection class to follow 1.0's behavior for
+    _revalidate_connection(); here, the call to engine.raw_connection()
+    will pass the raised error to Connection._handle_dbapi_exception(),
+    again allowing error handler events to be called.
+
+    """
+
+    _orig_connect = engine.pool._creator
+
+    def connect():
+        try:
+            return _orig_connect()
+        except sqla_exc.DBAPIError as err:
+            original_exception = err.orig
+            raise original_exception
+    engine.pool._creator = connect
+
+    self = engine
+
+    def contextual_connect(close_with_result=False, **kwargs):
+        return self._connection_cls(
+            self,
+            self._wrap_pool_connect(self.pool.connect, None),
+            close_with_result=close_with_result,
+            **kwargs)
+
+    def _wrap_pool_connect(fn, connection):
+        dialect = self.dialect
+        try:
+            return fn()
+        except dialect.dbapi.Error as e:
+            if connection is None:
+                _handle_dbapi_exception_noconnection(
+                    e, dialect, self)
+            else:
+                six.reraise(*sys.exc_info())
+
+    def raw_connection(_connection=None):
+        return self._wrap_pool_connect(
+            self.pool.unique_connection, _connection)
+
+    engine.contextual_connect = contextual_connect
+    engine._wrap_pool_connect = _wrap_pool_connect
+    engine.raw_connection = raw_connection
+
+    class Connection(engine._connection_cls):
+
+        @property
+        def connection(self):
+            "The underlying DB-API connection managed by this Connection."
+            try:
+                return self.__connection
+            except AttributeError:
                 try:
-                    super(Connection, self)._handle_dbapi_exception(
-                        e, statement, parameters, cursor, context)
-                except Exception as reraised_exception:
-                    # all versions:
-                    #   _handle_dbapi_exception reraises all DBAPI errors
-                    # 0.8 and above:
-                    #   reraises all errors unconditionally
-                    pass
-                else:
-                    # 0.7.8:
-                    #   _handle_dbapi_exception does not unconditionally
-                    #   re-raise
-                    reraised_exception = e
+                    return self._revalidate_connection()
+                except Exception as e:
+                    self._handle_dbapi_exception(e, None, None, None, None)
 
-                _oslo_handle_error_events = getattr(
-                    self.engine,
-                    '_oslo_handle_error_events',
-                    False)
+        def _handle_dbapi_exception(self,
+                                    e,
+                                    statement,
+                                    parameters,
+                                    cursor,
+                                    context):
+            if self.invalidated:
+                # 0.9's _handle_dbapi_exception() can't handle
+                # a Connection that is invalidated already, meaning
+                # its "__connection" attribute is not set.  So if we are
+                # in that case, call our "no connection" invalidator.
+                # this is fine as we are only supporting handle_error listeners
+                # that are applied at the engine level.
+                _handle_dbapi_exception_noconnection(
+                    e, self.dialect, self.engine)
+            else:
+                super(Connection, self)._handle_dbapi_exception(
+                    e, statement, parameters, cursor, context)
 
-                newraise = None
-                if _oslo_handle_error_events:
-                    if isinstance(reraised_exception,
-                                  sqla_exc.StatementError):
-                        sqlalchemy_exception = reraised_exception
-                        original_exception = sqlalchemy_exception.orig
-                        self._is_disconnect = is_disconnect = (
-                            isinstance(sqlalchemy_exception,
-                                       sqla_exc.DBAPIError)
-                            and sqlalchemy_exception.connection_invalidated)
-                    else:
-                        sqlalchemy_exception = None
-                        original_exception = reraised_exception
-                        is_disconnect = False
+        def _revalidate_connection(self):
+            if self._Connection__can_reconnect and self._Connection__invalid:
+                if self._Connection__transaction is not None:
+                    raise sqla_exc.InvalidRequestError(
+                        "Can't reconnect until invalid "
+                        "transaction is rolled back")
+                self._Connection__connection = self.engine.raw_connection(
+                    _connection=self)
+                self._Connection__invalid = False
+                return self._Connection__connection
+            raise sqla_exc.ResourceClosedError("This Connection is closed")
 
-                    # new handle_error event
-                    ctx = ExceptionContextImpl(
-                        original_exception, sqlalchemy_exception,
-                        self.engine, self, cursor, statement,
-                        parameters, context, is_disconnect)
+    engine._connection_cls = Connection
 
-                    for fn in _oslo_handle_error_events:
-                        try:
-                            # handler returns an exception;
-                            # call next handler in a chain
-                            per_fn = fn(ctx)
-                            if per_fn is not None:
-                                ctx.chained_exception = newraise = per_fn
-                        except Exception as _raised:
-                            # handler raises an exception - stop processing
-                            newraise = _raised
-                            break
 
-                    if sqlalchemy_exception and \
-                            self._is_disconnect != ctx.is_disconnect:
+def _handle_dbapi_exception_noconnection(e, dialect, engine):
 
-                        if not ctx.is_disconnect:
-                            raise NotImplementedError(
-                                "Can't reset 'disconnect' status of exception "
-                                "once it is set with this version of "
-                                "SQLAlchemy")
+    exc_info = sys.exc_info()
 
-                        sqlalchemy_exception.connection_invalidated = \
-                            self._is_disconnect = ctx.is_disconnect
-                        if self._is_disconnect:
-                            self._do_disconnect(e)
+    is_disconnect = dialect.is_disconnect(e, None, None)
 
-                if newraise:
-                    six.reraise(type(newraise), newraise, sys.exc_info()[2])
-                else:
-                    six.reraise(type(reraised_exception),
-                                reraised_exception, sys.exc_info()[2])
+    should_wrap = isinstance(e, dialect.dbapi.Error)
 
-            def _do_disconnect(self, e):
-                del self._is_disconnect
-                if utils.sqla_094:
-                    dbapi_conn_wrapper = self.connection
-                    self.engine.pool._invalidate(dbapi_conn_wrapper, e)
-                    self.invalidate(e)
-                else:
-                    dbapi_conn_wrapper = self.connection
-                    self.invalidate(e)
-                    if not hasattr(dbapi_conn_wrapper, '_pool') or \
-                            dbapi_conn_wrapper._pool is self.engine.pool:
-                        self.engine.dispose()
+    if should_wrap:
+        sqlalchemy_exception = sqla_exc.DBAPIError.instance(
+            None,
+            None,
+            e,
+            dialect.dbapi.Error,
+            connection_invalidated=is_disconnect)
+    else:
+        sqlalchemy_exception = None
 
-        engine._connection_cls = Connection
-    engine._oslo_handle_error_events.append(listener)
+    newraise = None
+
+    ctx = ExceptionContextImpl(
+        e, sqlalchemy_exception, engine, None, None, None,
+        None, None, is_disconnect)
+
+    if hasattr(engine, '_oslo_handle_error_events'):
+        fns = engine._oslo_handle_error_events
+    else:
+        fns = engine.dispatch.handle_error
+    for fn in fns:
+        try:
+            # handler returns an exception;
+            # call next handler in a chain
+            per_fn = fn(ctx)
+            if per_fn is not None:
+                ctx.chained_exception = newraise = per_fn
+        except Exception as _raised:
+            # handler raises an exception - stop processing
+            newraise = _raised
+            break
+
+    if sqlalchemy_exception and \
+            is_disconnect != ctx.is_disconnect:
+        sqlalchemy_exception.connection_invalidated = \
+            is_disconnect = ctx.is_disconnect
+
+    if newraise:
+        six.reraise(type(newraise), newraise, exc_info[2])
+    elif should_wrap:
+        six.reraise(
+            type(sqlalchemy_exception), sqlalchemy_exception, exc_info[2])
+    else:
+        six.reraise(*exc_info)
 
 
 class ExceptionContextImpl(object):
@@ -266,24 +443,3 @@ class ExceptionContextImpl(object):
     :meth:`.ConnectionEvents.handle_error` handler.
 
     """
-
-
-@contextlib.contextmanager
-def handle_connect_context(handler, engine):
-    """Wrap connect() routines with a "handle error" context."""
-    try:
-        yield
-    except Exception as e:
-        if utils.sqla_100:
-            raise
-
-        if isinstance(e, sqla_exc.StatementError):
-            s_exc, orig = e, e.orig
-        else:
-            s_exc, orig = None, e
-
-        ctx = ExceptionContextImpl(
-            orig, s_exc, engine, None, None,
-            None, None, None, False
-        )
-        handler(ctx)
diff --git a/oslo_db/sqlalchemy/exc_filters.py b/oslo_db/sqlalchemy/exc_filters.py
index aa5b55a7..5df1eb74 100644
--- a/oslo_db/sqlalchemy/exc_filters.py
+++ b/oslo_db/sqlalchemy/exc_filters.py
@@ -346,14 +346,10 @@ def register_engine(engine):
 
 
 def handle_connect_error(engine):
-    """Handle connect error.
+    """Connect to the engine, including handle_error handlers.
 
-    Provide a special context that will allow on-connect errors
-    to be treated within the filtering context.
-
-    This routine is dependent on SQLAlchemy version, as version 1.0.0
-    provides this functionality natively.
+    The compat library now builds this into the engine.connect()
+    system as per SQLAlchemy 1.0's behavior.
 
     """
-    with compat.handle_connect_context(handler, engine):
-        return engine.connect()
+    return engine.connect()
diff --git a/oslo_db/sqlalchemy/session.py b/oslo_db/sqlalchemy/session.py
index 24bf31d4..ef253b9f 100644
--- a/oslo_db/sqlalchemy/session.py
+++ b/oslo_db/sqlalchemy/session.py
@@ -576,7 +576,7 @@ def _test_connection(engine, max_retries, retry_interval):
     de_ref = None
     for attempt in attempts:
         try:
-            return exc_filters.handle_connect_error(engine)
+            return engine.connect()
         except exception.DBConnectionError as de:
             msg = _LW('SQL connection failed. %s attempts left.')
             LOG.warning(msg, max_retries - attempt)
diff --git a/oslo_db/tests/sqlalchemy/test_exc_filters.py b/oslo_db/tests/sqlalchemy/test_exc_filters.py
index b2845a22..edab9d61 100644
--- a/oslo_db/tests/sqlalchemy/test_exc_filters.py
+++ b/oslo_db/tests/sqlalchemy/test_exc_filters.py
@@ -94,13 +94,16 @@ class TestsExceptionFilter(_SQLAExceptionMatcher, oslo_test_base.BaseTestCase):
         self.engine.connect().close()  # initialize
 
     @contextlib.contextmanager
-    def _dbapi_fixture(self, dialect_name):
+    def _dbapi_fixture(self, dialect_name, is_disconnect=False):
         engine = self.engine
         with test_utils.nested(
             mock.patch.object(engine.dialect.dbapi,
                               "Error",
                               self.Error),
             mock.patch.object(engine.dialect, "name", dialect_name),
+            mock.patch.object(engine.dialect,
+                              "is_disconnect",
+                              lambda *args: is_disconnect)
         ):
             yield
 
@@ -846,3 +849,94 @@ class TestDBConnectRetry(TestsExceptionFilter):
             self.OperationalError("blah blah -39981 blah blah"),
             2, 3
         )
+
+
+class TestDBConnectPingWrapping(TestsExceptionFilter):
+
+    def setUp(self):
+        super(TestDBConnectPingWrapping, self).setUp()
+        compat.engine_connect(self.engine, session._connect_ping_listener)
+
+    @contextlib.contextmanager
+    def _fixture(
+            self, dialect_name, exception, good_conn_count,
+            is_disconnect=True):
+        engine = self.engine
+
+        # empty out the connection pool
+        engine.dispose()
+
+        connect_fn = engine.dialect.connect
+        real_do_execute = engine.dialect.do_execute
+
+        counter = itertools.count(1)
+
+        def cant_execute(*arg, **kw):
+            value = next(counter)
+            if value > good_conn_count:
+                raise exception
+            else:
+                return real_do_execute(*arg, **kw)
+
+        def cant_connect(*arg, **kw):
+            value = next(counter)
+            if value > good_conn_count:
+                raise exception
+            else:
+                return connect_fn(*arg, **kw)
+
+        with self._dbapi_fixture(dialect_name, is_disconnect=is_disconnect):
+            with mock.patch.object(engine.dialect, "connect", cant_connect):
+                with mock.patch.object(
+                        engine.dialect, "do_execute", cant_execute):
+                    yield
+
+    def _test_ping_listener_disconnected(
+            self, dialect_name, exc_obj, is_disconnect=True):
+        with self._fixture(dialect_name, exc_obj, 3, is_disconnect):
+            conn = self.engine.connect()
+            self.assertEqual(conn.scalar(sqla.select([1])), 1)
+            conn.close()
+
+        with self._fixture(dialect_name, exc_obj, 1, is_disconnect):
+            self.assertRaises(
+                exception.DBConnectionError,
+                self.engine.connect
+            )
+            self.assertRaises(
+                exception.DBConnectionError,
+                self.engine.connect
+            )
+            self.assertRaises(
+                exception.DBConnectionError,
+                self.engine.connect
+            )
+
+        with self._fixture(dialect_name, exc_obj, 1, is_disconnect):
+            self.assertRaises(
+                exception.DBConnectionError,
+                self.engine.contextual_connect
+            )
+            self.assertRaises(
+                exception.DBConnectionError,
+                self.engine.contextual_connect
+            )
+            self.assertRaises(
+                exception.DBConnectionError,
+                self.engine.contextual_connect
+            )
+
+    def test_mysql_w_disconnect_flag(self):
+        for code in [2002, 2003, 2002]:
+            self._test_ping_listener_disconnected(
+                "mysql",
+                self.OperationalError('%d MySQL server has gone away' % code)
+            )
+
+    def test_mysql_wo_disconnect_flag(self):
+        for code in [2002, 2003]:
+            self._test_ping_listener_disconnected(
+                "mysql",
+                self.OperationalError('%d MySQL server has gone away' % code),
+                is_disconnect=False
+            )