octavia/octavia/tests/unit/common/test_policy.py

241 lines
9.0 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Test of Policy Engine For Octavia."""
import tempfile
from oslo_config import fixture as oslo_fixture
from oslo_log import log as logging
from oslo_policy import policy as oslo_policy
import requests_mock
from octavia.common import config
from octavia.common import context
from octavia.common import exceptions
from octavia.common import policy
from octavia.tests.unit import base
CONF = config.cfg.CONF
LOG = logging.getLogger(__name__)
class PolicyFileTestCase(base.TestCase):
def setUp(self):
super(PolicyFileTestCase, self).setUp()
self.conf = self.useFixture(oslo_fixture.Config(CONF))
policy.reset()
self.target = {}
def test_modified_policy_reloads(self):
with tempfile.NamedTemporaryFile(mode='w', delete=True) as tmp:
self.conf.load_raw_values(
group='oslo_policy', policy_file=tmp.name)
tmp.write('{"example:test": ""}')
tmp.flush()
self.context = context.Context('fake', 'fake')
rule = oslo_policy.RuleDefault('example:test', "")
policy.get_enforcer().register_defaults([rule])
action = "example:test"
policy.get_enforcer().authorize(action, self.target, self.context)
tmp.seek(0)
tmp.write('{"example:test": "!"}')
tmp.flush()
policy.get_enforcer().load_rules(True)
self.assertRaises(exceptions.PolicyForbidden,
policy.get_enforcer().authorize,
action, self.target, self.context)
class PolicyTestCase(base.TestCase):
def setUp(self):
super(PolicyTestCase, self).setUp()
self.conf = self.useFixture(oslo_fixture.Config())
# diltram: this one must be removed after fixing issue in oslo.config
# https://bugs.launchpad.net/oslo.config/+bug/1645868
self.conf.conf.__call__(args=[])
policy.reset()
self.context = context.Context('fake', 'fake', roles=['member'])
self.rules = [
oslo_policy.RuleDefault("true", "@"),
oslo_policy.RuleDefault("example:allowed", "@"),
oslo_policy.RuleDefault("example:denied", "!"),
oslo_policy.RuleDefault("example:get_http",
"http://www.example.com"),
oslo_policy.RuleDefault("example:my_file",
"role:compute_admin or "
"project_id:%(project_id)s"),
oslo_policy.RuleDefault("example:early_and_fail", "! and @"),
oslo_policy.RuleDefault("example:early_or_success", "@ or !"),
oslo_policy.RuleDefault("example:lowercase_admin",
"role:admin or role:sysadmin"),
oslo_policy.RuleDefault("example:uppercase_admin",
"role:ADMIN or role:sysadmin"),
]
policy.get_enforcer().register_defaults(self.rules)
self.target = {}
def test_authorize_nonexistent_action_throws(self):
action = "example:noexist"
self.assertRaises(
oslo_policy.PolicyNotRegistered, policy.get_enforcer().authorize,
action, self.target, self.context)
def test_authorize_bad_action_throws(self):
action = "example:denied"
self.assertRaises(
exceptions.PolicyForbidden, policy.get_enforcer().authorize,
action, self.target, self.context)
def test_authorize_bad_action_noraise(self):
action = "example:denied"
result = policy.get_enforcer().authorize(action, self.target,
self.context, False)
self.assertFalse(result)
def test_authorize_good_action(self):
action = "example:allowed"
result = policy.get_enforcer().authorize(action, self.target,
self.context)
self.assertTrue(result)
@requests_mock.mock()
def test_authorize_http(self, req_mock):
req_mock.post('http://www.example.com/', text='False')
action = "example:get_http"
self.assertRaises(exceptions.PolicyForbidden,
policy.get_enforcer().authorize, action, self.target,
self.context)
def test_templatized_authorization(self):
target_mine = {'project_id': 'fake'}
target_not_mine = {'project_id': 'another'}
action = "example:my_file"
policy.get_enforcer().authorize(action, target_mine, self.context)
self.assertRaises(exceptions.PolicyForbidden,
policy.get_enforcer().authorize,
action, target_not_mine, self.context)
def test_early_AND_authorization(self):
action = "example:early_and_fail"
self.assertRaises(exceptions.PolicyForbidden,
policy.get_enforcer().authorize, action, self.target,
self.context)
def test_early_OR_authorization(self):
action = "example:early_or_success"
policy.get_enforcer().authorize(action, self.target, self.context)
def test_ignore_case_role_check(self):
lowercase_action = "example:lowercase_admin"
uppercase_action = "example:uppercase_admin"
# NOTE(dprince) we mix case in the Admin role here to ensure
# case is ignored
self.context = context.Context('admin', 'fake', roles=['AdMiN'])
policy.get_enforcer().authorize(lowercase_action, self.target,
self.context)
policy.get_enforcer().authorize(uppercase_action, self.target,
self.context)
def test_check_is_admin_fail(self):
self.assertFalse(policy.get_enforcer().check_is_admin(self.context))
def test_check_is_admin(self):
self.context = context.Context('admin', 'fake', roles=['AdMiN'])
self.assertTrue(policy.get_enforcer().check_is_admin(self.context))
def test_get_enforcer(self):
self.assertTrue(isinstance(policy.get_no_context_enforcer(),
oslo_policy.Enforcer))
class IsAdminCheckTestCase(base.TestCase):
def setUp(self):
super(IsAdminCheckTestCase, self).setUp()
self.conf = self.useFixture(oslo_fixture.Config())
# diltram: this one must be removed after fixing issue in oslo.config
# https://bugs.launchpad.net/oslo.config/+bug/1645868
self.conf.conf.__call__(args=[])
self.context = context.Context('fake', 'fake')
def test_init_true(self):
check = policy.IsAdminCheck('is_admin', 'True')
self.assertEqual(check.kind, 'is_admin')
self.assertEqual(check.match, 'True')
self.assertTrue(check.expected)
def test_init_false(self):
check = policy.IsAdminCheck('is_admin', 'nottrue')
self.assertEqual(check.kind, 'is_admin')
self.assertEqual(check.match, 'False')
self.assertFalse(check.expected)
def test_call_true(self):
check = policy.IsAdminCheck('is_admin', 'True')
self.assertTrue(
check('target', dict(is_admin=True), policy.get_enforcer()))
self.assertFalse(
check('target', dict(is_admin=False), policy.get_enforcer()))
def test_call_false(self):
check = policy.IsAdminCheck('is_admin', 'False')
self.assertFalse(
check('target', dict(is_admin=True), policy.get_enforcer()))
self.assertTrue(
check('target', dict(is_admin=False), policy.get_enforcer()))
class AdminRolePolicyTestCase(base.TestCase):
def setUp(self):
super(AdminRolePolicyTestCase, self).setUp()
self.conf = self.useFixture(oslo_fixture.Config())
# diltram: this one must be removed after fixing issue in oslo.config
# https://bugs.launchpad.net/oslo.config/+bug/1645868
self.conf.conf.__call__(args=[])
self.context = context.Context('fake', 'fake', roles=['member'])
self.actions = policy.get_enforcer().get_rules().keys()
self.target = {}
def test_authorize_admin_actions_with_nonadmin_context_throws(self):
"""Check if non-admin context passed to admin actions throws
Policy not authorized exception
"""
for action in self.actions:
self.assertRaises(
exceptions.PolicyForbidden, policy.get_enforcer().authorize,
action, self.target, self.context)