cinder/cinder/api/api_utils.py

259 lines
9.7 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import typing
from typing import Any, Generator, Iterable, Optional, Union
from keystoneauth1 import exceptions as ks_exc
from keystoneauth1 import identity
from keystoneauth1 import loading as ka_loading
from keystoneclient import client
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import strutils
import webob
from webob import exc
if typing.TYPE_CHECKING:
# conditional import to avoid a circular import problem from cinderlib
from cinder import context
from cinder import exception
from cinder.i18n import _
CONF = cfg.CONF
CONF.import_group('keystone_authtoken',
'keystonemiddleware.auth_token.__init__')
LOG = logging.getLogger(__name__)
def _parse_is_public(is_public: Optional[str]) -> Optional[bool]:
"""Parse is_public into something usable.
* True: List public volume types only
* False: List private volume types only
* None: List both public and private volume types
"""
if is_public is None:
# preserve default value of showing only public types
return True
elif is_none_string(is_public):
return None
else:
try:
return strutils.bool_from_string(is_public, strict=True)
except ValueError:
msg = _('Invalid is_public filter [%s]') % is_public
raise exc.HTTPBadRequest(explanation=msg)
def is_none_string(val: Any) -> bool:
"""Check if a string represents a None value."""
if not isinstance(val, str):
return False
return val.lower() == 'none'
def remove_invalid_filter_options(
context: 'context.RequestContext',
filters: dict,
allowed_search_options: Iterable[str]) -> None:
"""Remove search options that are not valid for non-admin API/context."""
if context.is_admin:
# Allow all options
return
# Otherwise, strip out all unknown options
unknown_options = [opt for opt in filters
if opt not in allowed_search_options]
bad_options = ", ".join(unknown_options)
LOG.debug("Removing options '%s' from query.", bad_options)
for opt in unknown_options:
del filters[opt]
_visible_admin_metadata_keys = ['readonly', 'attached_mode']
def add_visible_admin_metadata(volume) -> None:
"""Add user-visible admin metadata to regular metadata.
Extracts the admin metadata keys that are to be made visible to
non-administrators, and adds them to the regular metadata structure for the
passed-in volume.
"""
visible_admin_meta = {}
if volume.get('volume_admin_metadata'):
if isinstance(volume['volume_admin_metadata'], dict):
volume_admin_metadata = volume['volume_admin_metadata']
for key in volume_admin_metadata:
if key in _visible_admin_metadata_keys:
visible_admin_meta[key] = volume_admin_metadata[key]
else:
for item in volume['volume_admin_metadata']:
if item['key'] in _visible_admin_metadata_keys:
visible_admin_meta[item['key']] = item['value']
# avoid circular ref when volume is a Volume instance
elif (volume.get('admin_metadata') and
isinstance(volume.get('admin_metadata'), dict)):
for key in _visible_admin_metadata_keys:
if key in volume['admin_metadata'].keys():
visible_admin_meta[key] = volume['admin_metadata'][key]
if not visible_admin_meta:
return
# NOTE(zhiyan): update visible administration metadata to
# volume metadata, administration metadata will rewrite existing key.
if volume.get('volume_metadata'):
orig_meta = list(volume.get('volume_metadata'))
for item in orig_meta:
if item['key'] in visible_admin_meta.keys():
item['value'] = visible_admin_meta.pop(item['key'])
for key, value in visible_admin_meta.items():
orig_meta.append({'key': key, 'value': value})
volume['volume_metadata'] = orig_meta
# avoid circular ref when vol is a Volume instance
elif (volume.get('metadata') and
isinstance(volume.get('metadata'), dict)):
volume['metadata'].update(visible_admin_meta)
else:
volume['metadata'] = visible_admin_meta
def validate_integer(value: int, name: str,
min_value: Optional[int] = None,
max_value: Optional[int] = None) -> int:
"""Make sure that value is a valid integer, potentially within range.
:param value: the value of the integer
:param name: the name of the integer
:param min_value: the min value of the integer
:param max_value: the max value of the integer
:returns: integer
"""
try:
value = strutils.validate_integer(value, name, min_value, max_value)
return value
except ValueError as e:
raise webob.exc.HTTPBadRequest(explanation=str(e))
def walk_class_hierarchy(clazz: type,
encountered: Optional[list[type]] = None) -> \
Generator[type, None, None]:
"""Walk class hierarchy, yielding most derived classes first."""
if not encountered:
encountered = []
for subclass in clazz.__subclasses__():
if subclass not in encountered:
encountered.append(subclass)
# drill down to leaves first
for subsubclass in walk_class_hierarchy(subclass, encountered):
yield subsubclass
yield subclass
def _keystone_client(context: 'context.RequestContext',
version: tuple[int, int] = (3, 0)) -> client.Client:
"""Creates and returns an instance of a generic keystone client.
:param context: The request context
:param version: version of Keystone to request
:return: keystoneclient.client.Client object
"""
if context.system_scope is not None:
auth_plugin = identity.Token(
auth_url=CONF.keystone_authtoken.auth_url,
token=context.auth_token,
system_scope=context.system_scope
)
elif context.domain_id is not None:
auth_plugin = identity.Token(
auth_url=CONF.keystone_authtoken.auth_url,
token=context.auth_token,
domain_id=context.domain_id
)
elif context.project_id is not None:
auth_plugin = identity.Token(
auth_url=CONF.keystone_authtoken.auth_url,
token=context.auth_token,
project_id=context.project_id
)
else:
# We're dealing with an unscoped token from keystone that doesn't
# carry any authoritative power outside of the user simplify proving
# they know their own password. This token isn't associated with any
# authorization target (e.g., system, domain, or project).
auth_plugin = context.get_auth_plugin()
client_session = ka_loading.session.Session().load_from_options(
auth=auth_plugin,
insecure=CONF.keystone_authtoken.insecure,
cacert=CONF.keystone_authtoken.cafile,
key=CONF.keystone_authtoken.keyfile,
cert=CONF.keystone_authtoken.certfile,
split_loggers=CONF.service_user.split_loggers)
return client.Client(auth_url=CONF.keystone_authtoken.auth_url,
session=client_session, version=version)
class GenericProjectInfo(object):
"""Abstraction layer for Keystone V2 and V3 project objects"""
def __init__(self,
project_id: str,
project_keystone_api_version: str,
domain_id: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None):
self.id = project_id
self.keystone_api_version = project_keystone_api_version
self.domain_id = domain_id
self.name = name
self.description = description
def get_project(context: 'context.RequestContext',
project_id: str) -> GenericProjectInfo:
"""Method to verify project exists in keystone"""
keystone = _keystone_client(context)
generic_project = GenericProjectInfo(project_id, keystone.version)
project = keystone.projects.get(project_id)
generic_project.domain_id = project.domain_id
generic_project.name = project.name
generic_project.description = project.description
return generic_project
def validate_project_and_authorize(context: 'context.RequestContext',
project_id: str,
policy_check: str,
validate_only: bool = False) -> None:
target_project: Union[GenericProjectInfo, dict]
try:
target_project = get_project(context, project_id)
if not validate_only:
target_project = {'project_id': target_project.id}
context.authorize(policy_check, target=target_project)
except ks_exc.http.NotFound:
explanation = _("Project with id %s not found." % project_id)
raise exc.HTTPNotFound(explanation=explanation)
except exception.NotAuthorized:
explanation = _("You are not authorized to perform this "
"operation.")
raise exc.HTTPForbidden(explanation=explanation)