diff --git a/neutron/policy.py b/neutron/policy.py index 562ee0363..092750add 100644 --- a/neutron/policy.py +++ b/neutron/policy.py @@ -17,6 +17,7 @@ Policy engine for neutron. Largely copied from nova. """ import itertools +import logging import re from oslo.config import cfg @@ -27,11 +28,11 @@ import neutron.common.utils as utils from neutron.openstack.common import excutils from neutron.openstack.common.gettextutils import _LE, _LI, _LW from neutron.openstack.common import importutils -from neutron.openstack.common import log as logging +from neutron.openstack.common import log from neutron.openstack.common import policy -LOG = logging.getLogger(__name__) +LOG = log.getLogger(__name__) _POLICY_PATH = None _POLICY_CACHE = {} ADMIN_CTX_POLICY = 'context_is_admin' @@ -151,6 +152,16 @@ def _build_subattr_match_rule(attr_name, attr, action, target): return policy.AndCheck(sub_attr_rules) +def _process_rules_list(rules, match_rule): + """Recursively walk a policy rule to extract a list of match entries.""" + if isinstance(match_rule, policy.RuleCheck): + rules.append(match_rule.match) + elif isinstance(match_rule, policy.AndCheck): + for rule in match_rule.rules: + _process_rules_list(rules, rule) + return rules + + def _build_match_rule(action, target): """Create the rule to match for a given action. @@ -188,6 +199,11 @@ def _build_match_rule(action, target): attribute_name, attribute, action, target)]) match_rule = policy.AndCheck([match_rule, attr_rule]) + # Check that the logger has a DEBUG log level + if (cfg.CONF.debug and LOG.logger.level == logging.NOTSET or + LOG.logger.level == logging.DEBUG): + rules = _process_rules_list([], match_rule) + LOG.debug("Enforcing rules: %s", rules) return match_rule diff --git a/neutron/tests/unit/test_policy.py b/neutron/tests/unit/test_policy.py index bc0074c90..70905b29a 100644 --- a/neutron/tests/unit/test_policy.py +++ b/neutron/tests/unit/test_policy.py @@ -551,3 +551,24 @@ class NeutronPolicyTestCase(base.BaseTestCase): {'extension:provider_network:set': 'rule:admin_only'}, dict((policy, 'rule:admin_only') for policy in expected_policies)) + + def test_process_rules(self): + action = "create_something" + # Construct RuleChecks for an action, attribute and subattribute + match_rule = common_policy.RuleCheck('rule', action) + attr_rule = common_policy.RuleCheck('rule', '%s:%s' % + (action, 'somethings')) + sub_attr_rules = [common_policy.RuleCheck('rule', '%s:%s:%s' % + (action, 'attr', + 'sub_attr_1'))] + # Build an AndCheck from the given RuleChecks + # Make the checks nested to better check the recursion + sub_attr_rules = common_policy.AndCheck(sub_attr_rules) + attr_rule = common_policy.AndCheck( + [attr_rule, sub_attr_rules]) + + match_rule = common_policy.AndCheck([match_rule, attr_rule]) + # Assert that the rules are correctly extracted from the match_rule + rules = policy._process_rules_list([], match_rule) + self.assertEqual(['create_something', 'create_something:somethings', + 'create_something:attr:sub_attr_1'], rules)