gluon/gluon/policy.py

310 lines
12 KiB
Python

# Copyright (c) 2016 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(JinLi)This is a simpilied version of the original policy.py with the
# attribute-level authorization stripped off. The reason is because the
# policy.py.bak file is borrowed from Neutron and the attribute-level authZ
# code in that original file handles logic for Neutron attribute.
#
# This simplied version only handles object-level authorization. In the future,
# we may use some code/logic from original policy.py and adapt it to enable
# attibute-level authorization for Gluon.
import collections
import re
import six
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_policy import policy
from oslo_utils import excutils
from oslo_utils import importutils
from gluon import constants
from gluon import policies
from gluon._i18n import _
from gluon.api import attributes
from gluon.common import exception as g_exc
LOG = logging.getLogger(__name__)
_ENFORCER = None
ADMIN_CTX_POLICY = 'context_is_admin'
ADVSVC_CTX_POLICY = 'context_is_advsvc'
def reset():
global _ENFORCER
if _ENFORCER:
_ENFORCER.clear()
_ENFORCER = None
def init(conf=cfg.CONF, policy_file=None):
"""Init an instance of the Enforcer class."""
global _ENFORCER
if not _ENFORCER:
_ENFORCER = policy.Enforcer(conf, policy_file=policy_file)
_ENFORCER.load_rules(True)
register_rules(_ENFORCER)
def refresh(policy_file=None):
"""Reset policy and init a new instance of Enforcer."""
reset()
init(policy_file=policy_file)
def get_resource_and_action(action, pluralized=None):
"""Return resource and enforce_attr_based_check(boolean) per
resource and action extracted from api operation.
"""
data = action.split(':', 1)[0].split('_', 1)
resource = pluralized or ("%ss" % data[-1])
enforce_attr_based_check = data[0] not in ('get', 'delete')
return resource, enforce_attr_based_check
def set_rules(policies, overwrite=True):
"""Set rules based on the provided dict of rules.
:param policies: New policies to use. It should be an instance of dict.
:param overwrite: Whether to overwrite current rules or update them
with the new rules.
"""
LOG.debug("Loading policies from file: %s", _ENFORCER.policy_path)
init()
_ENFORCER.set_rules(policies, overwrite)
def _is_attribute_explicitly_set(attribute_name, resource, target, action):
"""Verify that an attribute is present and is explicitly set."""
if 'update' in action:
# In the case of update, the function should not pay attention to a
# default value of an attribute, but check whether it was explicitly
# marked as being updated instead.
return (attribute_name in target[constants.ATTRIBUTES_TO_UPDATE] and
target[attribute_name] is not constants.ATTR_NOT_SPECIFIED)
result = (attribute_name in target and
target[attribute_name] is not constants.ATTR_NOT_SPECIFIED)
if result and 'default' in resource[attribute_name]:
return target[attribute_name] != resource[attribute_name]['default']
return result
def _should_validate_sub_attributes(attribute, sub_attr):
"""Verify that sub-attributes are iterable and should be validated."""
validate = attribute.get('validate')
return (validate and isinstance(sub_attr, collections.Iterable) and
any([k.startswith('type:dict') and
v for (k, v) in validate.items()]))
def _build_subattr_match_rule(attr_name, attr, action, target):
"""Create the rule to match for sub-attribute policy checks."""
# TODO(salv-orlando): Instead of relying on validator info, introduce
# typing for API attributes
# Expect a dict as type descriptor
validate = attr['validate']
key = [k for k in validate.keys() if k.startswith('type:dict')]
if not key:
LOG.warning(("Unable to find data type descriptor "
"for attribute %s"),
attr_name)
return
data = validate[key[0]]
if not isinstance(data, dict):
LOG.debug("Attribute type descriptor is not a dict. Unable to "
"generate any sub-attr policy rule for %s.",
attr_name)
return
sub_attr_rules = [policy.RuleCheck('rule', '%s:%s:%s' %
(action, attr_name,
sub_attr_name)) for
sub_attr_name in data if sub_attr_name in
target[attr_name]]
return policy.AndCheck(sub_attr_rules)
def _process_rules_list(rules, match_rule):
"""Recursively walk a policy rule to extract a list of match entries."""
if isinstance(match_rule, policy.RuleCheck):
rules.append(match_rule.match)
elif isinstance(match_rule, policy.AndCheck):
for rule in match_rule.rules:
_process_rules_list(rules, rule)
return rules
def _build_match_rule(action, target, pluralized):
"""Create the rule to match for a given action.
The policy rule to be matched is built in the following way:
1) add entries for matching permission on objects
2) add an entry for the specific action (e.g.: create_network)
3) add an entry for attributes of a resource for which the action
is being executed (e.g.: create_network:shared)
4) add an entry for sub-attributes of a resource for which the
action is being executed
(e.g.: create_router:external_gateway_info:network_id)
"""
match_rule = policy.RuleCheck('rule', action)
resource, enforce_attr_based_check = get_resource_and_action(
action, pluralized)
if enforce_attr_based_check:
# assigning to variable with short name for improving readability
res_map = attributes.RESOURCE_ATTRIBUTE_MAP
if resource in res_map:
for attribute_name in res_map[resource]:
if _is_attribute_explicitly_set(attribute_name,
res_map[resource],
target, action):
attribute = res_map[resource][attribute_name]
if 'enforce_policy' in attribute:
attr_rule = policy.RuleCheck('rule', '%s:%s' %
(action, attribute_name))
# Build match entries for sub-attributes
if _should_validate_sub_attributes(
attribute, target[attribute_name]):
attr_rule = policy.AndCheck(
[attr_rule, _build_subattr_match_rule(
attribute_name, attribute,
action, target)])
match_rule = policy.AndCheck([match_rule, attr_rule])
return match_rule
def _prepare_check(context, action, target, pluralized):
"""Prepare rule, target, and credentials for the policy engine."""
# Compare with None to distinguish case in which target is {}
if target is None:
target = {}
match_rule = _build_match_rule(action, target, pluralized)
credentials = context.to_dict()
return match_rule, target, credentials
def log_rule_list(match_rule):
if LOG.isEnabledFor(logging.DEBUG):
rules = _process_rules_list([], match_rule)
LOG.debug("Enforcing rules: %s", rules)
def check(context, action, target, plugin=None, might_not_exist=False,
pluralized=None):
"""Verifies that the action is valid on the target in this context.
:param context: neutron context
:param action: string representing the action to be checked
this should be colon separated for clarity.
:param target: dictionary representing the object of the action
for object creation this should be a dictionary representing the
location of the object e.g. ``{'project_id': context.project_id}``
:param plugin: currently unused and deprecated.
Kept for backward compatibility.
:param might_not_exist: If True the policy check is skipped (and the
function returns True) if the specified policy does not exist.
Defaults to false.
:param pluralized: pluralized case of resource
e.g. firewall_policy -> pluralized = "firewall_policies"
:return: Returns True if access is permitted else False.
"""
# If we already know the context has admin rights do not perform an
# additional check and authorize the operation
if context.is_admin:
return True
if might_not_exist and not (_ENFORCER.rules and action in _ENFORCER.rules):
return True
match_rule, target, credentials = _prepare_check(context,
action,
target,
pluralized)
result = _ENFORCER.enforce(match_rule,
target,
credentials,
pluralized=pluralized)
# logging applied rules in case of failure
if not result:
log_rule_list(match_rule)
return result
def enforce(context, action, target, plugin=None, pluralized=None):
"""Verifies that the action is valid on the target in this context.
:param context: neutron context
:param action: string representing the action to be checked
this should be colon separated for clarity.
:param target: dictionary representing the object of the action
for object creation this should be a dictionary representing the
location of the object e.g. ``{'project_id': context.project_id}``
:param plugin: currently unused and deprecated.
Kept for backward compatibility.
:param pluralized: pluralized case of resource
e.g. firewall_policy -> pluralized = "firewall_policies"
:raises oslo_policy.policy.PolicyNotAuthorized:
if verification fails.
"""
# If we already know the context has admin rights do not perform an
# additional check and authorize the operation
if context.is_admin:
return True
rule, target, credentials = _prepare_check(context,
action,
target,
pluralized)
try:
result = _ENFORCER.enforce(rule, target, credentials, action=action,
do_raise=True)
except policy.PolicyNotAuthorized:
with excutils.save_and_reraise_exception():
log_rule_list(rule)
LOG.debug("Failed policy check for '%s'", action)
return result
def check_is_admin(context):
"""Verify context has admin rights according to policy settings."""
init()
# the target is user-self
credentials = context.to_dict()
if ADMIN_CTX_POLICY not in _ENFORCER.rules:
return False
return _ENFORCER.enforce(ADMIN_CTX_POLICY, credentials, credentials)
def check_is_advsvc(context):
"""Verify context has advsvc rights according to policy settings."""
init()
# the target is user-self
credentials = context.to_dict()
if ADVSVC_CTX_POLICY not in _ENFORCER.rules:
return False
return _ENFORCER.enforce(ADVSVC_CTX_POLICY, credentials, credentials)
def register_rules(enforcer):
enforcer.register_defaults(policies.list_rules())