mistral/mistral/db/utils.py

223 lines
7.4 KiB
Python

# Copyright 2016 - Nokia Networks
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from cachetools import keys as cachetools_keys
import decorator
import functools
import inspect
from alembic import op
from sqlalchemy import exc as sqla_exc
from sqlalchemy import inspect as ins
from oslo_db import exception as db_exc
from oslo_log import log as logging
import tenacity
from mistral import context
from mistral.db.sqlalchemy import base as db_base
from mistral import exceptions as exc
from mistral.services import security
from mistral_lib import utils as ml_utils
LOG = logging.getLogger(__name__)
_RETRY_ERRORS = (
db_exc.DBDeadlock,
db_exc.DBConnectionError,
sqla_exc.OperationalError
)
def _with_auth_context(auth_ctx, func, *args, **kw):
"""Runs the given function with the specified auth context.
:param auth_ctx: Authentication context.
:param func: Function to run with the specified auth context.
:param args: Function positional arguments.
:param kw: Function keyword arguments.
:return: Function result.
"""
old_auth_ctx = context.ctx() if context.has_ctx() else None
context.set_ctx(auth_ctx)
try:
return func(*args, **kw)
except Exception as e:
# Note (rakhmerov): In case of "Too many connections" error from the
# database it doesn't get wrapped with a SQLAlchemy exception for some
# reason so we have to check the exception message explicitly.
if isinstance(e, _RETRY_ERRORS) or 'Too many connections' in str(e):
LOG.exception(
"DB error detected, operation will be retried: %s", func
)
raise
finally:
context.set_ctx(old_auth_ctx)
def retry_on_db_error(func, retry=None):
"""Decorates the given function so that it retries on DB errors.
Note that the decorator retries the function/method only on some
of the DB errors that are considered to be worth retrying, like
deadlocks and disconnections.
:param func: Function to decorate.
:param retry: a Retrying object
:return: Decorated function.
"""
if not retry:
retry = tenacity.Retrying(
retry=(
tenacity.retry_if_exception_type(_RETRY_ERRORS) |
tenacity.retry_if_exception_message(
match='Too many connections'
)
),
stop=tenacity.stop_after_attempt(50),
wait=tenacity.wait_incrementing(start=0, increment=0.1, max=2)
)
# The `assigned` arg should be empty as some of the default values are not
# supported by simply initialized MagicMocks. The consequence may
# be that the representation will contain the wrapper and not the
# wrapped function.
@functools.wraps(func, assigned=[])
def decorate(*args, **kw):
# Retrying library decorator might potentially run a decorated
# function within a new thread so it's safer not to apply the
# decorator directly to a target method/function because we can
# lose an authentication context.
# The solution is to create one more function and explicitly set
# auth context before calling it (potentially in a new thread).
auth_ctx = context.ctx() if context.has_ctx() else None
return retry(_with_auth_context, auth_ctx, func, *args, **kw)
return decorate
def check_db_obj_access(db_obj):
"""Check accessibility to db object."""
ctx = context.ctx()
is_admin = ctx.is_admin
if not is_admin and db_obj.project_id != security.get_project_id():
raise exc.NotAllowedException(
"Can not access %s resource of other projects, ID: %s" %
(db_obj.__class__.__name__, db_obj.id)
)
if not is_admin and hasattr(db_obj, 'is_system') and db_obj.is_system:
raise exc.InvalidActionException(
"Can not modify a system %s resource, ID: %s" %
(db_obj.__class__.__name__, db_obj.id)
)
def tx_cached(use_args=None, ignore_args=None):
"""Decorates any function to cache its result within a DB transaction.
Since a DB transaction is coupled with the current thread, the scope
of the underlying cache doesn't go beyond the thread. The decorator
is mainly useful for situations when we know we can safely cache a
result of some calculation if we know that it's not going to change
till the end of the current transaction.
:param use_args: A tuple with argument names of the decorated function
used to build a cache key.
:param ignore_args: A tuple with argument names of the decorated function
that should be ignored when building a cache key.
:return: Decorated function.
"""
if use_args and ignore_args:
raise ValueError(
"Only one of 'use_args' and 'ignore_args' can be used."
)
def _build_cache_key(func, *args, **kw):
# { arg name => arg value }
arg_dict = inspect.getcallargs(func, *args, **kw)
if ignore_args:
if not isinstance(ignore_args, (str, tuple)):
raise ValueError(
"'ignore_args' must be either a tuple or a string,"
" actual type: %s" % type(ignore_args)
)
ignore_args_tup = (
ignore_args if isinstance(ignore_args, tuple) else
(ignore_args,)
)
for arg_name in ignore_args_tup:
arg_dict.pop(arg_name, None)
if use_args:
if not isinstance(use_args, (str, tuple)):
raise ValueError(
"'use_args' must be either a tuple or a string,"
" actual type: %s" % type(use_args)
)
use_args_tup = (
use_args if isinstance(use_args, tuple) else (use_args,)
)
for arg_name in arg_dict.keys():
if arg_name not in tuple(use_args_tup):
arg_dict.pop(arg_name, None)
return cachetools_keys.hashkey(**arg_dict)
@decorator.decorator
def _decorator(func, *args, **kw):
cache = db_base.get_tx_scoped_cache()
# A DB transaction may not be necessarily open at the moment.
if cache is None:
return func(*args, **kw)
cache_key = _build_cache_key(func, *args, **kw)
result = cache.get(cache_key, default=ml_utils.NotDefined)
if result is not ml_utils.NotDefined:
return result
# We don't do any exception handling here. In case of an exception
# nothing will be put into the cache and the exception will just
# bubble up as if there wasn't any wrapper.
result = func(*args, **kw)
cache[cache_key] = result
return result
return _decorator
def column_exists(table_name, column_name):
bind = op.get_context().bind
insp = ins(bind)
columns = insp.get_columns(table_name)
return any(c["name"] == column_name for c in columns)