223 lines
7.4 KiB
Python
223 lines
7.4 KiB
Python
# Copyright 2016 - Nokia Networks
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from cachetools import keys as cachetools_keys
|
|
import decorator
|
|
import functools
|
|
import inspect
|
|
|
|
from alembic import op
|
|
from sqlalchemy import exc as sqla_exc
|
|
from sqlalchemy import inspect as ins
|
|
|
|
from oslo_db import exception as db_exc
|
|
from oslo_log import log as logging
|
|
|
|
import tenacity
|
|
|
|
from mistral import context
|
|
from mistral.db.sqlalchemy import base as db_base
|
|
from mistral import exceptions as exc
|
|
from mistral.services import security
|
|
from mistral_lib import utils as ml_utils
|
|
|
|
|
|
# Module-level logger named after this module.
LOG = logging.getLogger(__name__)

# Exception types that are treated as DB errors worth retrying. Used both
# for the retry predicate and for deciding whether to log a failure as
# retriable.
_RETRY_ERRORS = (
    db_exc.DBDeadlock,
    db_exc.DBConnectionError,
    sqla_exc.OperationalError,
)
|
|
|
|
|
|
def _with_auth_context(auth_ctx, func, *args, **kw):
    """Call ``func`` with ``auth_ctx`` installed as the current context.

    The context that was current before the call (or ``None`` if there was
    none) is put back afterwards, regardless of whether ``func`` returned
    or raised. Any exception raised by ``func`` is always re-raised; errors
    recognized as retriable DB errors are additionally logged.

    :param auth_ctx: Authentication context to install for the call.
    :param func: Callable to invoke under the given auth context.
    :param args: Positional arguments forwarded to ``func``.
    :param kw: Keyword arguments forwarded to ``func``.
    :return: Whatever ``func`` returns.
    """
    previous_ctx = None

    if context.has_ctx():
        previous_ctx = context.ctx()

    context.set_ctx(auth_ctx)

    try:
        return func(*args, **kw)
    except Exception as error:
        # Note (rakhmerov): In case of "Too many connections" error from the
        # database it doesn't get wrapped with a SQLAlchemy exception for
        # some reason so we have to check the exception message explicitly.
        looks_retriable = (
            isinstance(error, _RETRY_ERRORS)
            or 'Too many connections' in str(error)
        )

        if looks_retriable:
            LOG.exception(
                "DB error detected, operation will be retried: %s", func
            )

        # The error is never swallowed here; retrying is up to the caller.
        raise
    finally:
        context.set_ctx(previous_ctx)
|
|
|
|
|
|
def retry_on_db_error(func, retry=None):
    """Decorates the given function so that it retries on DB errors.

    Note that the decorator retries the function/method only on some
    of the DB errors that are considered to be worth retrying, like
    deadlocks and disconnections, plus any error whose message contains
    'Too many connections'.

    :param func: Function to decorate.
    :param retry: A Retrying object (a callable invoked as
        ``retry(fn, *args, **kw)``). If not given, a default one is built:
        up to 50 attempts with an incrementing wait (start 0, step 0.1,
        max 2).
    :return: Decorated function.
    """
    if not retry:
        # `retry_if_exception_message(match=...)` applies the pattern with
        # `re.match`, i.e. anchored at the beginning of the message. The
        # leading "(?s).*" turns it into a "contains" check so that it agrees
        # with the substring test done in `_with_auth_context` before it logs
        # that the operation will be retried (e.g. for messages like
        # "(1040, 'Too many connections')").
        retry = tenacity.Retrying(
            retry=(
                tenacity.retry_if_exception_type(_RETRY_ERRORS) |
                tenacity.retry_if_exception_message(
                    match=r'(?s).*Too many connections'
                )
            ),
            stop=tenacity.stop_after_attempt(50),
            wait=tenacity.wait_incrementing(start=0, increment=0.1, max=2)
        )

    # The `assigned` arg should be empty as some of the default values are not
    # supported by simply initialized MagicMocks. The consequence may
    # be that the representation will contain the wrapper and not the
    # wrapped function.
    @functools.wraps(func, assigned=[])
    def decorate(*args, **kw):
        # Retrying library decorator might potentially run a decorated
        # function within a new thread so it's safer not to apply the
        # decorator directly to a target method/function because we can
        # lose an authentication context.
        # The solution is to create one more function and explicitly set
        # auth context before calling it (potentially in a new thread).
        auth_ctx = context.ctx() if context.has_ctx() else None

        return retry(_with_auth_context, auth_ctx, func, *args, **kw)

    return decorate
