Add testing framework for RBAC
Current tests do not have good test coverage of existing RBAC policies. Either tests for policies do not exist or if they exist then they do not cover the actual negative and positive testing. Along with what all users have access, it is important to test what all users does not have access. For Example, if any policy with default rule as admin only then test should verify: - policy check pass with context having admin role - policy check fail with context having non-admin role As we are implementing the project personas (project member and reader role) in policies, we need to have the enough testing coverage of existing policy behavior and to know that with new defaults how the access permissions will looks like. These test coverage will be extended to adopt the new changes and also make sure we do not break the existing behavior. This commit adds the testing framework for RBAC and implement create VNF test as example. It will cover other APIs in further changes in this series. Partial implement blueprint implement-project-personas Change-Id: I5b0d039c6aebda6ba0653032ac5a1963a704cb59
This commit is contained in:
parent
c056e248b2
commit
9eac5d363f
0
tacker/tests/unit/policies/__init__.py
Normal file
0
tacker/tests/unit/policies/__init__.py
Normal file
149
tacker/tests/unit/policies/base.py
Normal file
149
tacker/tests/unit/policies/base.py
Normal file
@ -0,0 +1,149 @@
|
||||
# Copyright (C) 2024 NEC, Corp.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils.fixture import uuidsentinel as uuids
|
||||
|
||||
from tacker.common import exceptions
|
||||
from tacker import context
|
||||
from tacker.tests.unit import base
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BasePolicyTest(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(BasePolicyTest, self).setUp()
|
||||
self.admin_project_id = uuids.admin_project_id
|
||||
self.project_id = uuids.project_id
|
||||
self.other_project_id = uuids.project_id_other
|
||||
|
||||
# Create the user context with implied roles so that we can test
|
||||
# each user's context for RBAC permission.
|
||||
#
|
||||
# Legacy admin user
|
||||
self.legacy_admin_context = context.Context(
|
||||
user_id="legacy_admin", project_id=self.admin_project_id,
|
||||
roles=['admin', 'member', 'reader'])
|
||||
|
||||
# project scoped users
|
||||
self.project_admin_context = context.Context(
|
||||
user_id="project_admin", project_id=self.project_id,
|
||||
roles=['admin', 'member', 'reader'])
|
||||
|
||||
self.project_member_context = context.Context(
|
||||
user_id="project_member", project_id=self.project_id,
|
||||
roles=['member', 'reader'])
|
||||
|
||||
self.project_reader_context = context.Context(
|
||||
user_id="project_reader", project_id=self.project_id,
|
||||
roles=['reader'])
|
||||
|
||||
self.project_foo_context = context.Context(
|
||||
user_id="project_foo", project_id=self.project_id,
|
||||
roles=['foo'])
|
||||
|
||||
self.other_project_member_context = context.Context(
|
||||
user_id="other_project_member",
|
||||
project_id=self.other_project_id,
|
||||
roles=['member', 'reader'])
|
||||
|
||||
self.other_project_reader_context = context.Context(
|
||||
user_id="other_project_member",
|
||||
project_id=self.other_project_id,
|
||||
roles=['reader'])
|
||||
|
||||
self.all_contexts = [
|
||||
self.legacy_admin_context, self.project_admin_context,
|
||||
self.project_member_context, self.project_reader_context,
|
||||
self.project_foo_context, self.other_project_member_context,
|
||||
self.other_project_reader_context
|
||||
]
|
||||
|
||||
def common_policy_check(self, authorized_contexts,
|
||||
unauthorized_contexts, rule_name,
|
||||
func, req, *arg, **kwarg):
|
||||
|
||||
# NOTE(gmann): When fatal=False is passed as a parameter
|
||||
# then this function does not raise error instead return
|
||||
# the responses for all contexts.
|
||||
fatal = kwarg.pop('fatal', True)
|
||||
authorized_response = []
|
||||
unauthorize_response = []
|
||||
|
||||
def ensure_return(req, *args, **kwargs):
|
||||
return func(req, *arg, **kwargs)
|
||||
|
||||
def ensure_raises(req, *args, **kwargs):
|
||||
exc = self.assertRaises(
|
||||
exceptions.PolicyNotAuthorized, func, req, *arg, **kwarg)
|
||||
# NOTE(gmann): In case of multi-policy APIs, PolicyNotAuthorized
|
||||
# exception can be raised from either of the policy so checking
|
||||
# the error message, which includes the rule name, can mismatch.
|
||||
# Tests verifying the multi policy can pass rule_name as None
|
||||
# to skip the error message assert.
|
||||
if rule_name is not None:
|
||||
self.assertEqual(
|
||||
"Policy doesn't allow %s to be performed." %
|
||||
rule_name, exc.format_message())
|
||||
# Verify all the context having allowed scope and roles pass
|
||||
# the policy check.
|
||||
for auth_context in authorized_contexts:
|
||||
LOG.info("Testing authorized user: %s", auth_context.user_id)
|
||||
req.environ['tacker.context'] = auth_context
|
||||
_args = copy.deepcopy(arg)
|
||||
_kwargs = copy.deepcopy(kwarg)
|
||||
if not fatal:
|
||||
authorized_response.append(
|
||||
ensure_return(req, *_args, **_kwargs))
|
||||
else:
|
||||
func(req, *_args, **_kwargs)
|
||||
|
||||
# Verify all the context not having allowed scope or roles fail
|
||||
# the policy check.
|
||||
for unauth_context in unauthorized_contexts:
|
||||
LOG.info("Testing unauthorized user: %s", unauth_context.user_id)
|
||||
req.environ['tacker.context'] = unauth_context
|
||||
_args = copy.deepcopy(arg)
|
||||
_kwargs = copy.deepcopy(kwarg)
|
||||
if not fatal:
|
||||
try:
|
||||
unauthorize_response.append(
|
||||
ensure_return(req, *_args, **_kwargs))
|
||||
# NOTE(gmann): We need to ignore the PolicyNotAuthorized
|
||||
# exception here so that we can add the correct response
|
||||
# in unauthorize_response for the case of fatal=False.
|
||||
# This handle the case of multi policy checks where tests
|
||||
# are verifying the second policy via the response of
|
||||
# fatal-False and ignoring the response checks where the
|
||||
# first policy itself fail to pass (even test override the
|
||||
# first policy to allow for everyone but still, scope
|
||||
# checks can leads to PolicyNotAuthorized error).
|
||||
# For example: flavor extra specs policy for GET flavor
|
||||
# API. In that case, flavor extra spec policy is checked
|
||||
# after the GET flavor policy. So any context failing on
|
||||
# GET flavor will raise the PolicyNotAuthorized and for
|
||||
# that case we do not have any way to verify the flavor
|
||||
# extra specs so skip that context to check in test.
|
||||
except exceptions.PolicyNotAuthorized:
|
||||
continue
|
||||
else:
|
||||
ensure_raises(req, *_args, **_kwargs)
|
||||
|
||||
return authorized_response, unauthorize_response
|
105
tacker/tests/unit/policies/test_vnf_lcm.py
Normal file
105
tacker/tests/unit/policies/test_vnf_lcm.py
Normal file
@ -0,0 +1,105 @@
|
||||
# Copyright (C) 2024 NEC, Corp.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from unittest import mock
|
||||
|
||||
from tacker.api.vnflcm.v1 import controller
|
||||
from tacker import objects
|
||||
from tacker.policies import vnf_lcm as policies
|
||||
from tacker.tests.unit import fake_request
|
||||
import tacker.tests.unit.nfvo.test_nfvo_plugin as test_nfvo_plugin
|
||||
from tacker.tests.unit.policies import base as base_test
|
||||
from tacker.tests.unit.vnflcm import fakes
|
||||
from tacker.tests import uuidsentinel
|
||||
from tacker.vnfm import vim_client
|
||||
|
||||
|
||||
class VNFLCMPolicyTest(base_test.BasePolicyTest):
|
||||
"""Test VNF LCM APIs policies with all possible context.
|
||||
|
||||
This class defines the set of context with different roles
|
||||
which are allowed and not allowed to pass the policy checks.
|
||||
With those set of context, it will call the API operation and
|
||||
verify the expected behaviour.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
super(VNFLCMPolicyTest, self).setUp()
|
||||
self.patcher = mock.patch(
|
||||
'tacker.manager.TackerManager.get_service_plugins',
|
||||
return_value={'VNFM': test_nfvo_plugin.FakeVNFMPlugin()})
|
||||
self.mock_manager = self.patcher.start()
|
||||
self.controller = controller.VnfLcmController()
|
||||
self.vim_info = {
|
||||
'vim_id': uuidsentinel.vnfd_id,
|
||||
'vim_type': 'test',
|
||||
'vim_auth': {'username': 'test', 'password': 'test'},
|
||||
'placement_attr': {'region': 'TestRegionOne'},
|
||||
'tenant': 'test',
|
||||
'extra': {}
|
||||
}
|
||||
# Below user's context will be allowed to create the VNF
|
||||
# in their project.
|
||||
self.create_authorized_contexts = [
|
||||
self.legacy_admin_context, self.project_admin_context,
|
||||
self.project_member_context, self.project_reader_context,
|
||||
self.project_foo_context, self.other_project_member_context,
|
||||
self.other_project_reader_context
|
||||
]
|
||||
self.create_unauthorized_contexts = []
|
||||
|
||||
@mock.patch.object(vim_client.VimClient, "get_vim")
|
||||
@mock.patch.object(objects.VnfPackage, 'get_by_id')
|
||||
@mock.patch('tacker.api.vnflcm.v1.controller.'
|
||||
'VnfLcmController._create_vnf')
|
||||
@mock.patch.object(objects.vnf_package.VnfPackage, 'save')
|
||||
@mock.patch.object(objects.vnf_instance, '_vnf_instance_update')
|
||||
@mock.patch.object(objects.vnf_instance, '_vnf_instance_create')
|
||||
@mock.patch.object(objects.vnf_package_vnfd.VnfPackageVnfd, 'get_by_id')
|
||||
def test_create_vnf(
|
||||
self, mock_get_by_id,
|
||||
mock_vnf_instance_create,
|
||||
mock_vnf_instance_update,
|
||||
mock_package_save,
|
||||
mock_private_create_vnf,
|
||||
mock_vnf_package_get_by_id,
|
||||
mock_get_vim):
|
||||
mock_get_vim.return_value = self.vim_info
|
||||
mock_get_by_id.return_value = fakes.return_vnf_package_vnfd()
|
||||
mock_vnf_package_get_by_id.return_value = \
|
||||
fakes.return_vnf_package_with_deployment_flavour()
|
||||
|
||||
updates = {'vnfd_id': uuidsentinel.vnfd_id,
|
||||
'vnf_instance_description': 'SampleVnf Description',
|
||||
'vnf_instance_name': 'SampleVnf',
|
||||
'vnf_pkg_id': uuidsentinel.vnf_pkg_id,
|
||||
'vnf_metadata': {'key': 'value'}}
|
||||
|
||||
mock_vnf_instance_create.return_value = (
|
||||
fakes.return_vnf_instance_model(**updates))
|
||||
mock_vnf_instance_update.return_value = (
|
||||
fakes.return_vnf_instance_model(**updates))
|
||||
|
||||
body = {'vnfdId': uuidsentinel.vnfd_id,
|
||||
'vnfInstanceName': 'SampleVnf',
|
||||
'vnfInstanceDescription': 'SampleVnf Description',
|
||||
'metadata': {'key': 'value'}}
|
||||
req = fake_request.HTTPRequest.blank('/vnf_instances')
|
||||
rule_name = policies.VNFLCM % 'create'
|
||||
self.common_policy_check(self.create_authorized_contexts,
|
||||
self.create_unauthorized_contexts,
|
||||
rule_name,
|
||||
self.controller.create,
|
||||
req, body=body)
|
Loading…
Reference in New Issue
Block a user