259 lines
9.7 KiB
Python
259 lines
9.7 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from __future__ import annotations
|
|
|
|
import typing
|
|
from typing import Any, Generator, Iterable, Optional, Union
|
|
|
|
from keystoneauth1 import exceptions as ks_exc
|
|
from keystoneauth1 import identity
|
|
from keystoneauth1 import loading as ka_loading
|
|
from keystoneclient import client
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
from oslo_utils import strutils
|
|
import webob
|
|
from webob import exc
|
|
|
|
if typing.TYPE_CHECKING:
|
|
# conditional import to avoid a circular import problem from cinderlib
|
|
from cinder import context
|
|
from cinder import exception
|
|
from cinder.i18n import _
|
|
|
|
CONF = cfg.CONF
|
|
CONF.import_group('keystone_authtoken',
|
|
'keystonemiddleware.auth_token.__init__')
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def _parse_is_public(is_public: Optional[str]) -> Optional[bool]:
|
|
"""Parse is_public into something usable.
|
|
|
|
* True: List public volume types only
|
|
* False: List private volume types only
|
|
* None: List both public and private volume types
|
|
"""
|
|
|
|
if is_public is None:
|
|
# preserve default value of showing only public types
|
|
return True
|
|
elif is_none_string(is_public):
|
|
return None
|
|
else:
|
|
try:
|
|
return strutils.bool_from_string(is_public, strict=True)
|
|
except ValueError:
|
|
msg = _('Invalid is_public filter [%s]') % is_public
|
|
raise exc.HTTPBadRequest(explanation=msg)
|
|
|
|
|
|
def is_none_string(val: Any) -> bool:
|
|
"""Check if a string represents a None value."""
|
|
if not isinstance(val, str):
|
|
return False
|
|
|
|
return val.lower() == 'none'
|
|
|
|
|
|
def remove_invalid_filter_options(
|
|
context: 'context.RequestContext',
|
|
filters: dict,
|
|
allowed_search_options: Iterable[str]) -> None:
|
|
"""Remove search options that are not valid for non-admin API/context."""
|
|
|
|
if context.is_admin:
|
|
# Allow all options
|
|
return
|
|
# Otherwise, strip out all unknown options
|
|
unknown_options = [opt for opt in filters
|
|
if opt not in allowed_search_options]
|
|
bad_options = ", ".join(unknown_options)
|
|
LOG.debug("Removing options '%s' from query.", bad_options)
|
|
for opt in unknown_options:
|
|
del filters[opt]
|
|
|
|
|
|
_visible_admin_metadata_keys = ['readonly', 'attached_mode']
|
|
|
|
|
|
def add_visible_admin_metadata(volume) -> None:
|
|
"""Add user-visible admin metadata to regular metadata.
|
|
|
|
Extracts the admin metadata keys that are to be made visible to
|
|
non-administrators, and adds them to the regular metadata structure for the
|
|
passed-in volume.
|
|
"""
|
|
visible_admin_meta = {}
|
|
|
|
if volume.get('volume_admin_metadata'):
|
|
if isinstance(volume['volume_admin_metadata'], dict):
|
|
volume_admin_metadata = volume['volume_admin_metadata']
|
|
for key in volume_admin_metadata:
|
|
if key in _visible_admin_metadata_keys:
|
|
visible_admin_meta[key] = volume_admin_metadata[key]
|
|
else:
|
|
for item in volume['volume_admin_metadata']:
|
|
if item['key'] in _visible_admin_metadata_keys:
|
|
visible_admin_meta[item['key']] = item['value']
|
|
# avoid circular ref when volume is a Volume instance
|
|
elif (volume.get('admin_metadata') and
|
|
isinstance(volume.get('admin_metadata'), dict)):
|
|
for key in _visible_admin_metadata_keys:
|
|
if key in volume['admin_metadata'].keys():
|
|
visible_admin_meta[key] = volume['admin_metadata'][key]
|
|
|
|
if not visible_admin_meta:
|
|
return
|
|
|
|
# NOTE(zhiyan): update visible administration metadata to
|
|
# volume metadata, administration metadata will rewrite existing key.
|
|
if volume.get('volume_metadata'):
|
|
orig_meta = list(volume.get('volume_metadata'))
|
|
for item in orig_meta:
|
|
if item['key'] in visible_admin_meta.keys():
|
|
item['value'] = visible_admin_meta.pop(item['key'])
|
|
for key, value in visible_admin_meta.items():
|
|
orig_meta.append({'key': key, 'value': value})
|
|
volume['volume_metadata'] = orig_meta
|
|
# avoid circular ref when vol is a Volume instance
|
|
elif (volume.get('metadata') and
|
|
isinstance(volume.get('metadata'), dict)):
|
|
volume['metadata'].update(visible_admin_meta)
|
|
else:
|
|
volume['metadata'] = visible_admin_meta
|
|
|
|
|
|
def validate_integer(value: int, name: str,
|
|
min_value: Optional[int] = None,
|
|
max_value: Optional[int] = None) -> int:
|
|
"""Make sure that value is a valid integer, potentially within range.
|
|
|
|
:param value: the value of the integer
|
|
:param name: the name of the integer
|
|
:param min_value: the min value of the integer
|
|
:param max_value: the max value of the integer
|
|
:returns: integer
|
|
"""
|
|
try:
|
|
value = strutils.validate_integer(value, name, min_value, max_value)
|
|
return value
|
|
except ValueError as e:
|
|
raise webob.exc.HTTPBadRequest(explanation=str(e))
|
|
|
|
|
|
def walk_class_hierarchy(clazz: type,
|
|
encountered: Optional[list[type]] = None) -> \
|
|
Generator[type, None, None]:
|
|
"""Walk class hierarchy, yielding most derived classes first."""
|
|
if not encountered:
|
|
encountered = []
|
|
for subclass in clazz.__subclasses__():
|
|
if subclass not in encountered:
|
|
encountered.append(subclass)
|
|
# drill down to leaves first
|
|
for subsubclass in walk_class_hierarchy(subclass, encountered):
|
|
yield subsubclass
|
|
yield subclass
|
|
|
|
|
|
def _keystone_client(context: 'context.RequestContext',
|
|
version: tuple[int, int] = (3, 0)) -> client.Client:
|
|
"""Creates and returns an instance of a generic keystone client.
|
|
|
|
:param context: The request context
|
|
:param version: version of Keystone to request
|
|
:return: keystoneclient.client.Client object
|
|
"""
|
|
if context.system_scope is not None:
|
|
auth_plugin = identity.Token(
|
|
auth_url=CONF.keystone_authtoken.auth_url,
|
|
token=context.auth_token,
|
|
system_scope=context.system_scope
|
|
)
|
|
elif context.domain_id is not None:
|
|
auth_plugin = identity.Token(
|
|
auth_url=CONF.keystone_authtoken.auth_url,
|
|
token=context.auth_token,
|
|
domain_id=context.domain_id
|
|
)
|
|
elif context.project_id is not None:
|
|
auth_plugin = identity.Token(
|
|
auth_url=CONF.keystone_authtoken.auth_url,
|
|
token=context.auth_token,
|
|
project_id=context.project_id
|
|
)
|
|
else:
|
|
# We're dealing with an unscoped token from keystone that doesn't
|
|
# carry any authoritative power outside of the user simplify proving
|
|
# they know their own password. This token isn't associated with any
|
|
# authorization target (e.g., system, domain, or project).
|
|
auth_plugin = context.get_auth_plugin()
|
|
|
|
client_session = ka_loading.session.Session().load_from_options(
|
|
auth=auth_plugin,
|
|
insecure=CONF.keystone_authtoken.insecure,
|
|
cacert=CONF.keystone_authtoken.cafile,
|
|
key=CONF.keystone_authtoken.keyfile,
|
|
cert=CONF.keystone_authtoken.certfile,
|
|
split_loggers=CONF.service_user.split_loggers)
|
|
return client.Client(auth_url=CONF.keystone_authtoken.auth_url,
|
|
session=client_session, version=version)
|
|
|
|
|
|
class GenericProjectInfo(object):
|
|
"""Abstraction layer for Keystone V2 and V3 project objects"""
|
|
def __init__(self,
|
|
project_id: str,
|
|
project_keystone_api_version: str,
|
|
domain_id: Optional[str] = None,
|
|
name: Optional[str] = None,
|
|
description: Optional[str] = None):
|
|
self.id = project_id
|
|
self.keystone_api_version = project_keystone_api_version
|
|
self.domain_id = domain_id
|
|
self.name = name
|
|
self.description = description
|
|
|
|
|
|
def get_project(context: 'context.RequestContext',
|
|
project_id: str) -> GenericProjectInfo:
|
|
"""Method to verify project exists in keystone"""
|
|
keystone = _keystone_client(context)
|
|
generic_project = GenericProjectInfo(project_id, keystone.version)
|
|
project = keystone.projects.get(project_id)
|
|
generic_project.domain_id = project.domain_id
|
|
generic_project.name = project.name
|
|
generic_project.description = project.description
|
|
return generic_project
|
|
|
|
|
|
def validate_project_and_authorize(context: 'context.RequestContext',
|
|
project_id: str,
|
|
policy_check: str,
|
|
validate_only: bool = False) -> None:
|
|
target_project: Union[GenericProjectInfo, dict]
|
|
try:
|
|
target_project = get_project(context, project_id)
|
|
if not validate_only:
|
|
target_project = {'project_id': target_project.id}
|
|
context.authorize(policy_check, target=target_project)
|
|
except ks_exc.http.NotFound:
|
|
explanation = _("Project with id %s not found." % project_id)
|
|
raise exc.HTTPNotFound(explanation=explanation)
|
|
except exception.NotAuthorized:
|
|
explanation = _("You are not authorized to perform this "
|
|
"operation.")
|
|
raise exc.HTTPForbidden(explanation=explanation)
|