Add policy registration and authorize method
A new RuleDefault class has been added which can be used to register policies that will be used by a consumer of oslo.policy. These policies are merged with those defined in policy files, with the file definitions overriding the defaults Registering a policy with the same name twice is considered an error and will raise a DuplicateRuleDefaultError exception. To facilitate projects wishing to ensure that all policies are registered before use an authorize method has been added which errors when checking an unregistered policy. If the policy has been registered then the enforce method is called in the normal manner. Change-Id: I3b6423aeed8ae80e8bf73dbda0f63ef379ccef43 Implements: bp policy-in-code
This commit is contained in:
@@ -37,3 +37,40 @@ When using oslo.policy
|
||||
from keystone import config
|
||||
CONF = config.CONF
|
||||
enforcer = policy.Enforcer(CONF, policy_file=_POLICY_PATH)
|
||||
|
||||
Registering policy defaults in code
|
||||
===================================
|
||||
|
||||
A project can register policy defaults in their code which brings with it some
|
||||
benefits.
|
||||
|
||||
* A deployer only needs to add a policy file if they wish to override the
|
||||
project defaults.
|
||||
|
||||
* Projects can use Enforcer.authorize to ensure that a policy check is being
|
||||
done against a registered policy. This can be used to ensure that all
|
||||
policies used are registered. The signature of Enforcer.authorize matches
|
||||
Enforcer.enforce.
|
||||
|
||||
* More will be documented as capabilities are added.
|
||||
|
||||
How to register
|
||||
---------------
|
||||
|
||||
::
|
||||
|
||||
from oslo_config import cfg
|
||||
CONF = cfg.CONF
|
||||
enforcer = policy.Enforcer(CONF, policy_file=_POLICY_PATH)
|
||||
|
||||
base_rules = [
|
||||
policy.RuleDefault('admin_required', 'role:admin or is_admin:1',
|
||||
description='Who is considered an admin'),
|
||||
policy.RuleDefault('service_role', 'role:service',
|
||||
description='service role'),
|
||||
]
|
||||
|
||||
enforcer.register_defaults(base_rules)
|
||||
enforcer.register_default(policy.RuleDefault('identity:create_region',
|
||||
'rule:admin_required',
|
||||
description='helpful text'))
|
||||
|
||||
@@ -296,6 +296,37 @@ class PolicyNotAuthorized(Exception):
|
||||
super(PolicyNotAuthorized, self).__init__(msg)
|
||||
|
||||
|
||||
class DuplicatePolicyError(Exception):
|
||||
def __init__(self, name):
|
||||
msg = _('Policy %(name)s is already registered') % {'name': name}
|
||||
super(DuplicatePolicyError, self).__init__(msg)
|
||||
|
||||
|
||||
class PolicyNotRegistered(Exception):
|
||||
def __init__(self, name):
|
||||
msg = _('Policy %(name)s has not been registered') % {'name': name}
|
||||
super(PolicyNotRegistered, self).__init__(msg)
|
||||
|
||||
|
||||
def parse_file_contents(data):
|
||||
"""Parse the raw contents of a policy file.
|
||||
|
||||
Parses the contents of a policy file which currently can be in either
|
||||
yaml or json format. Both can be parsed as yaml.
|
||||
|
||||
:param data: A string containing the contents of a policy file.
|
||||
:returns: A dict of of the form {'policy_name1': 'policy1',
|
||||
'policy_name2': 'policy2,...}
|
||||
"""
|
||||
try:
|
||||
parsed = yaml.safe_load(data)
|
||||
except yaml.YAMLError as e:
|
||||
# For backwards-compatibility, convert yaml error to ValueError,
|
||||
# which is what JSON loader raised.
|
||||
raise ValueError(six.text_type(e))
|
||||
return parsed
|
||||
|
||||
|
||||
class Rules(dict):
|
||||
"""A store for rules. Handles the default_rule setting directly."""
|
||||
|
||||
@@ -306,16 +337,10 @@ class Rules(dict):
|
||||
.. versionadded:: 1.5.0
|
||||
|
||||
"""
|
||||
|
||||
try:
|
||||
parsed = yaml.safe_load(data)
|
||||
except yaml.YAMLError as e:
|
||||
# For backwards-compatibility, convert yaml error to ValueError,
|
||||
# which is what JSON loader raised.
|
||||
raise ValueError(six.text_type(e))
|
||||
parsed_file = parse_file_contents(data)
|
||||
|
||||
# Parse the rules
|
||||
rules = {k: _parser.parse_rule(v) for k, v in parsed.items()}
|
||||
rules = {k: _parser.parse_rule(v) for k, v in parsed_file.items()}
|
||||
|
||||
return cls(rules, default_rule)
|
||||
|
||||
@@ -413,6 +438,8 @@ class Enforcer(object):
|
||||
self.default_rule = (default_rule or
|
||||
self.conf.oslo_policy.policy_default_rule)
|
||||
self.rules = Rules(rules, self.default_rule)
|
||||
self.registered_rules = {}
|
||||
self.file_rules = {}
|
||||
|
||||
self.policy_path = None
|
||||
|
||||
@@ -453,6 +480,8 @@ class Enforcer(object):
|
||||
self._loaded_files = []
|
||||
self._policy_dir_mtimes = {}
|
||||
self._file_cache.clear()
|
||||
self.registered_rules = {}
|
||||
self.file_rules = {}
|
||||
|
||||
def load_rules(self, force_reload=False):
|
||||
"""Loads policy_path's rules.
|
||||
@@ -482,6 +511,10 @@ class Enforcer(object):
|
||||
self._load_policy_file,
|
||||
force_reload, False)
|
||||
|
||||
for default in self.registered_rules.values():
|
||||
if default.name not in self.rules:
|
||||
self.rules[default.name] = default.check
|
||||
|
||||
@staticmethod
|
||||
def _is_directory_updated(cache, path):
|
||||
# Get the current modified time and compare it to what is in
|
||||
@@ -510,12 +543,29 @@ class Enforcer(object):
|
||||
for policy_file in [p for p in policy_files if not p.startswith('.')]:
|
||||
func(os.path.join(path, policy_file), *args)
|
||||
|
||||
def _record_file_rules(self, data, overwrite=False):
|
||||
"""Store a copy of rules loaded from a file.
|
||||
|
||||
It is useful to be able to distinguish between rules loaded from a file
|
||||
and those registered by a consuming service. In order to do so we keep
|
||||
a record of rules loaded from a file.
|
||||
|
||||
:param data: The raw contents of a policy file.
|
||||
:param overwrite: If True clear out previously loaded rules.
|
||||
"""
|
||||
if overwrite:
|
||||
self.file_rules = {}
|
||||
parsed_file = parse_file_contents(data)
|
||||
for name, check_str in parsed_file.items():
|
||||
self.file_rules[name] = RuleDefault(name, check_str)
|
||||
|
||||
def _load_policy_file(self, path, force_reload, overwrite=True):
|
||||
reloaded, data = _cache_handler.read_cached_file(
|
||||
self._file_cache, path, force_reload=force_reload)
|
||||
if reloaded or not self.rules:
|
||||
rules = Rules.load(data, self.default_rule)
|
||||
self.set_rules(rules, overwrite=overwrite, use_conf=True)
|
||||
self._record_file_rules(data, overwrite)
|
||||
self._loaded_files.append(path)
|
||||
LOG.debug('Reloaded policy file: %(path)s', {'path': path})
|
||||
|
||||
@@ -589,3 +639,70 @@ class Enforcer(object):
|
||||
raise PolicyNotAuthorized(rule, target, creds)
|
||||
|
||||
return result
|
||||
|
||||
def register_default(self, default):
|
||||
"""Registers a RuleDefault.
|
||||
|
||||
Adds a RuleDefault to the list of registered rules. Rules must be
|
||||
registered before using the Enforcer.authorize method.
|
||||
|
||||
:param default: A RuleDefault object to register.
|
||||
"""
|
||||
if default.name in self.registered_rules:
|
||||
raise DuplicatePolicyError(default.name)
|
||||
|
||||
self.registered_rules[default.name] = default
|
||||
|
||||
def register_defaults(self, defaults):
|
||||
"""Registers a list of RuleDefaults.
|
||||
|
||||
Adds each RuleDefault to the list of registered rules. Rules must be
|
||||
registered before using the Enforcer.authorize method.
|
||||
|
||||
:param default: A list of RuleDefault objects to register.
|
||||
"""
|
||||
for default in defaults:
|
||||
self.register_default(default)
|
||||
|
||||
def authorize(self, rule, target, creds, do_raise=False,
|
||||
exc=None, *args, **kwargs):
|
||||
"""A wrapper around 'enforce' that checks for policy registration.
|
||||
|
||||
To ensure that a policy being checked has been registered this method
|
||||
should be used rather than enforce. By doing so a project can be sure
|
||||
that all of it's used policies are registered and therefore available
|
||||
for sample file generation.
|
||||
|
||||
The parameters match the enforce method and a description of them can
|
||||
be found there.
|
||||
"""
|
||||
if rule not in self.registered_rules:
|
||||
raise PolicyNotRegistered(rule)
|
||||
return self.enforce(rule, target, creds, do_raise, exc,
|
||||
*args, **kwargs)
|
||||
|
||||
|
||||
class RuleDefault(object):
|
||||
"""A class for holding policy definitions.
|
||||
|
||||
It is required to supply a name and value at creation time. It is
|
||||
encouraged to also supply a description to assist operators.
|
||||
|
||||
:param name: The name of the policy. This is used when referencing it
|
||||
from another rule or during policy enforcement.
|
||||
:param check_str: The policy. This is a string defining a policy that
|
||||
conforms to the policy language outlined at the top of
|
||||
the file.
|
||||
:param description: A plain text description of the policy. This will be
|
||||
used to comment sample policy files for use by
|
||||
deployers.
|
||||
"""
|
||||
def __init__(self, name, check_str, description=None):
|
||||
self.name = name
|
||||
self.check_str = check_str
|
||||
self.check = _parser.parse_rule(check_str)
|
||||
self.description = description
|
||||
|
||||
def __str__(self):
|
||||
return '"%(name)s": "%(check_str)s"' % {'name': self.name,
|
||||
'check_str': self.check_str}
|
||||
|
||||
@@ -39,6 +39,29 @@ POLICY_JSON_CONTENTS = jsonutils.dumps({
|
||||
})
|
||||
|
||||
|
||||
@_checks.register('field')
|
||||
class FieldCheck(_checks.Check):
|
||||
"""A non reversible check.
|
||||
|
||||
All oslo.policy defined checks have a __str__ method with the property that
|
||||
rule == str(_parser.parse_rule(rule)). Consumers of olso.policy may have
|
||||
defined checks for which that does not hold true. This FieldCheck is not
|
||||
reversible so we can use it for testing to ensure that this type of check
|
||||
does not break anything.
|
||||
"""
|
||||
def __init__(self, kind, match):
|
||||
# Process the match
|
||||
resource, field_value = match.split(':', 1)
|
||||
field, value = field_value.split('=', 1)
|
||||
super(FieldCheck, self).__init__(kind, '%s:%s:%s' %
|
||||
(resource, field, value))
|
||||
self.field = field
|
||||
self.value = value
|
||||
|
||||
def __call__(self, target_dict, cred_dict, enforcer):
|
||||
return True
|
||||
|
||||
|
||||
class MyException(Exception):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.args = args
|
||||
@@ -184,12 +207,41 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
self.enforcer._loaded_files
|
||||
)
|
||||
|
||||
def _test_scenario_with_opts_registered(self, scenario, *args, **kwargs):
|
||||
# This test registers some rules, calls the scenario and then checks
|
||||
# the registered rules. The scenario should be a method which loads
|
||||
# policy files containing POLICY_*_CONTENTS defined above. They should
|
||||
# be loaded on the self.enforcer object.
|
||||
|
||||
# This should be overridden by the policy file
|
||||
self.enforcer.register_default(policy.RuleDefault(name='admin',
|
||||
check_str='is_admin:False'))
|
||||
# This is not in the policy file, only registered
|
||||
self.enforcer.register_default(policy.RuleDefault(name='owner',
|
||||
check_str='role:owner'))
|
||||
|
||||
scenario(*args, **kwargs)
|
||||
|
||||
self.assertIn('owner', self.enforcer.rules)
|
||||
self.assertEqual('role:owner', str(self.enforcer.rules['owner']))
|
||||
self.assertEqual('is_admin:True', str(self.enforcer.rules['admin']))
|
||||
self.assertIn('owner', self.enforcer.registered_rules)
|
||||
self.assertIn('admin', self.enforcer.registered_rules)
|
||||
self.assertNotIn('default', self.enforcer.registered_rules)
|
||||
self.assertNotIn('owner', self.enforcer.file_rules)
|
||||
self.assertIn('admin', self.enforcer.file_rules)
|
||||
self.assertIn('default', self.enforcer.file_rules)
|
||||
|
||||
def test_load_file(self):
|
||||
self.conf.set_override('policy_dirs', [], group='oslo_policy')
|
||||
self.enforcer.load_rules(True)
|
||||
self.assertIsNotNone(self.enforcer.rules)
|
||||
self.assertIn('default', self.enforcer.rules)
|
||||
self.assertIn('admin', self.enforcer.rules)
|
||||
self.assertEqual('is_admin:True', str(self.enforcer.rules['admin']))
|
||||
|
||||
def test_load_file_opts_registered(self):
|
||||
self._test_scenario_with_opts_registered(self.test_load_file)
|
||||
|
||||
def test_load_directory(self):
|
||||
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
|
||||
@@ -205,6 +257,9 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
'policy.d/b.conf',
|
||||
])
|
||||
|
||||
def test_load_directory_opts_registered(self):
|
||||
self._test_scenario_with_opts_registered(self.test_load_directory)
|
||||
|
||||
def test_load_directory_caching_with_files_updated(self):
|
||||
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
|
||||
|
||||
@@ -233,8 +288,12 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
'policy.d/a.conf',
|
||||
])
|
||||
|
||||
def test_load_directory_caching_with_files_updated_opts_registered(self):
|
||||
self._test_scenario_with_opts_registered(
|
||||
self.test_load_directory_caching_with_files_updated)
|
||||
|
||||
def test_load_directory_caching_with_files_same(self, overwrite=True):
|
||||
self.enforcer = policy.Enforcer(self.conf, overwrite=overwrite)
|
||||
self.enforcer.overwrite = overwrite
|
||||
|
||||
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
|
||||
|
||||
@@ -260,6 +319,16 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
def test_load_directory_caching_with_files_same_but_overwrite_false(self):
|
||||
self.test_load_directory_caching_with_files_same(overwrite=False)
|
||||
|
||||
def test_load_directory_caching_with_files_same_opts_registered(self):
|
||||
self._test_scenario_with_opts_registered(
|
||||
self.test_load_directory_caching_with_files_same)
|
||||
|
||||
def test_load_dir_caching_with_files_same_overwrite_false_opts_reg(self):
|
||||
# Very long test name makes this difficult
|
||||
test = getattr(self,
|
||||
'test_load_directory_caching_with_files_same_but_overwrite_false') # NOQA
|
||||
self._test_scenario_with_opts_registered(test)
|
||||
|
||||
def test_load_multiple_directories(self):
|
||||
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
|
||||
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
|
||||
@@ -279,6 +348,10 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
'policy.2.d/fake.conf',
|
||||
])
|
||||
|
||||
def test_load_multiple_directories_opts_registered(self):
|
||||
self._test_scenario_with_opts_registered(
|
||||
self.test_load_multiple_directories)
|
||||
|
||||
def test_load_non_existed_directory(self):
|
||||
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
|
||||
self.conf.set_override('policy_dirs',
|
||||
@@ -290,6 +363,10 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
self.assertIn('admin', self.enforcer.rules)
|
||||
self.check_loaded_files(['policy.json', 'policy.d/a.conf'])
|
||||
|
||||
def test_load_non_existed_directory_opts_registered(self):
|
||||
self._test_scenario_with_opts_registered(
|
||||
self.test_load_non_existed_directory)
|
||||
|
||||
def test_load_policy_dirs_with_non_directory(self):
|
||||
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
|
||||
self.conf.set_override('policy_dirs',
|
||||
@@ -311,6 +388,17 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
self.assertIsNone(self.enforcer.default_rule)
|
||||
self.assertIsNone(self.enforcer.policy_path)
|
||||
|
||||
def test_clear_opts_registered(self):
|
||||
# This should be overridden by the policy file
|
||||
self.enforcer.register_default(policy.RuleDefault(name='admin',
|
||||
check_str='is_admin:False'))
|
||||
# This is not in the policy file, only registered
|
||||
self.enforcer.register_default(policy.RuleDefault(name='owner',
|
||||
check_str='role:owner'))
|
||||
|
||||
self.test_clear()
|
||||
self.assertEqual({}, self.enforcer.registered_rules)
|
||||
|
||||
def test_rule_with_check(self):
|
||||
rules_json = jsonutils.dumps({
|
||||
"deny_stack_user": "not role:stack_user",
|
||||
@@ -335,7 +423,7 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
creds = {'roles': ''}
|
||||
self.assertTrue(enforcer.enforce(action, {}, creds))
|
||||
|
||||
def test_enforcer_force_reload_with_overwrite(self):
|
||||
def test_enforcer_force_reload_with_overwrite(self, opts_registered=0):
|
||||
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
|
||||
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
|
||||
|
||||
@@ -362,11 +450,15 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
self.assertIn('default', self.enforcer.rules)
|
||||
self.assertIn('admin', self.enforcer.rules)
|
||||
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
|
||||
self.assertEqual(2, len(loaded_rules))
|
||||
self.assertEqual(2 + opts_registered, len(loaded_rules))
|
||||
self.assertIn('role:fakeB', loaded_rules['default'])
|
||||
self.assertIn('is_admin:True', loaded_rules['admin'])
|
||||
|
||||
def test_enforcer_force_reload_without_overwrite(self):
|
||||
def test_enforcer_force_reload_with_overwrite_opts_registered(self):
|
||||
self._test_scenario_with_opts_registered(
|
||||
self.test_enforcer_force_reload_with_overwrite, opts_registered=1)
|
||||
|
||||
def test_enforcer_force_reload_without_overwrite(self, opts_registered=0):
|
||||
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
|
||||
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
|
||||
|
||||
@@ -396,41 +488,62 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
self.assertIn('default', self.enforcer.rules)
|
||||
self.assertIn('admin', self.enforcer.rules)
|
||||
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
|
||||
self.assertEqual(3, len(loaded_rules))
|
||||
self.assertEqual(3 + opts_registered, len(loaded_rules))
|
||||
self.assertIn('role:test', loaded_rules['test'])
|
||||
self.assertIn('role:fakeB', loaded_rules['default'])
|
||||
self.assertIn('is_admin:True', loaded_rules['admin'])
|
||||
|
||||
def test_enforcer_force_reload_without_overwrite_opts_registered(self):
|
||||
self._test_scenario_with_opts_registered(
|
||||
self.test_enforcer_force_reload_without_overwrite,
|
||||
opts_registered=1)
|
||||
|
||||
def test_enforcer_keep_use_conf_flag_after_reload(self):
|
||||
self.create_config_file('policy.d/a.conf', POLICY_A_CONTENTS)
|
||||
self.create_config_file('policy.d/b.conf', POLICY_B_CONTENTS)
|
||||
|
||||
# We initialized enforcer with
|
||||
# policy configure files.
|
||||
enforcer = policy.Enforcer(self.conf)
|
||||
self.assertTrue(enforcer.use_conf)
|
||||
self.assertTrue(enforcer.enforce('default', {},
|
||||
{'roles': ['fakeB']}))
|
||||
self.assertFalse(enforcer.enforce('test', {},
|
||||
{'roles': ['test']}))
|
||||
self.assertTrue(self.enforcer.use_conf)
|
||||
self.assertTrue(self.enforcer.enforce('default', {},
|
||||
{'roles': ['fakeB']}))
|
||||
self.assertFalse(self.enforcer.enforce('test', {},
|
||||
{'roles': ['test']}))
|
||||
# After enforcement the flag should
|
||||
# be remained there.
|
||||
self.assertTrue(enforcer.use_conf)
|
||||
self.assertFalse(enforcer.enforce('_dynamic_test_rule', {},
|
||||
{'roles': ['test']}))
|
||||
self.assertTrue(self.enforcer.use_conf)
|
||||
self.assertFalse(self.enforcer.enforce('_dynamic_test_rule', {},
|
||||
{'roles': ['test']}))
|
||||
# Then if configure file got changed,
|
||||
# reloading will be triggered when calling
|
||||
# enforcer(), this case could happen only
|
||||
# when use_conf flag equals True.
|
||||
rules = jsonutils.loads(str(enforcer.rules))
|
||||
rules = jsonutils.loads(str(self.enforcer.rules))
|
||||
rules['_dynamic_test_rule'] = 'role:test'
|
||||
|
||||
with open(enforcer.policy_path, 'w') as f:
|
||||
with open(self.enforcer.policy_path, 'w') as f:
|
||||
f.write(jsonutils.dumps(rules))
|
||||
|
||||
enforcer.load_rules(force_reload=True)
|
||||
self.assertTrue(enforcer.enforce('_dynamic_test_rule', {},
|
||||
{'roles': ['test']}))
|
||||
self.enforcer.load_rules(force_reload=True)
|
||||
self.assertTrue(self.enforcer.enforce('_dynamic_test_rule', {},
|
||||
{'roles': ['test']}))
|
||||
|
||||
def test_enforcer_keep_use_conf_flag_after_reload_opts_registered(self):
|
||||
# This test does not use _test_scenario_with_opts_registered because
|
||||
# it loads all rules and then dumps them to a policy file and reloads.
|
||||
# That breaks the ability to differentiate between registered and file
|
||||
# loaded policies.
|
||||
|
||||
# This should be overridden by the policy file
|
||||
self.enforcer.register_default(policy.RuleDefault(name='admin',
|
||||
check_str='is_admin:False'))
|
||||
# This is not in the policy file, only registered
|
||||
self.enforcer.register_default(policy.RuleDefault(name='owner',
|
||||
check_str='role:owner'))
|
||||
|
||||
self.test_enforcer_keep_use_conf_flag_after_reload()
|
||||
|
||||
self.assertIn('owner', self.enforcer.rules)
|
||||
self.assertEqual('role:owner', str(self.enforcer.rules['owner']))
|
||||
self.assertEqual('is_admin:True', str(self.enforcer.rules['admin']))
|
||||
|
||||
def test_enforcer_force_reload_false(self):
|
||||
self.enforcer.set_rules({'test': 'test'})
|
||||
@@ -481,6 +594,36 @@ class EnforcerTest(base.PolicyBaseTestCase):
|
||||
enforcer = policy.Enforcer(self.conf, )
|
||||
self.assertEqual('bar_rule', enforcer.rules.default_rule)
|
||||
|
||||
def test_enforcer_register_twice_raises(self):
|
||||
self.enforcer.register_default(policy.RuleDefault(name='owner',
|
||||
check_str='role:owner'))
|
||||
self.assertRaises(policy.DuplicatePolicyError,
|
||||
self.enforcer.register_default,
|
||||
policy.RuleDefault(name='owner',
|
||||
check_str='role:owner'))
|
||||
|
||||
def test_non_reversible_check(self):
|
||||
self.create_config_file('policy.json',
|
||||
jsonutils.dumps(
|
||||
{'shared': 'field:networks:shared=True'}))
|
||||
# load_rules succeeding without error is the focus of this test
|
||||
self.enforcer.load_rules(True)
|
||||
self.assertIsNotNone(self.enforcer.rules)
|
||||
loaded_rules = jsonutils.loads(str(self.enforcer.rules))
|
||||
self.assertNotEqual('field:networks:shared=True',
|
||||
loaded_rules['shared'])
|
||||
|
||||
def test_authorize_opt_registered(self):
|
||||
self.enforcer.register_default(policy.RuleDefault(name='test',
|
||||
check_str='role:test'))
|
||||
self.assertTrue(self.enforcer.authorize('test', {},
|
||||
{'roles': ['test']}))
|
||||
|
||||
def test_authorize_opt_not_registered(self):
|
||||
self.assertRaises(policy.PolicyNotRegistered,
|
||||
self.enforcer.authorize, 'test', {},
|
||||
{'roles': ['test']})
|
||||
|
||||
|
||||
class CheckFunctionTestCase(base.PolicyBaseTestCase):
|
||||
|
||||
@@ -565,3 +708,14 @@ class BaseCheckTypesTestCase(base.PolicyBaseTestCase):
|
||||
self.assertEqual(
|
||||
TestCheck, _checks.registered_checks[check_str],
|
||||
message='%s check type is not public.' % check_str)
|
||||
|
||||
|
||||
class RuleDefaultTestCase(base.PolicyBaseTestCase):
|
||||
def test_rule_is_parsed(self):
|
||||
opt = policy.RuleDefault(name='foo', check_str='rule:foo')
|
||||
self.assertTrue(isinstance(opt.check, _checks.BaseCheck))
|
||||
self.assertEqual('rule:foo', str(opt.check))
|
||||
|
||||
def test_str(self):
|
||||
opt = policy.RuleDefault(name='foo', check_str='rule:foo')
|
||||
self.assertEqual('"foo": "rule:foo"', str(opt))
|
||||
|
||||
Reference in New Issue
Block a user