Support policy control on Admin-API request

1.What is the problem:
Currently Admin-API is to manage pod and pod-binding, the Admin-API
access is hard coded, and only admin role is allowed. OpenStack
usually use policy.json based authorization to control the
API-request. Policy feature is missing in the Tricircle.

2.What's need to be fixed:
Remove hard coded Admin-API request authorization, use policy instead.
For Nova API-GW and Cinder API-GW, the API access control should be
done at bottom OpenStack as far as possible if the API request will
be forwarded to bottom OpenStack directly for further processing;
only these APIs which only interact with database for example flavor
and volume type, because these APIs processing will be terminated at
the Tricircle layer, so policy control should be done in Nova API-GW
or Cinder API-GW. No work needs to do in Tricircle Neutron Plugin for
Neutron API server is there, Neutron API server will be responsible
for policy control.

3.What is the purpose of this patch set:
In this patch, default policy option and rule, and policy control
in Admin-API were added. Using the default option and value to
generate the policy.json will be implemented in next patch. No
policy.json is mandatory required after this patch is merged,
if no policy.json is configured or provided, the policy control
will use the default rule automatically.

Change-Id: Ifb6137b20f56e9f9a70d339fd357ee480fa3ce2e
Signed-off-by: joehuang <joehuang@huawei.com>
This commit is contained in:
joehuang 2016-08-17 02:30:47 -04:00
parent ca3bf0a29e
commit a9e6220c3a
8 changed files with 454 additions and 56 deletions

View File

@ -27,6 +27,7 @@ import tricircle.common.context as t_context
import tricircle.common.exceptions as t_exc
from tricircle.common.i18n import _
from tricircle.common.i18n import _LE
from tricircle.common import policy
from tricircle.common import utils
from tricircle.db import api as db_api
@ -45,8 +46,8 @@ class PodsController(rest.RestController):
def post(self, **kw):
context = t_context.extract_context_from_environ()
if not t_context.is_admin_context(context):
pecan.abort(400, _('Admin role required to create pods'))
if not policy.enforce(context, policy.ADMIN_API_PODS_CREATE):
pecan.abort(401, _('Unauthorized to create pods'))
return
if 'pod' not in kw:
@ -129,8 +130,8 @@ class PodsController(rest.RestController):
def get_one(self, _id):
context = t_context.extract_context_from_environ()
if not t_context.is_admin_context(context):
pecan.abort(400, _('Admin role required to show pods'))
if not policy.enforce(context, policy.ADMIN_API_PODS_SHOW):
pecan.abort(401, _('Unauthorized to show pods'))
return
try:
@ -143,8 +144,8 @@ class PodsController(rest.RestController):
def get_all(self):
context = t_context.extract_context_from_environ()
if not t_context.is_admin_context(context):
pecan.abort(400, _('Admin role required to list pods'))
if not policy.enforce(context, policy.ADMIN_API_PODS_LIST):
pecan.abort(401, _('Unauthorized to list pods'))
return
try:
@ -160,8 +161,8 @@ class PodsController(rest.RestController):
def delete(self, _id):
context = t_context.extract_context_from_environ()
if not t_context.is_admin_context(context):
pecan.abort(400, _('Admin role required to delete pods'))
if not policy.enforce(context, policy.ADMIN_API_PODS_DELETE):
pecan.abort(401, _('Unauthorized to delete pods'))
return
try:
@ -211,8 +212,8 @@ class BindingsController(rest.RestController):
def post(self, **kw):
context = t_context.extract_context_from_environ()
if not t_context.is_admin_context(context):
pecan.abort(400, _('Admin role required to create bindings'))
if not policy.enforce(context, policy.ADMIN_API_BINDINGS_CREATE):
pecan.abort(401, _('Unauthorized to create bindings'))
return
if 'pod_binding' not in kw:
@ -271,8 +272,8 @@ class BindingsController(rest.RestController):
def get_one(self, _id):
context = t_context.extract_context_from_environ()
if not t_context.is_admin_context(context):
pecan.abort(400, _('Admin role required to show bindings'))
if not policy.enforce(context, policy.ADMIN_API_BINDINGS_SHOW):
pecan.abort(401, _('Unauthorized to show bindings'))
return
try:
@ -289,8 +290,8 @@ class BindingsController(rest.RestController):
def get_all(self):
context = t_context.extract_context_from_environ()
if not t_context.is_admin_context(context):
pecan.abort(400, _('Admin role required to list bindings'))
if not policy.enforce(context, policy.ADMIN_API_BINDINGS_LIST):
pecan.abort(401, _('Unauthorized to list bindings'))
return
try:
@ -308,8 +309,8 @@ class BindingsController(rest.RestController):
def delete(self, _id):
context = t_context.extract_context_from_environ()
if not t_context.is_admin_context(context):
pecan.abort(400, _('Admin role required to delete bindings'))
if not policy.enforce(context, policy.ADMIN_API_BINDINGS_DELETE):
pecan.abort(401, _('Unauthorized to delete bindings'))
return
try:

