deb-heat/heat/common/policy.py
guohliu eca7fdae8c Port policy from oslo and refactor heat policy
This patch is to refactor Heat's usage to allow bringing the
policy code from oslo up to the current version to address
many fixed bugs which affect Heat.

    1. Refactor the heat wrapper policy.
    2. Add netaddr in requirement.txt.
    3. Add necessary unit test to cover the refactor part code.

Oslo commit: 3626b6db91ce1e897d1993fb4e13ac4237d04b7f
Fixes bug #1192551

Change-Id: Ia71d14b2cfde9110216ba38a515fee16d0f0c35e
2013-08-12 13:19:55 +08:00

99 lines
3.6 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Based on glance/api/policy.py
"""Policy Engine For Heat"""
from oslo.config import cfg
from heat.common import exception
import heat.openstack.common.log as logging
from heat.openstack.common import policy
logger = logging.getLogger(__name__)
CONF = cfg.CONF
DEFAULT_RULES = {
'default': policy.FalseCheck(),
}
class Enforcer(object):
"""Responsible for loading and enforcing rules."""
def __init__(self, scope='heat', exc=exception.Forbidden,
default_rule=DEFAULT_RULES['default']):
self.scope = scope
self.exc = exc
self.default_rule = default_rule
self.enforcer = policy.Enforcer(default_rule=default_rule)
def set_rules(self, rules, overwrite=True):
"""Create a new Rules object based on the provided dict of rules."""
rules_obj = policy.Rules(rules, self.default_rule)
self.enforcer.set_rules(rules_obj, overwrite)
def load_rules(self, force_reload=False):
"""Set the rules found in the json file on disk."""
self.enforcer.load_rules(force_reload)
def _check(self, context, rule, target, exc, *args, **kwargs):
"""Verifies that the action is valid on the target in this context.
:param context: Heat request context
:param rule: String representing the action to be checked
:param object: Dictionary representing the object of the action.
:raises: self.exc (defaults to heat.common.exception.Forbidden)
:returns: A non-False value if access is allowed.
"""
do_raise = False if not exc else True
credentials = {
'roles': context.roles,
'user': context.username,
'tenant': context.tenant,
}
return self.enforcer.enforce(rule, target, credentials,
do_raise, exc=exc, *args, **kwargs)
def enforce(self, context, action, target):
"""Verifies that the action is valid on the target in this context.
:param context: Heat request context
:param action: String representing the action to be checked
:param object: Dictionary representing the object of the action.
:raises: self.exc (defaults to heat.common.exception.Forbidden)
:returns: A non-False value if access is allowed.
"""
_action = '%s:%s' % (self.scope, action)
return self._check(context, _action, target, self.exc, action=action)
def check(self, context, action, target):
"""Verifies that the action is valid on the target in this context.
:param context: Heat request context
:param action: String representing the action to be checked
:param object: Dictionary representing the object of the action.
:returns: A non-False value if access is allowed.
"""
return self._check(context, action, target)
def clear(self):
self.enforcer.clear()