port common policy code to keystone
keystone.common.policy is copied from nova leave simple backend in as a shim until devstack stops referencing it Change-Id: Ibd579cfeb99465706d525b6565818a2d8f5f3b7c
This commit is contained in:
parent
e5254d48b1
commit
a2f2274c69
@ -47,7 +47,7 @@ driver = keystone.token.backends.kvs.Token
|
||||
expiration = 86400
|
||||
|
||||
[policy]
|
||||
driver = keystone.policy.backends.simple.SimpleMatch
|
||||
driver = keystone.policy.backends.rules.Policy
|
||||
|
||||
[ec2]
|
||||
driver = keystone.contrib.ec2.backends.kvs.Ec2
|
||||
|
3
etc/policy.json
Normal file
3
etc/policy.json
Normal file
@ -0,0 +1,3 @@
|
||||
{
|
||||
"admin_required": [["role:admin"], ["is_admin:1"]]
|
||||
}
|
207
keystone/common/policy.py
Normal file
207
keystone/common/policy.py
Normal file
@ -0,0 +1,207 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2011 OpenStack, LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Common Policy Engine Implementation"""
|
||||
|
||||
import json
|
||||
import urllib
|
||||
import urllib2
|
||||
|
||||
|
||||
class NotAuthorized(Exception):
|
||||
pass
|
||||
|
||||
|
||||
_BRAIN = None
|
||||
|
||||
|
||||
def set_brain(brain):
|
||||
"""Set the brain used by enforce().
|
||||
|
||||
Defaults use Brain() if not set.
|
||||
|
||||
"""
|
||||
global _BRAIN
|
||||
_BRAIN = brain
|
||||
|
||||
|
||||
def reset():
|
||||
"""Clear the brain used by enforce()."""
|
||||
global _BRAIN
|
||||
_BRAIN = None
|
||||
|
||||
|
||||
def enforce(match_list, target_dict, credentials_dict):
|
||||
"""Enforces authorization of some rules against credentials.
|
||||
|
||||
:param match_list: nested tuples of data to match against
|
||||
The basic brain supports three types of match lists:
|
||||
1) rules
|
||||
looks like: ('rule:compute:get_instance',)
|
||||
Retrieves the named rule from the rules dict and recursively
|
||||
checks against the contents of the rule.
|
||||
2) roles
|
||||
looks like: ('role:compute:admin',)
|
||||
Matches if the specified role is in credentials_dict['roles'].
|
||||
3) generic
|
||||
('tenant_id:%(tenant_id)s',)
|
||||
Substitutes values from the target dict into the match using
|
||||
the % operator and matches them against the creds dict.
|
||||
|
||||
Combining rules:
|
||||
The brain returns True if any of the outer tuple of rules match
|
||||
and also True if all of the inner tuples match. You can use this to
|
||||
perform simple boolean logic. For example, the following rule would
|
||||
return True if the creds contain the role 'admin' OR the if the
|
||||
tenant_id matches the target dict AND the the creds contains the
|
||||
role 'compute_sysadmin':
|
||||
|
||||
{
|
||||
"rule:combined": (
|
||||
'role:admin',
|
||||
('tenant_id:%(tenant_id)s', 'role:compute_sysadmin')
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
Note that rule and role are reserved words in the credentials match, so
|
||||
you can't match against properties with those names. Custom brains may
|
||||
also add new reserved words. For example, the HttpBrain adds http as a
|
||||
reserved word.
|
||||
|
||||
:param target_dict: dict of object properties
|
||||
Target dicts contain as much information as we can about the object being
|
||||
operated on.
|
||||
|
||||
:param credentials_dict: dict of actor properties
|
||||
Credentials dicts contain as much information as we can about the user
|
||||
performing the action.
|
||||
|
||||
:raises NotAuthorized if the check fails
|
||||
|
||||
"""
|
||||
global _BRAIN
|
||||
if not _BRAIN:
|
||||
_BRAIN = Brain()
|
||||
if not _BRAIN.check(match_list, target_dict, credentials_dict):
|
||||
raise NotAuthorized()
|
||||
|
||||
|
||||
class Brain(object):
|
||||
"""Implements policy checking."""
|
||||
@classmethod
|
||||
def load_json(cls, data, default_rule=None):
|
||||
"""Init a brain using json instead of a rules dictionary."""
|
||||
rules_dict = json.loads(data)
|
||||
return cls(rules=rules_dict, default_rule=default_rule)
|
||||
|
||||
def __init__(self, rules=None, default_rule=None):
|
||||
self.rules = rules or {}
|
||||
self.default_rule = default_rule
|
||||
|
||||
def add_rule(self, key, match):
|
||||
self.rules[key] = match
|
||||
|
||||
def _check(self, match, target_dict, cred_dict):
|
||||
match_kind, match_value = match.split(':', 1)
|
||||
try:
|
||||
f = getattr(self, '_check_%s' % match_kind)
|
||||
except AttributeError:
|
||||
if not self._check_generic(match, target_dict, cred_dict):
|
||||
return False
|
||||
else:
|
||||
if not f(match_value, target_dict, cred_dict):
|
||||
return False
|
||||
return True
|
||||
|
||||
def check(self, match_list, target_dict, cred_dict):
|
||||
"""Checks authorization of some rules against credentials.
|
||||
|
||||
Detailed description of the check with examples in policy.enforce().
|
||||
|
||||
:param match_list: nested tuples of data to match against
|
||||
:param target_dict: dict of object properties
|
||||
:param credentials_dict: dict of actor properties
|
||||
|
||||
:returns: True if the check passes
|
||||
|
||||
"""
|
||||
if not match_list:
|
||||
return True
|
||||
for and_list in match_list:
|
||||
if isinstance(and_list, basestring):
|
||||
and_list = (and_list,)
|
||||
if all([self._check(item, target_dict, cred_dict)
|
||||
for item in and_list]):
|
||||
return True
|
||||
return False
|
||||
|
||||
def _check_rule(self, match, target_dict, cred_dict):
|
||||
"""Recursively checks credentials based on the brains rules."""
|
||||
try:
|
||||
new_match_list = self.rules[match]
|
||||
except KeyError:
|
||||
if self.default_rule and match != self.default_rule:
|
||||
new_match_list = ('rule:%s' % self.default_rule,)
|
||||
else:
|
||||
return False
|
||||
|
||||
return self.check(new_match_list, target_dict, cred_dict)
|
||||
|
||||
def _check_role(self, match, target_dict, cred_dict):
|
||||
"""Check that there is a matching role in the cred dict."""
|
||||
return match.lower() in [x.lower() for x in cred_dict['roles']]
|
||||
|
||||
def _check_generic(self, match, target_dict, cred_dict):
|
||||
"""Check an individual match.
|
||||
|
||||
Matches look like:
|
||||
|
||||
tenant:%(tenant_id)s
|
||||
role:compute:admin
|
||||
|
||||
"""
|
||||
|
||||
# TODO(termie): do dict inspection via dot syntax
|
||||
match = match % target_dict
|
||||
key, value = match.split(':', 1)
|
||||
if key in cred_dict:
|
||||
return value == cred_dict[key]
|
||||
return False
|
||||
|
||||
|
||||
class HttpBrain(Brain):
|
||||
"""A brain that can check external urls for policy.
|
||||
|
||||
Posts json blobs for target and credentials.
|
||||
|
||||
"""
|
||||
|
||||
def _check_http(self, match, target_dict, cred_dict):
|
||||
"""Check http: rules by calling to a remote server.
|
||||
|
||||
This example implementation simply verifies that the response is
|
||||
exactly 'True'. A custom brain using response codes could easily
|
||||
be implemented.
|
||||
|
||||
"""
|
||||
url = match % target_dict
|
||||
data = {'target': json.dumps(target_dict),
|
||||
'credentials': json.dumps(cred_dict)}
|
||||
post_data = urllib.urlencode(data)
|
||||
f = urllib2.urlopen(url, post_data)
|
||||
return f.read() == "True"
|
@ -22,6 +22,7 @@ import base64
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
@ -61,6 +62,48 @@ def import_object(import_str, *args, **kw):
|
||||
return cls(*args, **kw)
|
||||
|
||||
|
||||
def find_config(config_path):
|
||||
"""Find a configuration file using the given hint.
|
||||
|
||||
:param config_path: Full or relative path to the config.
|
||||
:returns: Full path of the config, if it exists.
|
||||
|
||||
"""
|
||||
possible_locations = [
|
||||
config_path,
|
||||
os.path.join('etc', 'keystone', config_path),
|
||||
os.path.join('etc', config_path),
|
||||
os.path.join(config_path),
|
||||
'/etc/keystone/%s' % config_path,
|
||||
]
|
||||
|
||||
for path in possible_locations:
|
||||
if os.path.exists(path):
|
||||
return os.path.abspath(path)
|
||||
|
||||
raise Exception('Config not found: %s', os.path.abspath(config_path))
|
||||
|
||||
|
||||
def read_cached_file(filename, cache_info, reload_func=None):
|
||||
"""Read from a file if it has been modified.
|
||||
|
||||
:param cache_info: dictionary to hold opaque cache.
|
||||
:param reload_func: optional function to be called with data when
|
||||
file is reloaded due to a modification.
|
||||
|
||||
:returns: data from file
|
||||
|
||||
"""
|
||||
mtime = os.path.getmtime(filename)
|
||||
if not cache_info or mtime != cache_info.get('mtime'):
|
||||
with open(filename) as fap:
|
||||
cache_info['data'] = fap.read()
|
||||
cache_info['mtime'] = mtime
|
||||
if reload_func:
|
||||
reload_func(cache_info['data'])
|
||||
return cache_info['data']
|
||||
|
||||
|
||||
class SmarterEncoder(json.JSONEncoder):
|
||||
"""Help for JSON encoding dict-like objects."""
|
||||
def default(self, obj):
|
||||
|
@ -214,9 +214,10 @@ class Application(BaseApplication):
|
||||
creds['roles'] = [self.identity_api.get_role(context, role)['name']
|
||||
for role in creds.get('roles', [])]
|
||||
# Accept either is_admin or the admin role
|
||||
assert self.policy_api.can_haz(context,
|
||||
('is_admin:1', 'roles:admin'),
|
||||
creds)
|
||||
self.policy_api.enforce(context,
|
||||
creds,
|
||||
'admin_required',
|
||||
{})
|
||||
|
||||
|
||||
class Middleware(Application):
|
||||
|
104
keystone/policy/backends/rules.py
Normal file
104
keystone/policy/backends/rules.py
Normal file
@ -0,0 +1,104 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (c) 2011 OpenStack, LLC.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Rules-based Policy Engine."""
|
||||
|
||||
from keystone import config
|
||||
from keystone import exception
|
||||
from keystone import policy
|
||||
from keystone.common import logging
|
||||
from keystone.common import policy as common_policy
|
||||
from keystone.common import utils
|
||||
from keystone.openstack.common import cfg
|
||||
|
||||
|
||||
policy_opts = [
|
||||
cfg.StrOpt('policy_file',
|
||||
default='policy.json',
|
||||
help=_('JSON file representing policy')),
|
||||
cfg.StrOpt('policy_default_rule',
|
||||
default='default',
|
||||
help=_('Rule checked when requested rule is not found')),
|
||||
]
|
||||
|
||||
|
||||
CONF = config.CONF
|
||||
CONF.register_opts(policy_opts)
|
||||
|
||||
|
||||
LOG = logging.getLogger('keystone.policy.backends.rules')
|
||||
|
||||
|
||||
_POLICY_PATH = None
|
||||
_POLICY_CACHE = {}
|
||||
|
||||
|
||||
def reset():
|
||||
global _POLICY_PATH
|
||||
global _POLICY_CACHE
|
||||
_POLICY_PATH = None
|
||||
_POLICY_CACHE = {}
|
||||
common_policy.reset()
|
||||
|
||||
|
||||
def init():
|
||||
global _POLICY_PATH
|
||||
global _POLICY_CACHE
|
||||
if not _POLICY_PATH:
|
||||
_POLICY_PATH = utils.find_config(CONF.policy_file)
|
||||
utils.read_cached_file(_POLICY_PATH,
|
||||
_POLICY_CACHE,
|
||||
reload_func=_set_brain)
|
||||
|
||||
|
||||
def _set_brain(data):
|
||||
default_rule = CONF.policy_default_rule
|
||||
common_policy.set_brain(
|
||||
common_policy.HttpBrain.load_json(data, default_rule))
|
||||
|
||||
|
||||
def enforce(credentials, action, target):
|
||||
"""Verifies that the action is valid on the target in this context.
|
||||
|
||||
:param credentials: user credentials
|
||||
:param action: string representing the action to be checked
|
||||
this should be colon separated for clarity.
|
||||
i.e. compute:create_instance
|
||||
compute:attach_volume
|
||||
volume:attach_volume
|
||||
|
||||
:param object: dictionary representing the object of the action
|
||||
for object creation this should be a dictionary representing the
|
||||
location of the object e.g. {'tenant_id': object.tenant_id}
|
||||
|
||||
:raises: `exception.Forbidden` if verification fails.
|
||||
|
||||
"""
|
||||
init()
|
||||
|
||||
match_list = ('rule:%s' % action,)
|
||||
|
||||
try:
|
||||
common_policy.enforce(match_list, target, credentials)
|
||||
except common_policy.NotAuthorized:
|
||||
raise exception.Forbidden(action=action)
|
||||
|
||||
|
||||
class Policy(policy.Driver):
|
||||
def enforce(self, credentials, action, target):
|
||||
LOG.debug('enforce %s: %s', action, credentials)
|
||||
enforce(credentials, action, target)
|
@ -14,24 +14,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# This file exists as a shim to get devstack testing to pass.
|
||||
# It will be removed once devstack has been updated.
|
||||
|
||||
from keystone.common import logging
|
||||
from keystone.policy.backends import rules
|
||||
|
||||
|
||||
class TrivialTrue(object):
|
||||
def can_haz(self, target, credentials):
|
||||
return True
|
||||
|
||||
|
||||
class SimpleMatch(object):
|
||||
def can_haz(self, target, credentials):
|
||||
"""Check whether key-values in target are present in credentials."""
|
||||
# TODO(termie): handle ANDs, probably by providing a tuple instead of a
|
||||
# string
|
||||
for requirement in target:
|
||||
key, match = requirement.split(':', 1)
|
||||
check = credentials.get(key)
|
||||
if check is None or isinstance(check, basestring):
|
||||
check = [check]
|
||||
if match in check:
|
||||
return True
|
||||
SimpleMatch = rules.Policy
|
||||
|
@ -33,3 +33,13 @@ class Manager(manager.Manager):
|
||||
|
||||
def __init__(self):
|
||||
super(Manager, self).__init__(CONF.policy.driver)
|
||||
|
||||
|
||||
class Driver(object):
|
||||
def enforce(context, credentials, action, target):
|
||||
"""Verify that a user is authorized to perform action.
|
||||
|
||||
For more information on a full implementation of this see:
|
||||
`keystone.common.policy.enforce`.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
@ -20,7 +20,9 @@ import subprocess
|
||||
import sys
|
||||
import time
|
||||
|
||||
import mox
|
||||
from paste import deploy
|
||||
import stubout
|
||||
|
||||
from keystone import config
|
||||
from keystone.common import kvs
|
||||
@ -123,18 +125,26 @@ class TestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
super(TestCase, self).setUp()
|
||||
self.config()
|
||||
self.mox = mox.Mox()
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
|
||||
def config(self):
|
||||
CONF(config_files=[etcdir('keystone.conf'),
|
||||
testsdir('test_overrides.conf')])
|
||||
|
||||
def tearDown(self):
|
||||
for path in self._paths:
|
||||
if path in sys.path:
|
||||
sys.path.remove(path)
|
||||
kvs.INMEMDB.clear()
|
||||
self.reset_opts()
|
||||
super(TestCase, self).tearDown()
|
||||
try:
|
||||
self.mox.UnsetStubs()
|
||||
self.stubs.UnsetAll()
|
||||
self.stubs.SmartUnsetAll()
|
||||
self.mox.VerifyAll()
|
||||
super(TestCase, self).tearDown()
|
||||
finally:
|
||||
for path in self._paths:
|
||||
if path in sys.path:
|
||||
sys.path.remove(path)
|
||||
kvs.INMEMDB.clear()
|
||||
self.reset_opts()
|
||||
|
||||
def opt(self, **kw):
|
||||
for k, v in kw.iteritems():
|
||||
|
3
tests/policy.json
Normal file
3
tests/policy.json
Normal file
@ -0,0 +1,3 @@
|
||||
{
|
||||
"admin_required": [["role:Keystadasd"], ["is_admin:1"]]
|
||||
}
|
180
tests/test_policy.py
Normal file
180
tests/test_policy.py
Normal file
@ -0,0 +1,180 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 Piston Cloud Computing, Inc.
|
||||
# All Rights Reserved.
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import StringIO
|
||||
import tempfile
|
||||
import urllib2
|
||||
|
||||
from keystone import config
|
||||
from keystone import exception
|
||||
from keystone import test
|
||||
from keystone.common import policy as common_policy
|
||||
from keystone.policy.backends import rules
|
||||
|
||||
|
||||
CONF = config.CONF
|
||||
|
||||
|
||||
class PolicyFileTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(PolicyFileTestCase, self).setUp()
|
||||
rules.reset()
|
||||
_unused, self.tmpfilename = tempfile.mkstemp()
|
||||
self.opt(policy_file=self.tmpfilename)
|
||||
self.target = {}
|
||||
|
||||
def tearDown(self):
|
||||
super(PolicyFileTestCase, self).tearDown()
|
||||
rules.reset()
|
||||
|
||||
def test_modified_policy_reloads(self):
|
||||
action = "example:test"
|
||||
empty_credentials = {}
|
||||
with open(self.tmpfilename, "w") as policyfile:
|
||||
policyfile.write("""{"example:test": []}""")
|
||||
rules.enforce(empty_credentials, action, self.target)
|
||||
with open(self.tmpfilename, "w") as policyfile:
|
||||
policyfile.write("""{"example:test": ["false:false"]}""")
|
||||
# NOTE(vish): reset stored policy cache so we don't have to sleep(1)
|
||||
rules._POLICY_CACHE = {}
|
||||
self.assertRaises(exception.Forbidden, rules.enforce,
|
||||
empty_credentials, action, self.target)
|
||||
|
||||
|
||||
class PolicyTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(PolicyTestCase, self).setUp()
|
||||
rules.reset()
|
||||
# NOTE(vish): preload rules to circumvent reloading from file
|
||||
rules.init()
|
||||
brain = {
|
||||
"true": [],
|
||||
"example:allowed": [],
|
||||
"example:denied": [["false:false"]],
|
||||
"example:get_http": [["http:http://www.example.com"]],
|
||||
"example:my_file": [["role:compute_admin"],
|
||||
["project_id:%(project_id)s"]],
|
||||
"example:early_and_fail": [["false:false", "rule:true"]],
|
||||
"example:early_or_success": [["rule:true"], ["false:false"]],
|
||||
"example:lowercase_admin": [["role:admin"], ["role:sysadmin"]],
|
||||
"example:uppercase_admin": [["role:ADMIN"], ["role:sysadmin"]],
|
||||
}
|
||||
# NOTE(vish): then overload underlying brain
|
||||
common_policy.set_brain(common_policy.HttpBrain(brain))
|
||||
self.credentials = {}
|
||||
self.target = {}
|
||||
|
||||
def tearDown(self):
|
||||
rules.reset()
|
||||
super(PolicyTestCase, self).tearDown()
|
||||
|
||||
def test_enforce_nonexistent_action_throws(self):
|
||||
action = "example:noexist"
|
||||
self.assertRaises(exception.Forbidden, rules.enforce,
|
||||
self.credentials, action, self.target)
|
||||
|
||||
def test_enforce_bad_action_throws(self):
|
||||
action = "example:denied"
|
||||
self.assertRaises(exception.Forbidden, rules.enforce,
|
||||
self.credentials, action, self.target)
|
||||
|
||||
def test_enforce_good_action(self):
|
||||
action = "example:allowed"
|
||||
rules.enforce(self.credentials, action, self.target)
|
||||
|
||||
def test_enforce_http_true(self):
|
||||
|
||||
def fakeurlopen(url, post_data):
|
||||
return StringIO.StringIO("True")
|
||||
|
||||
self.stubs.Set(urllib2, 'urlopen', fakeurlopen)
|
||||
action = "example:get_http"
|
||||
target = {}
|
||||
result = rules.enforce(self.credentials, action, target)
|
||||
self.assertEqual(result, None)
|
||||
|
||||
def test_enforce_http_false(self):
|
||||
|
||||
def fakeurlopen(url, post_data):
|
||||
return StringIO.StringIO("False")
|
||||
self.stubs.Set(urllib2, 'urlopen', fakeurlopen)
|
||||
action = "example:get_http"
|
||||
target = {}
|
||||
self.assertRaises(exception.Forbidden, rules.enforce,
|
||||
self.credentials, action, target)
|
||||
|
||||
def test_templatized_enforcement(self):
|
||||
target_mine = {'project_id': 'fake'}
|
||||
target_not_mine = {'project_id': 'another'}
|
||||
credentials = {'project_id': 'fake', 'roles': []}
|
||||
action = "example:my_file"
|
||||
rules.enforce(credentials, action, target_mine)
|
||||
self.assertRaises(exception.Forbidden, rules.enforce,
|
||||
credentials, action, target_not_mine)
|
||||
|
||||
def test_early_AND_enforcement(self):
|
||||
action = "example:early_and_fail"
|
||||
self.assertRaises(exception.Forbidden, rules.enforce,
|
||||
self.credentials, action, self.target)
|
||||
|
||||
def test_early_OR_enforcement(self):
|
||||
action = "example:early_or_success"
|
||||
rules.enforce(self.credentials, action, self.target)
|
||||
|
||||
def test_ignore_case_role_check(self):
|
||||
lowercase_action = "example:lowercase_admin"
|
||||
uppercase_action = "example:uppercase_admin"
|
||||
# NOTE(dprince) we mix case in the Admin role here to ensure
|
||||
# case is ignored
|
||||
admin_credentials = {'roles': ['AdMiN']}
|
||||
rules.enforce(admin_credentials, lowercase_action, self.target)
|
||||
rules.enforce(admin_credentials, uppercase_action, self.target)
|
||||
|
||||
|
||||
class DefaultPolicyTestCase(test.TestCase):
|
||||
def setUp(self):
|
||||
super(DefaultPolicyTestCase, self).setUp()
|
||||
rules.reset()
|
||||
rules.init()
|
||||
|
||||
self.brain = {
|
||||
"default": [],
|
||||
"example:exist": [["false:false"]]
|
||||
}
|
||||
|
||||
self._set_brain('default')
|
||||
self.credentials = {}
|
||||
|
||||
def _set_brain(self, default_rule):
|
||||
brain = common_policy.HttpBrain(self.brain, default_rule)
|
||||
common_policy.set_brain(brain)
|
||||
|
||||
def tearDown(self):
|
||||
super(DefaultPolicyTestCase, self).setUp()
|
||||
rules.reset()
|
||||
|
||||
def test_policy_called(self):
|
||||
self.assertRaises(exception.Forbidden, rules.enforce,
|
||||
self.credentials, "example:exist", {})
|
||||
|
||||
def test_not_found_policy_calls_default(self):
|
||||
rules.enforce(self.credentials, "example:noexist", {})
|
||||
|
||||
def test_default_not_found(self):
|
||||
self._set_brain("default_noexist")
|
||||
self.assertRaises(exception.Forbidden, rules.enforce,
|
||||
self.credentials, "example:noexist", {})
|
Loading…
Reference in New Issue
Block a user