View File

@ -16,35 +16,35 @@
"""
Routines for configuring tricircle, largely copy from Neutron
"""
import sys
from oslo_config import cfg
import oslo_log.log as logging
from oslo_policy import opts as policy_opts
from tricircle.common.i18n import _LI
# from tricircle import policy
from tricircle.common import policy
from tricircle.common import rpc
from tricircle.common import version
logging.register_options(cfg.CONF)
LOG = logging.getLogger(__name__)
policy_opts.set_defaults(cfg.CONF, 'policy.json')
def init(opts, args, **kwargs):
# Register the configuration options
cfg.CONF.register_opts(opts)
# ks_session.Session.register_conf_options(cfg.CONF)
# auth.register_conf_options(cfg.CONF)
logging.register_options(cfg.CONF)
cfg.CONF(args=args, project='tricircle',
version=version.version_info,
**kwargs)
_setup_logging()
_setup_policy()
rpc.init(cfg.CONF)
@ -60,11 +60,23 @@ def _setup_logging():
LOG.debug("command line: %s", " ".join(sys.argv))
def _setup_policy():
# if there is valid policy file, use policy file by oslo_policy
# otherwise, use the default policy value in policy.py
policy_file = cfg.CONF.oslo_policy.policy_file
if policy_file and cfg.CONF.find_file(policy_file):
# just return here, oslo_policy lib will use policy file by itself
return
policy.populate_default_rules()
def reset_service():
# Reset worker in case SIGHUP is called.
# Note that this is called only in case a service is running in
# daemon mode.
_setup_logging()
# TODO(zhiyuan) enforce policy later
# policy.refresh()
policy.reset()
_setup_policy()

View File

