diff --git a/lower-constraints.txt b/lower-constraints.txt
index 6d56bc26d..aa5361ee1 100644
--- a/lower-constraints.txt
+++ b/lower-constraints.txt
@@ -52,9 +52,11 @@ oslo.service==1.24.0
 oslo.utils==3.33.0
 oslo.versionedobjects==1.31.2
 oslotest==3.2.0
+osprofiler==1.4.0
 Paste==2.0.2
 PasteDeploy==1.5.0
 pbr==2.0.0
+pecan==1.0.0
 pep8==1.5.7
 pika==0.10.0
 pika-pool==0.1.3
diff --git a/neutron_lib/db/api.py b/neutron_lib/db/api.py
index 4ee17001e..26f0b278a 100644
--- a/neutron_lib/db/api.py
+++ b/neutron_lib/db/api.py
@@ -10,9 +10,35 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from oslo_concurrency import lockutils
-from oslo_db.sqlalchemy import enginefacade
+import contextlib
+import copy
+import weakref
 
+from oslo_concurrency import lockutils
+from oslo_config import cfg
+from oslo_db import api as oslo_db_api
+from oslo_db import exception as db_exc
+from oslo_db.sqlalchemy import enginefacade
+from oslo_log import log as logging
+from oslo_utils import excutils
+from osprofiler import opts as profiler_opts
+import osprofiler.sqlalchemy
+from pecan import util as p_util
+import six
+import sqlalchemy
+from sqlalchemy import event  # noqa
+from sqlalchemy import exc as sql_exc
+from sqlalchemy import orm
+from sqlalchemy.orm import exc
+
+from neutron_lib._i18n import _
+from neutron_lib import exceptions
+from neutron_lib.objects import exceptions as obj_exc
+
+
+MAX_RETRIES = 10
+OSPROFILER_TRACE_NAMES = {'neutron.db', 'neutron_lib.db'}
+LOG = logging.getLogger(__name__)
 _synchronized = lockutils.synchronized_with_prefix("neutron-")
 _CTX_MANAGER = None
 
@@ -38,6 +64,22 @@ def get_context_manager():
     return _CTX_MANAGER
 
 
+def _set_hook(engine):
+    if (profiler_opts.is_trace_enabled() and
+            profiler_opts.is_db_trace_enabled()):
+        for trace_name in OSPROFILER_TRACE_NAMES:
+            osprofiler.sqlalchemy.add_tracing(
+                sqlalchemy, engine, trace_name)
+
+
+# TODO(ihrachys) the hook relies on options defined by osprofiler, and the
+# only public osprofiler function that registers them is set_defaults; that's
+# why we call it here even though we don't need to change the defaults
+# themselves
+profiler_opts.set_defaults(cfg.CONF)
+get_context_manager().append_on_engine_create(_set_hook)
+
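+# Illustrative sketch: with osprofiler enabled in the service config, e.g.
+#
+#   [profiler]
+#   enabled = True
+#   trace_sqlalchemy = True
+#
+# _set_hook attaches SQL tracing to each engine the context manager creates;
+# with tracing disabled the hook is a no-op.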
+
 def get_reader_session():
     """Helper to get reader session.
 
@@ -52,3 +94,255 @@ def get_writer_session():
     :returns: The writer session.
     """
     return get_context_manager().writer.get_sessionmaker()()
+
+
+def _is_nested_instance(e, etypes):
+    """Check if exception or its inner excepts are an instance of etypes."""
+    if isinstance(e, etypes):
+        return True
+    if isinstance(e, exceptions.MultipleExceptions):
+        return any(_is_nested_instance(i, etypes) for i in e.inner_exceptions)
+    if isinstance(e, db_exc.DBError):
+        return _is_nested_instance(e.inner_exception, etypes)
+    return False
+
+
+def is_retriable(e):
+    """Determine if the exception is retriable.
+
+    :param e: The exception to check.
+    :returns: True if e is retriable and False otherwise.
+    """
+    if getattr(e, '_RETRY_EXCEEDED', False):
+        return False
+    if _is_nested_instance(e, (db_exc.DBDeadlock, exc.StaleDataError,
+                               db_exc.DBConnectionError,
+                               db_exc.DBDuplicateEntry, db_exc.RetryRequest,
+                               obj_exc.NeutronDbObjectDuplicateEntry)):
+        return True
+    # look for savepoints mangled by deadlocks. see bug/1590298 for details.
+    return _is_nested_instance(e, db_exc.DBError) and '1305' in str(e)
+
+
+def _tag_retriables_as_unretriable(f):
+    """Puts a flag on retriable exceptions so is_retriable returns False.
+
+    This decorator can be used outside of a retry decorator to prevent
+    decorators higher up from retrying again.
+    """
+    @six.wraps(f)
+    def wrapped(*args, **kwargs):
+        try:
+            return f(*args, **kwargs)
+        except Exception as e:
+            with excutils.save_and_reraise_exception():
+                if is_retriable(e):
+                    setattr(e, '_RETRY_EXCEEDED', True)
+    return wrapped
+
+
+def _copy_if_lds(item):
+    """Deepcopy lists/dicts/sets, leave everything else alone."""
+    return copy.deepcopy(item) if isinstance(item, (list, dict, set)) else item
+
+
+_retry_db_errors = oslo_db_api.wrap_db_retry(
+    max_retries=MAX_RETRIES,
+    retry_interval=0.1,
+    inc_retry_interval=True,
+    exception_checker=is_retriable
+)
+
+
+def retry_db_errors(f):
+    """Nesting-safe retry decorator with auto-arg-copy and logging.
+
+    Retry decorator for all functions which do not accept a context as an
+    argument. If the function accepts a context, use
+    'retry_if_session_inactive' below.
+
+    If retriable errors are retried and exceed the count, they will be tagged
+    with a flag so is_retriable will no longer recognize them as retriable.
+    This prevents multiple applications of this decorator (and/or the one
+    below) from retrying the same exception.
+    """
+
+    @_tag_retriables_as_unretriable
+    @_retry_db_errors
+    @six.wraps(f)
+    def wrapped(*args, **kwargs):
+        try:
+            # copy mutable args and kwargs to make retries safe. this doesn't
+            # prevent mutations of complex objects like the context or 'self'
+            dup_args = [_copy_if_lds(a) for a in args]
+            dup_kwargs = {k: _copy_if_lds(v) for k, v in kwargs.items()}
+            return f(*dup_args, **dup_kwargs)
+        except Exception as e:
+            with excutils.save_and_reraise_exception():
+                if is_retriable(e):
+                    LOG.debug("Retry wrapper got retriable exception: %s", e)
+    return wrapped
+
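+# Illustrative usage sketch (the function name is hypothetical):
+#
+#   @retry_db_errors
+#   def create_router(payload):
+#       ...  # retriable DB errors raised here are retried up to MAX_RETRIES
+#            # times, with list/dict/set arguments re-copied on each attempt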
+
+@contextlib.contextmanager
+def autonested_transaction(sess):
+    """This is a convenience context to not bother with 'nested' parameter.
+
+    :param sess: The database session.
+    :returns: Yields the context transaction from sess.
+    """
+    if sess.is_active:
+        session_context = sess.begin(nested=True)
+    else:
+        session_context = sess.begin(subtransactions=True)
+    with session_context as tx:
+        yield tx
+
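+# Illustrative usage sketch:
+#
+#   with autonested_transaction(context.session) as tx:
+#       ...  # uses a SAVEPOINT if a transaction is already active,
+#            # otherwise opens a regular (sub)transaction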
+
+def retry_if_session_inactive(context_var_name='context'):
+    """Retries only if the session in the context is inactive.
+
+    Calls a retry_db_errors wrapped version of the function if the session
+    of the passed-in context is inactive; otherwise it calls the function
+    directly. This avoids retrying inside an active transaction, where
+    retries are ineffective against DB races/errors.
+
+    This should be used in all cases where retries are desired and the method
+    accepts a context.
+    """
+    def decorator(f):
+        try:
+            # NOTE(kevinbenton): we use pecan's util function here because it
+            # deals with the horrors of finding args of already decorated
+            # functions
+            ctx_arg_index = p_util.getargspec(f).args.index(context_var_name)
+        except ValueError:
+            msg = _("Could not find position of var %s") % context_var_name
+            raise RuntimeError(msg)
+        f_with_retry = retry_db_errors(f)
+
+        @six.wraps(f)
+        def wrapped(*args, **kwargs):
+            # only use retry wrapper if we aren't nested in an active
+            # transaction
+            if context_var_name in kwargs:
+                context = kwargs[context_var_name]
+            else:
+                context = args[ctx_arg_index]
+            method = f if context.session.is_active else f_with_retry
+            return method(*args, **kwargs)
+        return wrapped
+    return decorator
+
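+# Illustrative usage sketch (method and argument names are hypothetical):
+#
+#   @retry_if_session_inactive()
+#   def update_port(self, context, port_id, port):
+#       ...  # retried on retriable DB errors, but only when
+#            # context.session is not already inside a transaction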
+
+@contextlib.contextmanager
+def exc_to_retry(etypes):
+    """Contextually reraise Exceptions as a RetryRequests.
+
+    :param etypes: The class type to check the exception for.
+    :returns: None
+    :raises: A RetryRequest if any exception is caught in the context
+        is a nested instance of etypes.
+    """
+    try:
+        yield
+    except Exception as e:
+        with excutils.save_and_reraise_exception() as ctx:
+            if _is_nested_instance(e, etypes):
+                ctx.reraise = False
+                raise db_exc.RetryRequest(e)
+
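+# Illustrative usage sketch:
+#
+#   with exc_to_retry(ValueError):
+#       do_db_work()  # hypothetical; a nested ValueError raised here is
+#                     # reraised as db_exc.RetryRequest for retry decorators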
+
+# for convenient access as decorators
+CONTEXT_READER = get_context_manager().reader
+CONTEXT_WRITER = get_context_manager().writer
+
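+# Illustrative usage sketch (the function name is hypothetical):
+#
+#   @CONTEXT_READER
+#   def get_ports(context):
+#       ...  # runs inside a reader transaction bound to 'context'
+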
+_REGISTERED_SQLA_EVENTS = []
+
+
+def sqla_listen(*args):
+    """Wrapper to track subscribers for test teardowns.
+
+    SQLAlchemy has no "unsubscribe all" option for its event listener
+    framework so we need to keep track of the subscribers by having
+    them call through here for test teardowns.
+
+    :param args: The arguments to pass onto the listen call.
+    :returns: None
+    """
+    event.listen(*args)
+    _REGISTERED_SQLA_EVENTS.append(args)
+
+
+def sqla_remove(*args):
+    """Remove SQLA listeners.
+
+    :param args: The args to pass onto remove.
+    :returns: None.
+    """
+    event.remove(*args)
+    _REGISTERED_SQLA_EVENTS.remove(args)
+
+
+def sqla_remove_all():
+    """Removes all SQLA listeners.
+
+    :returns: None.
+    """
+    for args in _REGISTERED_SQLA_EVENTS:
+        try:
+            event.remove(*args)
+        except sql_exc.InvalidRequestError:
+            # already removed
+            pass
+    del _REGISTERED_SQLA_EVENTS[:]
+
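+# Illustrative usage sketch (model and callback names are hypothetical):
+#
+#   sqla_listen(PortModel, 'after_insert', notify_port_created)
+#   ...
+#   sqla_remove_all()  # e.g. in test teardown: drops every listener
+#                      # registered through sqla_listen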
+
+@event.listens_for(orm.session.Session, "after_flush")
+def _add_to_rel_load_list(session, flush_context=None):
+    # keep track of new items to load relationships on during commit
+    session.info.setdefault('_load_rels', weakref.WeakSet()).update(
+        session.new)
+
+
+@event.listens_for(orm.session.Session, "before_commit")
+def _load_one_to_manys(session):
+    # TODO(kevinbenton): we should be able to remove this after we
+    # have eliminated all places where related objects are constructed
+    # using a key rather than a relationship.
+
+    # capture any new objects
+    if session.new:
+        session.flush()
+
+    if session.transaction.nested:
+        # wait until final commit
+        return
+
+    for new_object in session.info.pop('_load_rels', []):
+        if new_object not in session:
+            # don't load detached objects because that brings them back
+            # into the session
+            continue
+        state = sqlalchemy.inspect(new_object)
+
+        # set up relationship loading so that we can call lazy
+        # loaders on the object even though the ".key" is not set up yet
+        # (normally happens in after_flush_postexec, but we're trying
+        # to do this more succinctly). in this context this is only
+        # setting a simple flag on the object's state.
+        session.enable_relationship_loading(new_object)
+
+        # look for eager relationships and do normal load.
+        # For relationships where the related object is also
+        # in the session these lazy loads will pull from the
+        # identity map and not emit SELECT.  Otherwise, we are still
+        # local in the transaction so a normal SELECT load will work fine.
+        for relationship_attr in state.mapper.relationships:
+            if relationship_attr.lazy not in ('joined', 'subquery'):
+                # we only want to automatically load relationships that would
+                # automatically load during a lookup operation
+                continue
+            if relationship_attr.key not in state.dict:
+                getattr(new_object, relationship_attr.key)
+                assert relationship_attr.key in state.dict
diff --git a/neutron_lib/fixture.py b/neutron_lib/fixture.py
index 4faa45802..76639361b 100644
--- a/neutron_lib/fixture.py
+++ b/neutron_lib/fixture.py
@@ -198,3 +198,37 @@ class PlacementAPIClientFixture(fixtures.Fixture):
         self._mock_post.stop()
         self._mock_put.stop()
         self._mock_delete.stop()
+
+
+class DBRetryErrorsFixture(fixtures.Fixture):
+
+    def __init__(self, **retry_kwargs):
+        self._retry_kwargs = retry_kwargs
+        self._patchers = []
+
+    def _setUp(self):
+        for k, v in self._retry_kwargs.items():
+            patcher = mock.patch.object(db_api._retry_db_errors, k, new=v)
+            patcher.start()
+            self._patchers.append(patcher)
+        self.addCleanup(self._restore)
+
+    def _restore(self):
+        for p in self._patchers:
+            p.stop()
+
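+# Illustrative usage sketch from a fixtures-based test case:
+#
+#   self.useFixture(DBRetryErrorsFixture(max_retries=2))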
+
+class DBAPIContextManagerFixture(fixtures.Fixture):
+
+    def __init__(self, mock_context_manager=mock.ANY):
+        # use an identity check: '== mock.ANY' is true for almost any
+        # object, which would silently discard a caller-provided manager
+        self.cxt_manager = (mock.Mock() if mock_context_manager is mock.ANY
+                            else mock_context_manager)
+        self._backup_mgr = None
+
+    def _setUp(self):
+        self._backup_mgr = db_api._CTX_MANAGER
+        db_api._CTX_MANAGER = self.cxt_manager
+        self.addCleanup(self._restore)
+
+    def _restore(self):
+        db_api._CTX_MANAGER = self._backup_mgr
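+
+
+# Illustrative usage sketch from a fixtures-based test case:
+#
+#   mgr_fixture = self.useFixture(DBAPIContextManagerFixture())
+#   mgr_fixture.cxt_manager  # the mock installed as the global manager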
diff --git a/neutron_lib/tests/unit/db/test_api.py b/neutron_lib/tests/unit/db/test_api.py
new file mode 100644
index 000000000..3b38bc37b
--- /dev/null
+++ b/neutron_lib/tests/unit/db/test_api.py
@@ -0,0 +1,186 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+from oslo_db import exception as db_exc
+import osprofiler
+import sqlalchemy
+from sqlalchemy.orm import exc
+import testtools
+
+from neutron_lib.db import api as db_api
+from neutron_lib import exceptions
+from neutron_lib import fixture
+from neutron_lib.tests import _base
+
+
+class TestExceptionToRetryContextManager(_base.BaseTestCase):
+
+    def test_translates_single_exception(self):
+        with testtools.ExpectedException(db_exc.RetryRequest):
+            with db_api.exc_to_retry(ValueError):
+                raise ValueError()
+
+    def test_translates_multiple_exception_types(self):
+        with testtools.ExpectedException(db_exc.RetryRequest):
+            with db_api.exc_to_retry((ValueError, TypeError)):
+                raise TypeError()
+
+    def test_translates_DBerror_inner_exception(self):
+        with testtools.ExpectedException(db_exc.RetryRequest):
+            with db_api.exc_to_retry(ValueError):
+                raise db_exc.DBError(ValueError())
+
+    def test_passes_other_exceptions(self):
+        with testtools.ExpectedException(ValueError):
+            with db_api.exc_to_retry(TypeError):
+                raise ValueError()
+
+    def test_inner_exception_preserved_in_retryrequest(self):
+        try:
+            exc = ValueError('test')
+            with db_api.exc_to_retry(ValueError):
+                raise exc
+        except db_exc.RetryRequest as e:
+            self.assertEqual(exc, e.inner_exc)
+
+    def test_retries_on_multi_exception_containing_target(self):
+        with testtools.ExpectedException(db_exc.RetryRequest):
+            with db_api.exc_to_retry(ValueError):
+                e = exceptions.MultipleExceptions([ValueError(), TypeError()])
+                raise e
+
+
+class TestDeadLockDecorator(_base.BaseTestCase):
+
+    @db_api.retry_db_errors
+    def _decorated_function(self, fail_count, exc_to_raise):
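+        # raises exc_to_raise for the first fail_count calls, then succeeds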
+        self.fail_count = getattr(self, 'fail_count', fail_count + 1) - 1
+        if self.fail_count:
+            raise exc_to_raise
+
+    def test_regular_exception_excluded(self):
+        with testtools.ExpectedException(ValueError):
+            self._decorated_function(1, ValueError)
+
+    def test_staledata_error_caught(self):
+        e = exc.StaleDataError()
+        self.assertIsNone(self._decorated_function(1, e))
+
+    def test_dbconnection_error_caught(self):
+        e = db_exc.DBConnectionError()
+        self.assertIsNone(self._decorated_function(1, e))
+
+    def test_multi_exception_contains_retry(self):
+        e = exceptions.MultipleExceptions(
+            [ValueError(), db_exc.RetryRequest(TypeError())])
+        self.assertIsNone(self._decorated_function(1, e))
+
+    def test_multi_exception_contains_deadlock(self):
+        e = exceptions.MultipleExceptions([ValueError(), db_exc.DBDeadlock()])
+        self.assertIsNone(self._decorated_function(1, e))
+
+    def test_multi_nested_exception_contains_deadlock(self):
+        i = exceptions.MultipleExceptions([ValueError(), db_exc.DBDeadlock()])
+        e = exceptions.MultipleExceptions([ValueError(), i])
+        self.assertIsNone(self._decorated_function(1, e))
+
+    def test_multi_exception_raised_on_exceed(self):
+        # limit retries so this doesn't take 40 seconds
+        retry_fixture = fixture.DBRetryErrorsFixture(max_retries=2)
+        retry_fixture.setUp()
+        e = exceptions.MultipleExceptions([ValueError(), db_exc.DBDeadlock()])
+        with testtools.ExpectedException(exceptions.MultipleExceptions):
+            self._decorated_function(db_api.MAX_RETRIES + 1, e)
+        retry_fixture.cleanUp()
+
+    def test_mysql_savepoint_error(self):
+        e = db_exc.DBError("(pymysql.err.InternalError) (1305, u'SAVEPOINT "
+                           "sa_savepoint_1 does not exist')")
+        self.assertIsNone(self._decorated_function(1, e))
+
+    @db_api.retry_if_session_inactive('alt_context')
+    def _alt_context_function(self, alt_context, *args, **kwargs):
+        return self._decorated_function(*args, **kwargs)
+
+    @db_api.retry_if_session_inactive()
+    def _context_function(self, context, list_arg, dict_arg,
+                          fail_count, exc_to_raise):
+        list_arg.append(1)
+        dict_arg[max(dict_arg.keys()) + 1] = True
+        self.fail_count = getattr(self, 'fail_count', fail_count + 1) - 1
+        if self.fail_count:
+            raise exc_to_raise
+        return list_arg, dict_arg
+
+    def test_stacked_retries_dont_explode_retry_count(self):
+        context = mock.Mock()
+        context.session.is_active = False
+        e = db_exc.DBConnectionError()
+        mock.patch('time.sleep').start()
+        with testtools.ExpectedException(db_exc.DBConnectionError):
+            # once MAX_RETRIES is exceeded, the inner retry gives up and
+            # the exception is tagged to prevent the outer retry
+            self._alt_context_function(context, 11, e)
+
+    def test_retry_if_session_inactive_args_not_mutated_after_retries(self):
+        context = mock.Mock()
+        context.session.is_active = False
+        list_arg = [1, 2, 3, 4]
+        dict_arg = {1: 'a', 2: 'b'}
+        l, d = self._context_function(context, list_arg, dict_arg,
+                                      5, db_exc.DBDeadlock())
+        # even though we had 5 failures the list and dict should only
+        # be mutated once
+        self.assertEqual(5, len(l))
+        self.assertEqual(3, len(d))
+
+    def test_retry_if_session_inactive_kwargs_not_mutated_after_retries(self):
+        context = mock.Mock()
+        context.session.is_active = False
+        list_arg = [1, 2, 3, 4]
+        dict_arg = {1: 'a', 2: 'b'}
+        l, d = self._context_function(context, list_arg=list_arg,
+                                      dict_arg=dict_arg,
+                                      fail_count=5,
+                                      exc_to_raise=db_exc.DBDeadlock())
+        # even though we had 5 failures the list and dict should only
+        # be mutated once
+        self.assertEqual(5, len(l))
+        self.assertEqual(3, len(d))
+
+    def test_retry_if_session_inactive_no_retry_in_active_session(self):
+        context = mock.Mock()
+        context.session.is_active = True
+        with testtools.ExpectedException(db_exc.DBDeadlock):
+            # retry decorator should have no effect in an active session
+            self._context_function(context, [], {1: 2},
+                                   fail_count=1,
+                                   exc_to_raise=db_exc.DBDeadlock())
+
+
+class TestDBProfiler(_base.BaseTestCase):
+
+    @mock.patch.object(osprofiler.opts, 'is_trace_enabled',
+                       return_value=True)
+    @mock.patch.object(osprofiler.opts, 'is_db_trace_enabled',
+                       return_value=True)
+    def test_set_hook(self, _mock_dbt, _mock_t):
+        with mock.patch.object(
+                osprofiler.sqlalchemy, 'add_tracing') as add_tracing:
+            engine_mock = mock.Mock()
+            db_api._set_hook(engine_mock)
+            self.assertEqual(2, len(add_tracing.mock_calls))
+            expected_calls = [mock.call(sqlalchemy, mock.ANY, n)
+                              for n in db_api.OSPROFILER_TRACE_NAMES]
+            self.assertEqual(expected_calls, add_tracing.mock_calls)
diff --git a/releasenotes/notes/rehome-db-api-63300ddab6a41e28.yaml b/releasenotes/notes/rehome-db-api-63300ddab6a41e28.yaml
new file mode 100644
index 000000000..ae60c1ee9
--- /dev/null
+++ b/releasenotes/notes/rehome-db-api-63300ddab6a41e28.yaml
@@ -0,0 +1,15 @@
+---
+features:
+  - The public APIs from ``neutron.db.api`` are now available in
+    the ``neutron_lib.db.api`` module.
+  - The ``CONTEXT_READER`` and ``CONTEXT_WRITER`` global database contexts
+    are available in ``neutron_lib.db.api`` for convenient access as
+    decorators.
+  - The ``DBRetryErrorsFixture`` and ``DBAPIContextManagerFixture`` test
+    fixtures are now available in ``neutron_lib.fixture``, allowing consumers
+    to patch out retry error values and the global context manager.
+upgrade:
+  - Consumers using the global ``context_manager`` from ``neutron.db.api``
+    should now use the ``get_context_manager()`` function in
+    the ``neutron_lib.db.api`` module, or the global ``CONTEXT_READER``
+    and ``CONTEXT_WRITER`` decorators where those suffice.
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index 2030e9c5b..266fa9330 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,7 +5,9 @@
 pbr!=2.1.0,>=2.0.0 # Apache-2.0
 
 SQLAlchemy!=1.1.5,!=1.1.6,!=1.1.7,!=1.1.8,>=1.0.10 # MIT
+pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD
 keystoneauth1>=3.4.0 # Apache-2.0
+six>=1.10.0 # MIT
 stevedore>=1.20.0 # Apache-2.0
 oslo.concurrency>=3.26.0 # Apache-2.0
 oslo.config>=5.2.0 # Apache-2.0
@@ -19,4 +21,5 @@ oslo.serialization!=2.19.1,>=2.18.0 # Apache-2.0
 oslo.service!=1.28.1,>=1.24.0 # Apache-2.0
 oslo.utils>=3.33.0 # Apache-2.0
 oslo.versionedobjects>=1.31.2 # Apache-2.0
+osprofiler>=1.4.0 # Apache-2.0
 WebOb>=1.7.1 # MIT