|
|
|
|
|
|
def check_db_obj_access(db_obj):
    """Verify that the current context may access the given DB object.

    Admin contexts pass unconditionally. For everybody else the object
    must belong to the current project and must not be flagged as a
    system object.

    :param db_obj: DB object to check; its ``project_id``, ``id`` and
        (if present) ``is_system`` attributes are read.
    :raises NotAllowedException: If the object belongs to another project.
    :raises InvalidActionException: If the object is a system object.
    """
    if context.ctx().is_admin:
        return

    if db_obj.project_id != security.get_project_id():
        raise exc.NotAllowedException(
            "Can not access %s resource of other projects, ID: %s" %
            (db_obj.__class__.__name__, db_obj.id)
        )

    # Not every DB object has the 'is_system' attribute.
    if getattr(db_obj, 'is_system', False):
        raise exc.InvalidActionException(
            "Can not modify a system %s resource, ID: %s" %
            (db_obj.__class__.__name__, db_obj.id)
        )
|
|
|
|
|
|
def tx_cached(use_args=None, ignore_args=None):
    """Decorates any function to cache its result within a DB transaction.

    Since a DB transaction is coupled with the current thread, the scope
    of the underlying cache doesn't go beyond the thread. The decorator
    is mainly useful for situations when we know we can safely cache a
    result of some calculation if we know that it's not going to change
    till the end of the current transaction.

    :param use_args: An argument name (string) or a tuple with argument
        names of the decorated function used to build a cache key.
    :param ignore_args: An argument name (string) or a tuple with argument
        names of the decorated function that should be ignored when
        building a cache key.
    :return: Decorated function.
    :raises ValueError: If both 'use_args' and 'ignore_args' are given.
        A ValueError is also raised when the decorated function is called
        within a transaction and the given option is neither a string
        nor a tuple.
    """

    if use_args and ignore_args:
        raise ValueError(
            "Only one of 'use_args' and 'ignore_args' can be used."
        )

    def _as_name_tuple(names, option_name):
        # Validates the option value and normalizes it to a tuple of names.
        if not isinstance(names, (str, tuple)):
            raise ValueError(
                "'%s' must be either a tuple or a string,"
                " actual type: %s" % (option_name, type(names))
            )

        return names if isinstance(names, tuple) else (names,)

    def _build_cache_key(func, *args, **kw):
        # { arg name => arg value }
        arg_dict = inspect.getcallargs(func, *args, **kw)

        # NOTE: the filtering below builds new dicts on purpose. Removing
        # entries from a dict while iterating over its keys raises
        # "RuntimeError: dictionary changed size during iteration".
        if ignore_args:
            ignored = _as_name_tuple(ignore_args, 'ignore_args')

            arg_dict = {
                name: value for name, value in arg_dict.items()
                if name not in ignored
            }

        if use_args:
            used = _as_name_tuple(use_args, 'use_args')

            arg_dict = {
                name: value for name, value in arg_dict.items()
                if name in used
            }

        # NOTE(review): the key is built only from argument names and
        # values, the function itself is not a part of it. Confirm that
        # different cached functions can't collide within one transaction.
        return cachetools_keys.hashkey(**arg_dict)

    @decorator.decorator
    def _decorator(func, *args, **kw):
        cache = db_base.get_tx_scoped_cache()

        # A DB transaction may not be necessarily open at the moment.
        if cache is None:
            return func(*args, **kw)

        cache_key = _build_cache_key(func, *args, **kw)

        # A sentinel is used as the default so that a cached None is
        # still treated as a cache hit.
        result = cache.get(cache_key, default=ml_utils.NotDefined)

        if result is not ml_utils.NotDefined:
            return result

        # We don't do any exception handling here. In case of an exception
        # nothing will be put into the cache and the exception will just
        # bubble up as if there wasn't any wrapper.
        result = func(*args, **kw)

        cache[cache_key] = result

        return result

    return _decorator
|
|
|
|
|
|
def column_exists(table_name, column_name):
    """Tell whether the given table has a column with the given name.

    The table is inspected through the bind of the current Alembic
    operations context.

    :param table_name: Name of the table to inspect.
    :param column_name: Name of the column to look for.
    :return: True if the column is found, False otherwise.
    """
    inspector = ins(op.get_context().bind)

    for column in inspector.get_columns(table_name):
        if column["name"] == column_name:
            return True

    return False
|