@ -76,7 +76,7 @@ class ContextBase(oslo_ctx.RequestContext):
def __init__(self, auth_token=None, user_id=None, tenant_id=None,
is_admin=False, read_deleted="no", request_id=None,
overwrite=True, user_name=None, tenant_name=None,
quota_class=None, **kwargs):
quota_class=None, roles=None, **kwargs):
"""Initialize RequestContext.
:param read_deleted: 'no' indicates deleted records are hidden, 'yes'
@ -105,6 +105,7 @@ class ContextBase(oslo_ctx.RequestContext):
self.read_deleted = read_deleted
self.nova_micro_version = kwargs.get('nova_micro_version',
constants.NOVA_APIGW_MIN_VERSION)
self.roles = roles or []
def _get_read_deleted(self):
return self._read_deleted
@ -128,7 +129,8 @@ class ContextBase(oslo_ctx.RequestContext):
'tenant_name': self.tenant_name,
'tenant_id': self.tenant_id,
'project_id': self.project_id,
'quota_class': self.quota_class
'quota_class': self.quota_class,
'roles': self.roles,
})
return ctx_dict
@ -175,6 +177,7 @@ class Context(ContextBase):
def elevated(self, read_deleted=None, overwrite=False):
"""Return a version of this context with admin flag set."""
ctx = copy.copy(self)
ctx.roles = copy.deepcopy(self.roles)
ctx.is_admin = True
if read_deleted is not None:

View File

@ -123,6 +123,10 @@ class AdminRequired(NotAuthorized):
message = _("User does not have admin privileges")
class PolicyNotAuthorized(NotAuthorized):
message = _("Policy doesn't allow this operation to be performed.")
class InUse(TricircleException):
message = _("The resource is inuse")

186
tricircle/common/policy.py Normal file
View File

@ -0,0 +1,186 @@
# Copyright (c) Huawei Technologies Co., Ltd.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Policy Engine For Tricircle."""
# Policy controlled API access mainly for the Tricircle Admin API. Regarding
# to Nova API-GW and Cinder API-GW, the API access control should be done at
# bottom OpenStack as far as possible if the API request will be forwarded
# to bottom OpenStack directly for further processing; only these APIs which
# only can interact with database for example flavor and volume type, because
# these APIs processing will be terminated at the Tricircle layer, so policy
# control should be done by Nova API-GW or Cinder API-GW. No work is required
# to do in the Tricircle Neutron Plugin for Neutron API server is there,
# Neutron API server will be responsible for policy control.
from oslo_config import cfg
import oslo_log.log as logging
from oslo_policy import policy
from tricircle.common import exceptions as t_exec
from tricircle.common.i18n import _LE
_ENFORCER = None
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
default_policies = [
policy.RuleDefault('context_is_admin', 'role:admin'),
policy.RuleDefault('admin_api', 'is_admin:True',
description='cloud admin allowed'),
policy.RuleDefault('admin_or_owner',
'is_admin:True or project_id:%(project_id)s',
description='cloud admin or project owner allowed'),
policy.RuleDefault('default', 'rule:admin_or_owner'),
]
ADMIN_API_PODS_CREATE = 'admin_api:pods:create'
ADMIN_API_PODS_DELETE = 'admin_api:pods:delete'
ADMIN_API_PODS_SHOW = 'admin_api:pods:show'
ADMIN_API_PODS_LIST = 'admin_api:pods:list'
ADMIN_API_BINDINGS_CREATE = 'admin_api:bindings:create'
ADMIN_API_BINDINGS_DELETE = 'admin_api:bindings:delete'
ADMIN_API_BINDINGS_SHOW = 'admin_api:bindings:show'
ADMIN_API_BINDINGS_LIST = 'admin_api:bindings:list'
tricircle_admin_api_policies = [
policy.RuleDefault(ADMIN_API_PODS_CREATE,
'rule:admin_api',
description='Create pod'),
policy.RuleDefault(ADMIN_API_PODS_DELETE,
'rule:admin_api',
description='Delete pod'),
policy.RuleDefault(ADMIN_API_PODS_SHOW,
'rule:admin_api',
description='Show pod detail'),
policy.RuleDefault(ADMIN_API_PODS_LIST,
'rule:admin_api',
description='List pods'),
policy.RuleDefault(ADMIN_API_BINDINGS_CREATE,
'rule:admin_api',
description='Create pod binding'),
policy.RuleDefault(ADMIN_API_BINDINGS_DELETE,
'rule:admin_api',
description='Delete pod binding'),
policy.RuleDefault(ADMIN_API_BINDINGS_SHOW,
'rule:admin_api',
description='Show pod binding detail'),
policy.RuleDefault(ADMIN_API_BINDINGS_LIST,
'rule:admin_api',
description='List pod bindings'),
]
def list_policies():
policies = (default_policies +
tricircle_admin_api_policies)
return policies
# we can get a policy enforcer by this init.
# oslo policy supports change policy rule dynamically.
# at present, policy.enforce will reload the policy rules when it checks
# the policy file has been touched.
def init(policy_file=None, rules=None,
default_rule=None, use_conf=True, overwrite=True):
"""Init an Enforcer class.
:param policy_file: Custom policy file to use, if none is
specified, ``conf.policy_file`` will be
used.
:param rules: Default dictionary / Rules to use. It will be
considered just in the first instantiation. If
:meth:`load_rules` with ``force_reload=True``,
:meth:`clear` or :meth:`set_rules` with
``overwrite=True`` is called this will be overwritten.
:param default_rule: Default rule to use, conf.default_rule will
be used if none is specified.
:param use_conf: Whether to load rules from cache or config file.
:param overwrite: Whether to overwrite existing rules when reload rules
from config file.
"""
global _ENFORCER
if not _ENFORCER:
# http://docs.openstack.org/developer/oslo.policy/usage.html
_ENFORCER = policy.Enforcer(CONF,
policy_file=policy_file,
rules=rules,
default_rule=default_rule,
use_conf=use_conf,
overwrite=overwrite)
_ENFORCER.register_defaults(list_policies())
return _ENFORCER
def set_rules(rules, overwrite=True, use_conf=False):
"""Set rules based on the provided dict of rules.
:param rules: New rules to use. It should be an instance of dict.
:param overwrite: Whether to overwrite current rules or update them
with the new rules.
:param use_conf: Whether to reload rules from config file.
"""
init(use_conf=False)
_ENFORCER.set_rules(rules, overwrite, use_conf)
def populate_default_rules():
reset()
init(use_conf=False)
dict_rules = {}
for default in list_policies():
dict_rules[default.name] = default.check_str
rules = policy.Rules.from_dict(dict_rules)
set_rules(rules)
def reset():
global _ENFORCER
if _ENFORCER:
_ENFORCER.clear()
_ENFORCER = None
def enforce(context, rule=None, target=None, *args, **kwargs):
"""Check authorization of a rule against the target and credentials.
:param dict context: As much information about the user performing the
action as possible.
:param rule: The rule to evaluate.
:param dict target: As much information about the object being operated
on as possible.
:return: ``True`` if the policy allows the action.
``False`` if the policy does not allow the action.
"""
enforcer = init()
credentials = context.to_dict()
if target is None:
target = {'project_id': context.project_id,
'user_id': context.user_id}
exc = t_exec.PolicyNotAuthorized
try:
result = enforcer.enforce(rule, target, credentials,
do_raise=True, exc=exc, *args, **kwargs)
except t_exec.PolicyNotAuthorized as e:
result = False
LOG.exception(_LE("%(msg)s, %(rule)s, %(target)s"),
{'msg': str(e), 'rule': rule, 'target': target})
return result

