44a101737f
Change-Id: Idc00ccbd254aca2c64c1bcb376fe7a3aab4d1cb4
135 lines
5.2 KiB
Python
135 lines
5.2 KiB
Python
# Copyright 2012 OpenStack, LLC
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os.path
|
|
|
|
from oslo.config import cfg
|
|
|
|
from heat.common import policy
|
|
from heat.common import exception
|
|
from heat.tests.common import HeatTestCase
|
|
from heat.tests.utils import dummy_context
|
|
|
|
policy_path = os.path.dirname(os.path.realpath(__file__)) + "/policy/"
|
|
|
|
|
|
class TestPolicyEnforcer(HeatTestCase):
|
|
cfn_actions = ("ListStacks", "CreateStack", "DescribeStacks",
|
|
"DeleteStack", "UpdateStack", "DescribeStackEvents",
|
|
"ValidateTemplate", "GetTemplate",
|
|
"EstimateTemplateCost", "DescribeStackResource",
|
|
"DescribeStackResources")
|
|
|
|
cw_actions = ("DeleteAlarms", "DescribeAlarmHistory", "DescribeAlarms",
|
|
"DescribeAlarmsForMetric", "DisableAlarmActions",
|
|
"EnableAlarmActions", "GetMetricStatistics", "ListMetrics",
|
|
"PutMetricAlarm", "PutMetricData", "SetAlarmState")
|
|
|
|
def setUp(self):
|
|
super(TestPolicyEnforcer, self).setUp()
|
|
opts = [
|
|
cfg.StrOpt('config_dir', default=policy_path),
|
|
cfg.StrOpt('config_file', default='foo'),
|
|
cfg.StrOpt('project', default='heat'),
|
|
]
|
|
cfg.CONF.register_opts(opts)
|
|
|
|
def test_policy_cfn_default(self):
|
|
enforcer = policy.Enforcer(scope='cloudformation')
|
|
|
|
ctx = dummy_context(roles=[])
|
|
for action in self.cfn_actions:
|
|
# Everything should be allowed
|
|
enforcer.enforce(ctx, action, {})
|
|
|
|
def test_policy_cfn_notallowed(self):
|
|
pf = policy_path + 'notallowed.json'
|
|
self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
|
|
policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
|
|
self.m.ReplayAll()
|
|
|
|
enforcer = policy.Enforcer(scope='cloudformation')
|
|
|
|
ctx = dummy_context(roles=[])
|
|
for action in self.cfn_actions:
|
|
# Everything should raise the default exception.Forbidden
|
|
self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
|
|
action, {})
|
|
self.m.VerifyAll()
|
|
|
|
def test_policy_cfn_deny_stack_user(self):
|
|
pf = policy_path + 'deny_stack_user.json'
|
|
self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
|
|
policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
|
|
self.m.ReplayAll()
|
|
|
|
enforcer = policy.Enforcer(scope='cloudformation')
|
|
|
|
ctx = dummy_context(roles=['heat_stack_user'])
|
|
for action in self.cfn_actions:
|
|
# Everything apart from DescribeStackResource should be Forbidden
|
|
if action == "DescribeStackResource":
|
|
enforcer.enforce(ctx, action, {})
|
|
else:
|
|
self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
|
|
action, {})
|
|
self.m.VerifyAll()
|
|
|
|
def test_policy_cfn_allow_non_stack_user(self):
|
|
pf = policy_path + 'deny_stack_user.json'
|
|
self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
|
|
policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
|
|
self.m.ReplayAll()
|
|
|
|
enforcer = policy.Enforcer(scope='cloudformation')
|
|
|
|
ctx = dummy_context(roles=['not_a_stack_user'])
|
|
for action in self.cfn_actions:
|
|
# Everything should be allowed
|
|
enforcer.enforce(ctx, action, {})
|
|
self.m.VerifyAll()
|
|
|
|
def test_policy_cw_deny_stack_user(self):
|
|
pf = policy_path + 'deny_stack_user.json'
|
|
self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
|
|
policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
|
|
self.m.ReplayAll()
|
|
|
|
enforcer = policy.Enforcer(scope='cloudwatch')
|
|
|
|
ctx = dummy_context(roles=['heat_stack_user'])
|
|
for action in self.cw_actions:
|
|
# Everything apart from PutMetricData should be Forbidden
|
|
if action == "PutMetricData":
|
|
enforcer.enforce(ctx, action, {})
|
|
else:
|
|
self.assertRaises(exception.Forbidden, enforcer.enforce, ctx,
|
|
action, {})
|
|
self.m.VerifyAll()
|
|
|
|
def test_policy_cw_allow_non_stack_user(self):
|
|
pf = policy_path + 'deny_stack_user.json'
|
|
self.m.StubOutWithMock(policy.Enforcer, '_find_policy_file')
|
|
policy.Enforcer._find_policy_file().MultipleTimes().AndReturn(pf)
|
|
self.m.ReplayAll()
|
|
|
|
enforcer = policy.Enforcer(scope='cloudwatch')
|
|
|
|
ctx = dummy_context(roles=['not_a_stack_user'])
|
|
for action in self.cw_actions:
|
|
# Everything should be allowed
|
|
enforcer.enforce(ctx, action, {})
|
|
self.m.VerifyAll()
|