69fef14509
PEP-0274 introduced dict comprehensions to replace the dict constructor called with a sequence of length-2 sequences; these are the benefits copied from [1]: The dictionary constructor approach has two distinct disadvantages from the proposed syntax though. First, it isn't as legible as a dict comprehension. Second, it forces the programmer to create an in-core list object first, which could be expensive. Nova dropped python 2.6 support, so we can leverage this now. There is a deep dive about PEP-0274[2] and basic tests about performance[3]. Note: This commit doesn't handle the dict constructor with kwargs. This commit also adds a hacking rule. [1]http://legacy.python.org/dev/peps/pep-0274/ [2]http://doughellmann.com/2012/11/12/the-performance-impact-of-using-dict-instead-of-in-cpython-2-7-2.html [3]http://paste.openstack.org/show/154798/ Change-Id: Ifb5cb05b9cc2b8758d5a8e34f7792470a73d7c40
232 lines
8.6 KiB
Python
232 lines
8.6 KiB
Python
# Copyright 2011 Piston Cloud Computing, Inc.
|
|
# All Rights Reserved.
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Test of Policy Engine For Nova."""
|
|
|
|
import os.path
|
|
import StringIO
|
|
|
|
import mock
|
|
import six.moves.urllib.request as urlrequest
|
|
|
|
from nova import context
|
|
from nova import exception
|
|
from nova.openstack.common import policy as common_policy
|
|
from nova import policy
|
|
from nova import test
|
|
from nova.tests.unit import policy_fixture
|
|
from nova import utils
|
|
|
|
|
|
class PolicyFileTestCase(test.NoDBTestCase):
    """Tests that drive policy enforcement from a policy file on disk."""

    def setUp(self):
        super(PolicyFileTestCase, self).setUp()
        self.context = context.RequestContext('fake', 'fake')
        self.target = {}

    def test_modified_policy_reloads(self):
        """Rewrite the policy file and check that enforcement changes.

        The rule is first written as "" and enforced without error; the
        file is then rewritten with "!" and, once
        ``policy._ENFORCER.load_rules(True)`` has been called, enforcing
        the same rule must raise PolicyNotAuthorized.
        """
        rule_name = "example:test"

        def write_policy(path, body):
            # Overwrite the policy file at ``path`` with ``body``.
            with open(path, "w") as handle:
                handle.write(body)

        with utils.tempdir() as workdir:
            policy_path = os.path.join(workdir, 'policy')

            self.flags(policy_file=policy_path)

            # NOTE(uni): context construction invokes a policy check to
            # determine is_admin or not. As a side effect, a policy reset
            # is needed here to flush the existing policy cache.
            policy.reset()

            write_policy(policy_path, '{"example:test": ""}')
            policy.enforce(self.context, rule_name, self.target)

            write_policy(policy_path, '{"example:test": "!"}')
            policy._ENFORCER.load_rules(True)
            self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
                              self.context, rule_name, self.target)
|
|
|
|
|
|
class PolicyTestCase(test.NoDBTestCase):
    """Tests of policy.enforce against a rule set installed in setUp."""

    def setUp(self):
        super(PolicyTestCase, self).setUp()
        rule_strings = {
            "true": '@',
            "example:allowed": '@',
            "example:denied": "!",
            "example:get_http": "http://www.example.com",
            "example:my_file": "role:compute_admin or "
                               "project_id:%(project_id)s",
            "example:early_and_fail": "! and @",
            "example:early_or_success": "@ or !",
            "example:lowercase_admin": "role:admin or role:sysadmin",
            "example:uppercase_admin": "role:ADMIN or role:sysadmin",
        }
        policy.reset()
        policy.init()
        # Parse each textual rule before handing the mapping to the policy
        # module.
        parsed_rules = {name: common_policy.parse_rule(text)
                        for name, text in rule_strings.items()}
        policy.set_rules(parsed_rules)
        self.context = context.RequestContext('fake', 'fake', roles=['member'])
        self.target = {}

    def test_enforce_nonexistent_action_throws(self):
        """An action missing from the rule set raises PolicyNotAuthorized."""
        self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
                          self.context, "example:noexist", self.target)

    def test_enforce_bad_action_throws(self):
        """The "!" rule raises PolicyNotAuthorized."""
        self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
                          self.context, "example:denied", self.target)

    def test_enforce_bad_action_noraise(self):
        """With the fourth argument False, a denial yields False instead."""
        outcome = policy.enforce(self.context, "example:denied",
                                 self.target, False)
        self.assertEqual(outcome, False)

    def test_enforce_good_action(self):
        """The "@" rule makes enforce return True."""
        outcome = policy.enforce(self.context, "example:allowed",
                                 self.target)
        self.assertEqual(outcome, True)

    @mock.patch.object(urlrequest, 'urlopen',
                       return_value=StringIO.StringIO("True"))
    def test_enforce_http_true(self, mock_urlopen):
        """An http rule passes when the patched urlopen yields "True"."""
        outcome = policy.enforce(self.context, "example:get_http", {})
        self.assertEqual(outcome, True)

    @mock.patch.object(urlrequest, 'urlopen',
                       return_value=StringIO.StringIO("False"))
    def test_enforce_http_false(self, mock_urlopen):
        """An http rule fails when the patched urlopen yields "False"."""
        self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
                          self.context, "example:get_http", {})

    def test_templatized_enforcement(self):
        """The %(project_id)s template is matched against the target."""
        rule_name = "example:my_file"
        own_target = {'project_id': 'fake'}
        foreign_target = {'project_id': 'another'}
        policy.enforce(self.context, rule_name, own_target)
        self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
                          self.context, rule_name, foreign_target)

    def test_early_AND_enforcement(self):
        """The rule "! and @" is denied."""
        self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
                          self.context, "example:early_and_fail",
                          self.target)

    def test_early_OR_enforcement(self):
        """The rule "@ or !" is allowed."""
        policy.enforce(self.context, "example:early_or_success", self.target)

    def test_ignore_case_role_check(self):
        """Role names are compared without regard to case."""
        # NOTE(dprince) we mix case in the Admin role here to ensure
        # case is ignored
        admin_context = context.RequestContext('admin',
                                               'fake',
                                               roles=['AdMiN'])
        for rule_name in ("example:lowercase_admin",
                          "example:uppercase_admin"):
            policy.enforce(admin_context, rule_name, self.target)
|
|
|
|
|
|
class DefaultPolicyTestCase(test.NoDBTestCase):
    """Tests of how enforcement uses the configured default rule."""

    def setUp(self):
        super(DefaultPolicyTestCase, self).setUp()

        self.rules = {
            "default": '',
            "example:exist": "!",
        }

        self._set_rules('default')

        self.context = context.RequestContext('fake', 'fake')

    def _set_rules(self, default_rule):
        """Reset policy and re-initialise it from ``self.rules``.

        :param default_rule: name passed to ``policy.init`` as the
                             ``default_rule`` keyword argument.
        """
        policy.reset()
        parsed_rules = {name: common_policy.parse_rule(text)
                        for name, text in self.rules.items()}
        policy.init(rules=parsed_rules, default_rule=default_rule,
                    use_conf=False)

    def test_policy_called(self):
        """An existing "!" rule is applied and denies the action."""
        self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
                          self.context, "example:exist", {})

    def test_not_found_policy_calls_default(self):
        """An unknown action is enforced without raising."""
        policy.enforce(self.context, "example:noexist", {})

    def test_default_not_found(self):
        """An unknown action is denied when the default rule is missing."""
        self._set_rules("default_noexist")
        self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
                          self.context, "example:noexist", {})
|
|
|
|
|
|
class IsAdminCheckTestCase(test.NoDBTestCase):
    """Tests of policy.IsAdminCheck construction and invocation."""

    def setUp(self):
        super(IsAdminCheckTestCase, self).setUp()
        policy.init()

    def test_init_true(self):
        """A match of 'True' is stored as-is with expected set to True."""
        admin_check = policy.IsAdminCheck('is_admin', 'True')

        self.assertEqual(admin_check.kind, 'is_admin')
        self.assertEqual(admin_check.match, 'True')
        self.assertEqual(admin_check.expected, True)

    def test_init_false(self):
        """A match other than 'True' is reported as 'False'."""
        admin_check = policy.IsAdminCheck('is_admin', 'nottrue')

        self.assertEqual(admin_check.kind, 'is_admin')
        self.assertEqual(admin_check.match, 'False')
        self.assertEqual(admin_check.expected, False)

    def test_call_true(self):
        """A 'True' check passes only for credentials with is_admin=True."""
        admin_check = policy.IsAdminCheck('is_admin', 'True')

        self.assertEqual(admin_check('target', {'is_admin': True},
                                     policy._ENFORCER), True)
        self.assertEqual(admin_check('target', {'is_admin': False},
                                     policy._ENFORCER), False)

    def test_call_false(self):
        """A 'False' check passes only for credentials with is_admin=False."""
        admin_check = policy.IsAdminCheck('is_admin', 'False')

        self.assertEqual(admin_check('target', {'is_admin': True},
                                     policy._ENFORCER), False)
        self.assertEqual(admin_check('target', {'is_admin': False},
                                     policy._ENFORCER), True)
|
|
|
|
|
|
class AdminRolePolicyTestCase(test.NoDBTestCase):
    """Tests enforcement of every loaded rule under RoleBasedPolicyFixture."""

    def setUp(self):
        super(AdminRolePolicyTestCase, self).setUp()
        self.policy = self.useFixture(policy_fixture.RoleBasedPolicyFixture())
        self.context = context.RequestContext('fake', 'fake', roles=['member'])
        self.actions = policy.get_rules().keys()
        self.target = {}

    def test_enforce_admin_actions_with_nonadmin_context_throws(self):
        """Check that a non-admin context is rejected for every action.

        Each rule name returned by ``policy.get_rules()`` is enforced with
        a context whose only role is 'member'; every one must raise
        PolicyNotAuthorized. Presumably the fixture makes all rules
        admin-only -- confirm against RoleBasedPolicyFixture.
        """
        for rule_name in self.actions:
            self.assertRaises(exception.PolicyNotAuthorized, policy.enforce,
                              self.context, rule_name, self.target)
|