View File

@ -24,6 +24,7 @@ import oslo_db.exception as db_exc
from tricircle.api import app
from tricircle.common import az_ag
from tricircle.common import context
from tricircle.common import policy
from tricircle.common import utils
from tricircle.db import core
from tricircle.tests import base
@ -33,8 +34,14 @@ OPT_GROUP_NAME = 'keystone_authtoken'
cfg.CONF.import_group(OPT_GROUP_NAME, "keystonemiddleware.auth_token")
def fake_is_admin(ctx):
return True
def fake_admin_context():
context_paras = {'is_admin': True}
return context.Context(**context_paras)
def fake_non_admin_context():
context_paras = {}
return context.Context(**context_paras)
class API_FunctionalTest(base.TestCase):
@ -44,6 +51,7 @@ class API_FunctionalTest(base.TestCase):
self.addCleanup(set_config, {}, overwrite=True)
cfg.CONF.clear()
cfg.CONF.register_opts(app.common_opts)
self.CONF = self.useFixture(fixture_config.Config()).conf
@ -56,6 +64,8 @@ class API_FunctionalTest(base.TestCase):
self.context = context.get_admin_context()
policy.populate_default_rules()
self.app = self._make_app()
def _make_app(self, enable_acl=False):
@ -78,13 +88,14 @@ class API_FunctionalTest(base.TestCase):
cfg.CONF.unregister_opts(app.common_opts)
pecan.set_config({}, overwrite=True)
core.ModelBase.metadata.drop_all(core.get_engine())
policy.reset()
class TestPodController(API_FunctionalTest):
"""Test version listing on root URI."""
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_post_no_input(self):
pods = [
# missing pod
@ -109,8 +120,8 @@ class TestPodController(API_FunctionalTest):
def fake_create_ag_az(context, ag_name, az_name):
raise db_exc.DBDuplicateEntry
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
@patch.object(az_ag, 'create_ag_az',
new=fake_create_ag_az)
def test_post_dup_db_exception(self):
@ -132,8 +143,8 @@ class TestPodController(API_FunctionalTest):
def fake_create_ag_az_exp(context, ag_name, az_name):
raise Exception
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
@patch.object(core, 'create_resource',
new=fake_create_ag_az_exp)
def test_post_exception(self):
@ -152,8 +163,8 @@ class TestPodController(API_FunctionalTest):
self._test_and_check(pods)
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_post_invalid_input(self):
pods = [
@ -230,8 +241,8 @@ class TestPodController(API_FunctionalTest):
self._test_and_check(pods)
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_post_duplicate_top_region(self):
pods = [
@ -261,8 +272,8 @@ class TestPodController(API_FunctionalTest):
self._test_and_check(pods)
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_post_duplicate_pod(self):
pods = [
@ -293,8 +304,8 @@ class TestPodController(API_FunctionalTest):
self._test_and_check(pods)
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_post_pod_duplicate_top_region(self):
pods = [
@ -336,8 +347,8 @@ class TestPodController(API_FunctionalTest):
self.assertEqual(response.status_int,
test_pod['expected_error'])
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_get_all(self):
pods = [
@ -387,12 +398,9 @@ class TestPodController(API_FunctionalTest):
self.assertIn('Pod1', response)
self.assertIn('Pod2', response)
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ')
def test_get_delete_one(self, mock_context):
mock_context.return_value = self.context
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_get_delete_one(self):
pods = [
@ -480,12 +488,42 @@ class TestPodController(API_FunctionalTest):
ag = az_ag.get_ag_by_name(self.context, ag_name)
self.assertIsNone(ag)
@patch.object(context, 'extract_context_from_environ',
new=fake_non_admin_context)
def test_non_admin_action(self):
pods = [
{
"pod":
{
"pod_name": "Pod1",
"pod_az_name": "az1",
"dc_name": "dc2",
"az_name": "AZ1"
},
"expected_error": 401,
},
]
self._test_and_check(pods)
response = self.app.get('/v1.0/pods/1234567890',
expect_errors=True)
self.assertEqual(response.status_int, 401)
response = self.app.get('/v1.0/pods',
expect_errors=True)
self.assertEqual(response.status_int, 401)
response = self.app.delete('/v1.0/pods/1234567890',
expect_errors=True)
self.assertEqual(response.status_int, 401)
class TestBindingController(API_FunctionalTest):
"""Test version listing on root URI."""
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_post_no_input(self):
pod_bindings = [
# missing pod_binding
@ -507,8 +545,8 @@ class TestBindingController(API_FunctionalTest):
self.assertEqual(response.status_int,
test_pod['expected_error'])
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_post_invalid_input(self):
pod_bindings = [
@ -552,8 +590,8 @@ class TestBindingController(API_FunctionalTest):
self._test_and_check(pod_bindings)
@patch.object(context, 'is_admin_context',
new=fake_is_admin)
@patch.object(context, 'extract_context_from_environ',
new=fake_admin_context)
def test_bindings(self):
pods = [
@ -665,3 +703,31 @@ class TestBindingController(API_FunctionalTest):
self.assertEqual(response.status_int,
test_pod['expected_error'])
@patch.object(context, 'extract_context_from_environ',
new=fake_non_admin_context)
def test_non_admin_action(self):
pod_bindings = [
{
"pod_binding":
{
"tenant_id": "dddddd",
"pod_id": "0ace0db2-ef33-43a6-a150-42703ffda643"
},
"expected_error": 401
},
]
self._test_and_check(pod_bindings)
response = self.app.get('/v1.0/bindings/1234567890',
expect_errors=True)
self.assertEqual(response.status_int, 401)
response = self.app.get('/v1.0/bindings',
expect_errors=True)
self.assertEqual(response.status_int, 401)
response = self.app.delete('/v1.0/bindings/1234567890',
expect_errors=True)
self.assertEqual(response.status_int, 401)

View File

@ -21,6 +21,7 @@ import pecan
from tricircle.api.controllers import pod
from tricircle.common import context
from tricircle.common import policy
from tricircle.common import utils
from tricircle.db import core
from tricircle.db import models
@ -32,6 +33,7 @@ class PodsControllerTest(unittest.TestCase):
core.ModelBase.metadata.create_all(core.get_engine())
self.controller = pod.PodsController()
self.context = context.get_admin_context()
policy.populate_default_rules()
@patch.object(context, 'extract_context_from_environ')
def test_post_top_pod(self, mock_context):
@ -133,3 +135,4 @@ class PodsControllerTest(unittest.TestCase):
def tearDown(self):
core.ModelBase.metadata.drop_all(core.get_engine())
policy.reset()

View File

@ -0,0 +1,123 @@
# Copyright 2016 Huawei Technologies Co., Ltd.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from oslo_policy import policy as oslo_policy
from tricircle.common import context
from tricircle.common import policy
class PolicyTestCase(unittest.TestCase):
def setUp(self):
super(PolicyTestCase, self).setUp()
rules = oslo_policy.Rules.from_dict({
"true": '@',
"example:allowed": '@',
"example:denied": "!",
"example:my_file": "role:admin or "
"project_id:%(project_id)s",
"example:early_and_fail": "! and @",
"example:early_or_success": "@ or !",
"example:lowercase_admin": "role:admin or role:sysadmin",
"example:uppercase_admin": "role:ADMIN or role:sysadmin",
})
policy.reset()
policy.init()
policy.set_rules(rules)
self.context = context.Context(user_id='fake',
tenant_id='fake',
roles=['member'])
self.target = None
def test_enforce_nonexistent_action_throws(self):
action = "example:non_exist"
result = policy.enforce(self.context, action, self.target)
self.assertEqual(result, False)
def test_enforce_bad_action_throws(self):
action = "example:denied"
result = policy.enforce(self.context, action, self.target)
self.assertEqual(result, False)
def test_enforce_good_action(self):
action = "example:allowed"
result = policy.enforce(self.context, action, self.target)
self.assertEqual(result, True)
def test_templatized_enforcement(self):
target_mine = {'project_id': 'fake'}
target_not_mine = {'project_id': 'another'}
action = "example:my_file"
result = policy.enforce(self.context, action, target_mine)
self.assertEqual(result, True)
result = policy.enforce(self.context, action, target_not_mine)
self.assertEqual(result, False)
def test_early_AND_enforcement(self):
action = "example:early_and_fail"
result = policy.enforce(self.context, action, self.target)
self.assertEqual(result, False)
def test_early_OR_enforcement(self):
action = "example:early_or_success"
result = policy.enforce(self.context, action, self.target)
self.assertEqual(result, True)
def test_ignore_case_role_check(self):
lowercase_action = "example:lowercase_admin"
uppercase_action = "example:uppercase_admin"
admin_context = context.Context(user_id='fake',
tenant_id='fake',
roles=['AdMiN'])
result = policy.enforce(admin_context, lowercase_action, self.target)
self.assertEqual(result, True)
result = policy.enforce(admin_context, uppercase_action, self.target)
self.assertEqual(result, True)
class DefaultPolicyTestCase(unittest.TestCase):
def setUp(self):
super(DefaultPolicyTestCase, self).setUp()
self.rules = oslo_policy.Rules.from_dict({
"default": '',
"example:exist": "!",
})
self._set_rules('default')
self.context = context.Context(user_id='fake',
tenant_id='fake')
def _set_rules(self, default_rule):
policy.reset()
policy.init(rules=self.rules, default_rule=default_rule,
use_conf=False)
def test_policy_called(self):
result = policy.enforce(self.context, "example:exist", {})
self.assertEqual(result, False)
def test_not_found_policy_calls_default(self):
result = policy.enforce(self.context, "example:noexist", {})
self.assertEqual(result, True)
def test_default_not_found(self):
self._set_rules("default_noexist")
result = policy.enforce(self.context, "example:noexist", {})
self.assertEqual(result, False)