Merge "Refactor scope enforcement in the Enforcer class"

This commit is contained in:
Zuul 2021-10-25 14:02:35 +00:00 committed by Gerrit Code Review
commit 4ecbcf280a
1 changed files with 34 additions and 32 deletions

View File

@ -1063,40 +1063,9 @@ class Enforcer(object):
# as token_scope is not actually a hardcoded
# token.
# Check the scope of the operation against the possible scope
# attributes provided in `creds`.
if creds.get('system'):
token_scope = 'system' # nosec
elif creds.get('domain_id'):
token_scope = 'domain' # nosec
else:
# If the token isn't system-scoped or domain-scoped then
# we're dealing with a project-scoped token.
token_scope = 'project' # nosec
registered_rule = self.registered_rules.get(rule)
if registered_rule and registered_rule.scope_types:
if token_scope not in registered_rule.scope_types:
if self.conf.oslo_policy.enforce_scope:
raise InvalidScope(
rule, registered_rule.scope_types, token_scope
)
# If we don't raise an exception we should at least
# inform operators about policies that are being used
# with improper scopes.
msg = (
'Policy %(rule)s failed scope check. The token '
'used to make the request was %(token_scope)s '
'scoped but the policy requires %(policy_scope)s '
'scope. This behavior may change in the future '
'where using the intended scope is required' % {
'rule': rule,
'token_scope': token_scope,
'policy_scope': registered_rule.scope_types
}
)
warnings.warn(msg)
self._enforce_scope(creds, registered_rule)
result = _checks._check(
rule=to_check,
target=target,
@ -1114,6 +1083,39 @@ class Enforcer(object):
return result
def _enforce_scope(self, creds, rule):
# Check the scope of the operation against the possible scope
# attributes provided in `creds`.
if creds.get('system'):
token_scope = 'system' # nosec
elif creds.get('domain_id'):
token_scope = 'domain' # nosec
else:
# If the token isn't system-scoped or domain-scoped then
# we're dealing with a project-scoped token.
token_scope = 'project' # nosec
if token_scope not in rule.scope_types:
if self.conf.oslo_policy.enforce_scope:
raise InvalidScope(
rule, rule.scope_types, token_scope
)
# If we don't raise an exception we should at least
# inform operators about policies that are being used
# with improper scopes.
msg = (
'Policy %(rule)s failed scope check. The token '
'used to make the request was %(token_scope)s '
'scoped but the policy requires %(policy_scope)s '
'scope. This behavior may change in the future '
'where using the intended scope is required' % {
'rule': rule,
'token_scope': token_scope,
'policy_scope': rule.scope_types
}
)
warnings.warn(msg)
def _map_context_attributes_into_creds(self, context):
creds = {}
# port public context attributes into the creds dictionary so long as