From 5fc7df24a549603652171edccd4422d84f9500c1 Mon Sep 17 00:00:00 2001 From: Lance Bragstad Date: Wed, 28 Oct 2020 16:35:08 +0000 Subject: [PATCH] Xena project personas for volume type access API Implements the 3 Xena personas (no system scope). Also introduces a new policy 'volume_extension:volume_type_access:get_all_for_type' that governs the GET /types/{type_id}/os-volume-type-access call. Also removes an overriding policy definition from policy.yaml in cinder/tests/unit/ that doesn't appear to be needed. Co-authored-by: Lance Bragstad Co-authored-by: Brian Rosmaita Change-Id: I6db266315efa02f08dfc0c069514f2ff9b8c551e --- cinder/api/contrib/volume_type_access.py | 3 +- cinder/policies/volume_access.py | 46 +++- .../tests/unit/policies/test_volume_access.py | 246 ++++++++++++++++++ cinder/tests/unit/policy.yaml | 7 - 4 files changed, 287 insertions(+), 15 deletions(-) create mode 100644 cinder/tests/unit/policies/test_volume_access.py diff --git a/cinder/api/contrib/volume_type_access.py b/cinder/api/contrib/volume_type_access.py index 344f58b5149..1ac10172a5a 100644 --- a/cinder/api/contrib/volume_type_access.py +++ b/cinder/api/contrib/volume_type_access.py @@ -40,7 +40,7 @@ class VolumeTypeAccessController(object): def index(self, req, type_id): context = req.environ['cinder.context'] - context.authorize(policy.TYPE_ACCESS_POLICY) + context.authorize(policy.TYPE_ACCESS_WHO_POLICY) # Not found exception will be handled at the wsgi level vol_type = volume_types.get_volume_type( @@ -77,6 +77,7 @@ class VolumeTypeActionController(wsgi.Controller): vol_type = req.cached_resource_by_id(type_id, name='types') self._extend_vol_type(vol_type_rval, vol_type) + # TODO: remove this, there is no /types/detail call for this to extend @wsgi.extends def detail(self, req, resp_obj): context = req.environ['cinder.context'] diff --git a/cinder/policies/volume_access.py b/cinder/policies/volume_access.py index 78aeeac46f8..e71b5d99431 100644 --- a/cinder/policies/volume_access.py +++ b/cinder/policies/volume_access.py @@ -22,21 +22,37 @@ ADD_PROJECT_POLICY = "volume_extension:volume_type_access:addProjectAccess" REMOVE_PROJECT_POLICY = \ "volume_extension:volume_type_access:removeProjectAccess" TYPE_ACCESS_POLICY = "volume_extension:volume_type_access" +TYPE_ACCESS_WHO_POLICY = "volume_extension:volume_type_access:get_all_for_type" + + +deprecated_volume_type_access = base.CinderDeprecatedRule( + name=TYPE_ACCESS_POLICY, + check_str=base.RULE_ADMIN_OR_OWNER +) +deprecated_type_access_who_policy = base.CinderDeprecatedRule( + name=TYPE_ACCESS_WHO_POLICY, + # TODO: revise check_str and dep_reason in Yoga + check_str=TYPE_ACCESS_POLICY, + deprecated_reason=( + f"Reason: '{TYPE_ACCESS_WHO_POLICY}' is a new policy that protects " + f"an API call formerly governed by '{TYPE_ACCESS_POLICY}', but which " + 'has been separated for finer-grained policy control.'), +) + volume_access_policies = [ policy.DocumentedRuleDefault( name=TYPE_ACCESS_POLICY, - check_str=base.RULE_ADMIN_OR_OWNER, - description="Volume type access related APIs.", + check_str=base.SYSTEM_ADMIN_OR_PROJECT_MEMBER, + description=( + "Adds the boolean field 'os-volume-type-access:is_public' to " + 'the responses for these API calls. The ability to make these ' + 'calls is governed by other policies.'), operations=[ { 'method': 'GET', 'path': '/types' }, - { - 'method': 'GET', - 'path': '/types/detail' - }, { 'method': 'GET', 'path': '/types/{type_id}' @@ -45,7 +61,9 @@ volume_access_policies = [ 'method': 'POST', 'path': '/types' } - ]), + ], + deprecated_rule=deprecated_volume_type_access, + ), policy.DocumentedRuleDefault( name=ADD_PROJECT_POLICY, check_str=base.RULE_ADMIN_API, @@ -66,6 +84,20 @@ volume_access_policies = [ 'path': '/types/{type_id}/action (removeProjectAccess)' } ]), + policy.DocumentedRuleDefault( + name=TYPE_ACCESS_WHO_POLICY, + check_str=base.RULE_ADMIN_API, + description=( + 'List private volume type access detail, that is, list the ' + 'projects that have access to this volume type.'), + operations=[ + { + 'method': 'GET', + 'path': '/types/{type_id}/os-volume-type-access' + } + ], + deprecated_rule=deprecated_type_access_who_policy, + ), ] diff --git a/cinder/tests/unit/policies/test_volume_access.py b/cinder/tests/unit/policies/test_volume_access.py new file mode 100644 index 00000000000..90165396bba --- /dev/null +++ b/cinder/tests/unit/policies/test_volume_access.py @@ -0,0 +1,246 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from copy import deepcopy + +import ddt + +from cinder.api.contrib import volume_type_access as vta +from cinder.api import microversions as mv +from cinder import objects +from cinder.policies import volume_access as vta_policies +from cinder.tests.unit.api.contrib import test_volume_type_access as vta_test +from cinder.tests.unit.api import fakes as fake_api +from cinder.tests.unit import fake_constants as fake +from cinder.tests.unit.policies import base + +IS_PUBLIC_FIELD = 'os-volume-type-access:is_public' + + +# the original uses a class var and admin context +class FakeRequest(vta_test.FakeRequest): + def __init__(self, context): + self.environ = {"cinder.context": context} + + +FAKE_RESP_OBJ = { + 'volume_type': {'id': fake.VOLUME_TYPE_ID}, + 'volume_types': [ + {'id': fake.VOLUME_TYPE_ID}, + {'id': fake.VOLUME_TYPE3_ID} + ]} + + +# need an instance var so this will work with ddt +class FakeResponse(vta_test.FakeResponse): + def __init__(self): + self.obj = deepcopy(FAKE_RESP_OBJ) + + +@ddt.ddt +class VolumeTypeAccessFieldPolicyTest(base.BasePolicyTest): + + # NOTE: We are testing directly against the extension controller. + # Its call to context.authorize doesn't provide a target, so + # "is_admin" or "project_id:%(project_id)s" always matches. + authorized_users = [ + 'legacy_admin', + 'project_admin', + 'system_admin', + 'legacy_owner', + 'project_member', + 'project_reader', + 'project_foo', + 'system_member', + 'system_reader', + 'system_foo', + 'other_project_member', + 'other_project_reader', + ] + + # note: authorize is called with fatal=False, so everyone is a winner! + everyone = authorized_users + + # Basic policy test is without enforcing scope (which cinder doesn't + # yet support) and deprecated rules enabled. + def setUp(self, enforce_scope=False, enforce_new_defaults=False, + *args, **kwargs): + super().setUp(enforce_scope, enforce_new_defaults, *args, **kwargs) + self.controller = vta.VolumeTypeActionController() + self.rule_name = vta_policies.TYPE_ACCESS_POLICY + self.api_version = mv.BASE_VERSION + self.api_path = f'/v3/{self.project_id}/types' + + @ddt.data(*base.all_users) + def test_type_access_policy_types_list(self, user_id): + unauthorized_exceptions = None + req = FakeRequest(self.create_context(user_id)) + resp = FakeResponse() + + self.common_policy_check(user_id, + self.everyone, + [], + unauthorized_exceptions, + self.rule_name, + self.controller.index, + req, + resp) + + # this is where the real check happens + if user_id in self.authorized_users: + for vol_type in resp.obj['volume_types']: + self.assertIn(IS_PUBLIC_FIELD, vol_type) + else: + for vol_type in resp.obj['volume_types']: + self.assertNotIn(IS_PUBLIC_FIELD, vol_type) + + @ddt.data(*base.all_users) + def test_type_access_policy_type_show(self, user_id): + unauthorized_exceptions = None + req = FakeRequest(self.create_context(user_id)) + resp = FakeResponse() + + self.common_policy_check(user_id, + self.everyone, + [], + unauthorized_exceptions, + self.rule_name, + self.controller.show, + req, + resp, + fake.VOLUME_TYPE_ID) + + if user_id in self.authorized_users: + self.assertIn(IS_PUBLIC_FIELD, resp.obj['volume_type']) + else: + self.assertNotIn(IS_PUBLIC_FIELD, resp.obj['volume_type']) + + @ddt.data(*base.all_users) + def test_type_access_policy_type_create(self, user_id): + unauthorized_exceptions = None + req = FakeRequest(self.create_context(user_id)) + resp = FakeResponse() + body = None + + self.common_policy_check(user_id, + self.everyone, + [], + unauthorized_exceptions, + self.rule_name, + self.controller.create, + req, + body, + resp) + + if user_id in self.authorized_users: + self.assertIn(IS_PUBLIC_FIELD, resp.obj['volume_type']) + else: + self.assertNotIn(IS_PUBLIC_FIELD, resp.obj['volume_type']) + + +class VolumeTypeAccessFieldPolicySecureRbacTest( + VolumeTypeAccessFieldPolicyTest): + + # Remember that we are testing directly against the extension controller, + # so while the below may seem over-permissive, in real life there is + # a more selective check that happens first. + authorized_users = [ + 'legacy_admin', + 'project_admin', + 'system_admin', + 'project_member', + 'system_member', + 'other_project_member', + ] + # this will be anyone without the 'admin' or 'member' role + unauthorized_users = [ + 'legacy_owner', + 'project_foo', + 'project_reader', + 'system_reader', + 'system_foo', + 'other_project_reader', + ] + everyone = authorized_users + unauthorized_users + + def setUp(self, *args, **kwargs): + # Test secure RBAC by disabling deprecated policy rules (scope + # is still not enabled). + super().setUp(enforce_scope=False, enforce_new_defaults=True, + *args, **kwargs) + + +@ddt.ddt +class VolumeTypeAccessListProjectsPolicyTest(base.BasePolicyTest): + authorized_users = [ + 'legacy_admin', + 'project_admin', + 'system_admin', + ] + unauthorized_users = [ + 'legacy_owner', + 'project_member', + 'project_reader', + 'project_foo', + 'system_member', + 'system_reader', + 'system_foo', + 'other_project_member', + 'other_project_reader', + ] + + # Basic policy test is without enforcing scope (which cinder doesn't + # yet support) and deprecated rules enabled. + def setUp(self, enforce_scope=False, enforce_new_defaults=False, + *args, **kwargs): + super().setUp(enforce_scope, enforce_new_defaults, *args, **kwargs) + self.controller = vta.VolumeTypeAccessController() + self.volume_type = objects.VolumeType( + self.project_admin_context, + name='private_volume_type', + is_public=False, + description='volume type for srbac testing', + extra_specs=None, + projects=[self.project_id, self.project_id_other]) + self.volume_type.create() + self.addCleanup(self.volume_type.destroy) + self.api_version = mv.BASE_VERSION + self.api_path = (f'/v3/{self.project_id}/types/' + f'{self.volume_type.id}/os-volume-type-access') + + @ddt.data(*base.all_users) + def test_type_access_who_policy(self, user_id): + """Test policy for listing projects with access to a volume type.""" + + rule_name = vta_policies.TYPE_ACCESS_WHO_POLICY + unauthorized_exceptions = None + req = fake_api.HTTPRequest.blank(self.api_path) + + self.common_policy_check(user_id, + self.authorized_users, + self.unauthorized_users, + unauthorized_exceptions, + rule_name, + self.controller.index, + req, + self.volume_type.id) + + +class VolumeTypeAccessListProjectsPolicySecureRbacTest( + VolumeTypeAccessListProjectsPolicyTest): + + def setUp(self, *args, **kwargs): + # Test secure RBAC by disabling deprecated policy rules (scope + # is still not enabled). + super().setUp(enforce_scope=False, enforce_new_defaults=True, + *args, **kwargs) diff --git a/cinder/tests/unit/policy.yaml b/cinder/tests/unit/policy.yaml index 9cff066f4cd..544d5ff3729 100644 --- a/cinder/tests/unit/policy.yaml +++ b/cinder/tests/unit/policy.yaml @@ -139,13 +139,6 @@ # DELETE /types "volume_extension:types_manage": "" -# Volume type access related APIs. -# GET /types -# GET /types/detail -# GET /types/{type_id} -# POST /types -"volume_extension:volume_type_access": "" - # Revert a volume to a snapshot. # POST /volumes/{volume_id}/action (revert) "volume:revert_to_snapshot": ""