Add test case for bay policy check
Two purpose: Make sure the policy enforce_wsgi decorator is called. Make sure the request will be disallowed by policy, when it does not satisfy the policy rule. Co-Authored-By: yuntongjin <yuntong.jin@intel.com> Change-Id: I4a4930d7951f416d3553c737c3523c37014d7101 Partial-implements: blueprint policy-enforce
This commit is contained in:
parent
d057f8c442
commit
6186a86bd8
|
@ -16,6 +16,7 @@ import os
|
|||
|
||||
import fixtures
|
||||
from oslo_config import cfg
|
||||
from oslo_policy import _parser
|
||||
from oslo_policy import opts as policy_opts
|
||||
|
||||
from magnum.common import policy as magnum_policy
|
||||
|
@ -39,3 +40,8 @@ class PolicyFixture(fixtures.Fixture):
|
|||
CONF.set_override('policy_file', self.policy_file_name, 'oslo_policy')
|
||||
magnum_policy._ENFORCER = None
|
||||
self.addCleanup(magnum_policy.init().clear)
|
||||
|
||||
def set_rules(self, rules):
|
||||
policy = magnum_policy._ENFORCER
|
||||
policy.set_rules({k: _parser.parse_rule(v)
|
||||
for k, v in rules.items()})
|
||||
|
|
|
@ -14,6 +14,7 @@ import datetime
|
|||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_policy import policy
|
||||
from oslo_utils import timeutils
|
||||
from six.moves.urllib import parse as urlparse
|
||||
from wsme import types as wtypes
|
||||
|
@ -585,3 +586,50 @@ class TestDelete(api_base.FunctionalTest):
|
|||
self.assertEqual(409, response.status_int)
|
||||
self.assertEqual('application/json', response.content_type)
|
||||
self.assertTrue(response.json['error_message'])
|
||||
|
||||
|
||||
class TestBayPolicyEnforcement(api_base.FunctionalTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestBayPolicyEnforcement, self).setUp()
|
||||
obj_utils.create_test_baymodel(self.context)
|
||||
|
||||
def _common_policy_check(self, rule, func, *arg, **kwarg):
|
||||
self.policy.set_rules({rule: "project:non_fake"})
|
||||
exc = self.assertRaises(policy.PolicyNotAuthorized,
|
||||
func, *arg, **kwarg)
|
||||
self.assertTrue(exc.message.startswith(rule))
|
||||
self.assertTrue(exc.message.endswith("disallowed by policy"))
|
||||
|
||||
def test_policy_disallow_get_all(self):
|
||||
self._common_policy_check(
|
||||
"bay:get_all", self.get_json, '/bays')
|
||||
|
||||
def test_policy_disallow_get_one(self):
|
||||
self._common_policy_check(
|
||||
"bay:get", self.get_json, '/bays/111-222-333')
|
||||
|
||||
def test_policy_disallow_update(self):
|
||||
self.bay = obj_utils.create_test_bay(self.context,
|
||||
name='bay_example_A',
|
||||
node_count=3)
|
||||
self._common_policy_check(
|
||||
"bay:update", self.patch_json, '/bays/%s' % self.bay.name,
|
||||
[{'path': '/name', 'value': "new_name", 'op': 'replace'}])
|
||||
|
||||
def test_policy_disallow_create(self):
|
||||
bdict = apiutils.bay_post_data(name='bay_example_A')
|
||||
self._common_policy_check(
|
||||
"bay:create", self.post_json, '/bays', bdict)
|
||||
|
||||
def _simulate_rpc_bay_delete(self, bay_uuid):
|
||||
bay = objects.Bay.get_by_uuid(self.context, bay_uuid)
|
||||
bay.destroy()
|
||||
|
||||
def test_policy_disallow_delete(self):
|
||||
p = mock.patch.object(rpcapi.API, 'bay_delete')
|
||||
self.mock_bay_delete = p.start()
|
||||
self.mock_bay_delete.side_effect = self._simulate_rpc_bay_delete
|
||||
self.addCleanup(p.stop)
|
||||
self._common_policy_check(
|
||||
"bay:delete", self.delete, '/bays/test_bay')
|
||||
|
|
Loading…
Reference in New Issue