1f7f1bd3c8
A few of the tests overwrite the name of the policy file (so they can use a temp file instead). However, if it is left that way, subsequent tests that rely on it may fail. A separate patch will look at doing a more comprehensive reset in the setup of test_v3 - ensuring we always start from a completely clean slate for all confirguration parameters. Fixes Bug #1131819 Change-Id: Ibe5ee12f44310de00b12ddd405c83f59b2d840b7
191 lines
6.9 KiB
Python
191 lines
6.9 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2011 Piston Cloud Computing, Inc.
|
|
# All Rights Reserved.
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import StringIO
|
|
import tempfile
|
|
import urllib2
|
|
|
|
from keystone import config
|
|
from keystone import exception
|
|
from keystone import test
|
|
from keystone.openstack.common import policy as common_policy
|
|
from keystone.policy.backends import rules
|
|
|
|
|
|
CONF = config.CONF
|
|
|
|
|
|
class PolicyFileTestCase(test.TestCase):
|
|
def setUp(self):
|
|
super(PolicyFileTestCase, self).setUp()
|
|
self.orig_policy_file = CONF.policy_file
|
|
rules.reset()
|
|
_unused, self.tmpfilename = tempfile.mkstemp()
|
|
self.opt(policy_file=self.tmpfilename)
|
|
self.target = {}
|
|
|
|
def tearDown(self):
|
|
super(PolicyFileTestCase, self).tearDown()
|
|
rules.reset()
|
|
self.opt(policy_file=self.orig_policy_file)
|
|
|
|
def test_modified_policy_reloads(self):
|
|
action = "example:test"
|
|
empty_credentials = {}
|
|
with open(self.tmpfilename, "w") as policyfile:
|
|
policyfile.write("""{"example:test": []}""")
|
|
rules.enforce(empty_credentials, action, self.target)
|
|
with open(self.tmpfilename, "w") as policyfile:
|
|
policyfile.write("""{"example:test": ["false:false"]}""")
|
|
# NOTE(vish): reset stored policy cache so we don't have to sleep(1)
|
|
rules._POLICY_CACHE = {}
|
|
self.assertRaises(exception.ForbiddenAction, rules.enforce,
|
|
empty_credentials, action, self.target)
|
|
|
|
|
|
class PolicyTestCase(test.TestCase):
|
|
def setUp(self):
|
|
super(PolicyTestCase, self).setUp()
|
|
rules.reset()
|
|
# NOTE(vish): preload rules to circumvent reloading from file
|
|
rules.init()
|
|
self.rules = {
|
|
"true": [],
|
|
"example:allowed": [],
|
|
"example:denied": [["false:false"]],
|
|
"example:get_http": [["http:http://www.example.com"]],
|
|
"example:my_file": [["role:compute_admin"],
|
|
["project_id:%(project_id)s"]],
|
|
"example:early_and_fail": [["false:false", "rule:true"]],
|
|
"example:early_or_success": [["rule:true"], ["false:false"]],
|
|
"example:lowercase_admin": [["role:admin"], ["role:sysadmin"]],
|
|
"example:uppercase_admin": [["role:ADMIN"], ["role:sysadmin"]],
|
|
}
|
|
|
|
# NOTE(vish): then overload underlying policy engine
|
|
self._set_rules()
|
|
self.credentials = {}
|
|
self.target = {}
|
|
|
|
def _set_rules(self):
|
|
these_rules = common_policy.Rules(
|
|
dict((k, common_policy.parse_rule(v))
|
|
for k, v in self.rules.items()))
|
|
common_policy.set_rules(these_rules)
|
|
|
|
def tearDown(self):
|
|
rules.reset()
|
|
super(PolicyTestCase, self).tearDown()
|
|
|
|
def test_enforce_nonexistent_action_throws(self):
|
|
action = "example:noexist"
|
|
self.assertRaises(exception.ForbiddenAction, rules.enforce,
|
|
self.credentials, action, self.target)
|
|
|
|
def test_enforce_bad_action_throws(self):
|
|
action = "example:denied"
|
|
self.assertRaises(exception.ForbiddenAction, rules.enforce,
|
|
self.credentials, action, self.target)
|
|
|
|
def test_enforce_good_action(self):
|
|
action = "example:allowed"
|
|
rules.enforce(self.credentials, action, self.target)
|
|
|
|
def test_enforce_http_true(self):
|
|
|
|
def fakeurlopen(url, post_data):
|
|
return StringIO.StringIO("True")
|
|
|
|
self.stubs.Set(urllib2, 'urlopen', fakeurlopen)
|
|
action = "example:get_http"
|
|
target = {}
|
|
result = rules.enforce(self.credentials, action, target)
|
|
self.assertTrue(result)
|
|
|
|
def test_enforce_http_false(self):
|
|
|
|
def fakeurlopen(url, post_data):
|
|
return StringIO.StringIO("False")
|
|
self.stubs.Set(urllib2, 'urlopen', fakeurlopen)
|
|
action = "example:get_http"
|
|
target = {}
|
|
self.assertRaises(exception.ForbiddenAction, rules.enforce,
|
|
self.credentials, action, target)
|
|
|
|
def test_templatized_enforcement(self):
|
|
target_mine = {'project_id': 'fake'}
|
|
target_not_mine = {'project_id': 'another'}
|
|
credentials = {'project_id': 'fake', 'roles': []}
|
|
action = "example:my_file"
|
|
rules.enforce(credentials, action, target_mine)
|
|
self.assertRaises(exception.ForbiddenAction, rules.enforce,
|
|
credentials, action, target_not_mine)
|
|
|
|
def test_early_AND_enforcement(self):
|
|
action = "example:early_and_fail"
|
|
self.assertRaises(exception.ForbiddenAction, rules.enforce,
|
|
self.credentials, action, self.target)
|
|
|
|
def test_early_OR_enforcement(self):
|
|
action = "example:early_or_success"
|
|
rules.enforce(self.credentials, action, self.target)
|
|
|
|
def test_ignore_case_role_check(self):
|
|
lowercase_action = "example:lowercase_admin"
|
|
uppercase_action = "example:uppercase_admin"
|
|
# NOTE(dprince) we mix case in the Admin role here to ensure
|
|
# case is ignored
|
|
admin_credentials = {'roles': ['AdMiN']}
|
|
rules.enforce(admin_credentials, lowercase_action, self.target)
|
|
rules.enforce(admin_credentials, uppercase_action, self.target)
|
|
|
|
|
|
class DefaultPolicyTestCase(test.TestCase):
|
|
def setUp(self):
|
|
super(DefaultPolicyTestCase, self).setUp()
|
|
rules.reset()
|
|
rules.init()
|
|
|
|
self.rules = {
|
|
"default": [],
|
|
"example:exist": [["false:false"]]
|
|
}
|
|
self._set_rules('default')
|
|
self.credentials = {}
|
|
|
|
def _set_rules(self, default_rule):
|
|
these_rules = common_policy.Rules(
|
|
dict((k, common_policy.parse_rule(v))
|
|
for k, v in self.rules.items()), default_rule)
|
|
common_policy.set_rules(these_rules)
|
|
|
|
def tearDown(self):
|
|
super(DefaultPolicyTestCase, self).setUp()
|
|
rules.reset()
|
|
|
|
def test_policy_called(self):
|
|
self.assertRaises(exception.ForbiddenAction, rules.enforce,
|
|
self.credentials, "example:exist", {})
|
|
|
|
def test_not_found_policy_calls_default(self):
|
|
rules.enforce(self.credentials, "example:noexist", {})
|
|
|
|
def test_default_not_found(self):
|
|
self._set_rules("default_noexist")
|
|
self.assertRaises(exception.ForbiddenAction, rules.enforce,
|
|
self.credentials, "example:noexist", {})
|