# Copyright (c) 2017 Huawei Technologies Co., Ltd. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import os.path from oslo_config import cfg from oslo_config import fixture as config_fixture from oslo_policy import policy as oslo_policy from karbor import context from karbor import exception from karbor.tests import base from karbor import utils from karbor import policy CONF = cfg.CONF class PolicyFileTestCase(base.TestCase): def setUp(self): super(PolicyFileTestCase, self).setUp() self.context = context.get_admin_context() self.target = {} self.fixture = self.useFixture(config_fixture.Config(CONF)) self.addCleanup(policy.reset) def test_modified_policy_reloads(self): with utils.tempdir() as tmpdir: tmpfilename = os.path.join(tmpdir, 'policy') self.fixture.config(policy_file=tmpfilename, group='oslo_policy') policy.reset() policy.init() rule = oslo_policy.RuleDefault('example:test', "") policy._ENFORCER.register_defaults([rule]) action = "example:test" with open(tmpfilename, "w") as policyfile: policyfile.write('{"example:test": ""}') policy.authorize(self.context, action, self.target) with open(tmpfilename, "w") as policyfile: policyfile.write('{"example:test": "!"}') policy._ENFORCER.load_rules(True) self.assertRaises(exception.PolicyNotAuthorized, policy.authorize, self.context, action, self.target) class PolicyTestCase(base.TestCase): def setUp(self): super(PolicyTestCase, self).setUp() rules = [ oslo_policy.RuleDefault("true", '@'), oslo_policy.RuleDefault("test:allowed", '@'), oslo_policy.RuleDefault("test:denied", "!"), oslo_policy.RuleDefault("test:my_file", "role:compute_admin or " "project_id:%(project_id)s"), oslo_policy.RuleDefault("test:early_and_fail", "! and @"), oslo_policy.RuleDefault("test:early_or_success", "@ or !"), oslo_policy.RuleDefault("test:lowercase_admin", "role:admin"), oslo_policy.RuleDefault("test:uppercase_admin", "role:ADMIN"), ] policy.reset() policy.init() # before a policy rule can be used, its default has to be registered. policy._ENFORCER.register_defaults(rules) self.context = context.RequestContext('fake', 'fake', roles=['member']) self.target = {} self.addCleanup(policy.reset) def test_authorize_nonexistent_action_throws(self): action = "test:noexist" self.assertRaises(oslo_policy.PolicyNotRegistered, policy.authorize, self.context, action, self.target) def test_authorize_bad_action_throws(self): action = "test:denied" self.assertRaises(exception.PolicyNotAuthorized, policy.authorize, self.context, action, self.target) def test_authorize_bad_action_noraise(self): action = "test:denied" result = policy.authorize(self.context, action, self.target, False) self.assertFalse(result) def test_authorize_good_action(self): action = "test:allowed" result = policy.authorize(self.context, action, self.target) self.assertTrue(result) def test_templatized_authorization(self): target_mine = {'project_id': 'fake'} target_not_mine = {'project_id': 'another'} action = "test:my_file" policy.authorize(self.context, action, target_mine) self.assertRaises(exception.PolicyNotAuthorized, policy.authorize, self.context, action, target_not_mine) def test_early_AND_authorization(self): action = "test:early_and_fail" self.assertRaises(exception.PolicyNotAuthorized, policy.authorize, self.context, action, self.target) def test_early_OR_authorization(self): action = "test:early_or_success" policy.authorize(self.context, action, self.target) def test_ignore_case_role_check(self): lowercase_action = "test:lowercase_admin" uppercase_action = "test:uppercase_admin" admin_context = context.RequestContext('admin', 'fake', roles=['AdMiN']) policy.authorize(admin_context, lowercase_action, self.target) policy.authorize(admin_context, uppercase_action, self.target)