keystone/keystone/models/token_model.py
Colleen Murphy 6c73690f77 Ensure OAuth1 authorized roles are respected
Without this patch, when an OAuth1 request token is authorized with a
limited set of roles, the roles for the access token are ignored when
the user uses it to request a keystone token. This means that user of an
access token can use it to escallate their role assignments beyond what
was authorized by the creator. This patch fixes the issue by ensuring
the token model accounts for an OAuth1-scoped token and correctly
populating the roles for it.

Change-Id: I02f9836fbd4d7e629653977fc341476cfd89859e
Closes-bug: #1873290
2020-05-01 15:48:58 -07:00

618 lines
22 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Unified in-memory token model."""
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_serialization import msgpackutils
from oslo_utils import reflection
from keystone.common import cache
from keystone.common import provider_api
from keystone import exception
from keystone.i18n import _
LOG = log.getLogger(__name__)
PROVIDERS = provider_api.ProviderAPIs
# supported token versions
V3 = 'v3.0'
VERSIONS = frozenset([V3])
# minimum access rules support
ACCESS_RULES_MIN_VERSION = 1.0
class TokenModel(object):
"""An object that represents a token emitted by keystone.
This is a queryable object that other parts of keystone can use to reason
about a user's authentication or authorization.
"""
def __init__(self):
self.user_id = None
self.__user = None
self.__user_domain = None
self.methods = None
self.audit_id = None
self.parent_audit_id = None
self.__expires_at = None
self.__issued_at = None
self.system = None
self.domain_id = None
self.__domain = None
self.project_id = None
self.__project = None
self.__project_domain = None
self.trust_id = None
self.__trust = None
self.__trustor = None
self.__trustee = None
self.__trust_project = None
self.__trust_project_domain = None
self.is_federated = False
self.identity_provider_id = None
self.protocol_id = None
self.federated_groups = None
self.access_token_id = None
self.__access_token = None
self.application_credential_id = None
self.__application_credential = None
def __repr__(self):
"""Return string representation of TokenModel."""
desc = ('<%(type)s (audit_id=%(audit_id)s, '
'audit_chain_id=%(audit_ids)s) at %(loc)s>')
self_cls_name = reflection.get_class_name(self, fully_qualified=False)
return desc % {'type': self_cls_name,
'audit_id': self.audit_id,
'audit_ids': self.audit_ids,
'loc': hex(id(self))}
@property
def audit_ids(self):
if self.parent_audit_id:
return [self.audit_id, self.parent_audit_id]
return [self.audit_id]
@property
def expires_at(self):
return self.__expires_at
@expires_at.setter
def expires_at(self, value):
if not isinstance(value, str):
raise ValueError('expires_at must be a string.')
self.__expires_at = value
@property
def issued_at(self):
return self.__issued_at
@issued_at.setter
def issued_at(self, value):
if not isinstance(value, str):
raise ValueError('issued_at must be a string.')
self.__issued_at = value
@property
def unscoped(self):
return not any(
[self.system_scoped, self.domain_scoped, self.project_scoped,
self.trust_scoped]
)
@property
def system_scoped(self):
return self.system is not None
@property
def user(self):
if not self.__user:
if self.user_id:
self.__user = PROVIDERS.identity_api.get_user(self.user_id)
return self.__user
@property
def user_domain(self):
if not self.__user_domain:
if self.user:
self.__user_domain = PROVIDERS.resource_api.get_domain(
self.user['domain_id']
)
return self.__user_domain
@property
def domain(self):
if not self.__domain:
if self.domain_id:
self.__domain = PROVIDERS.resource_api.get_domain(
self.domain_id
)
return self.__domain
@property
def domain_scoped(self):
return self.domain_id is not None
@property
def project(self):
if not self.__project:
if self.project_id:
self.__project = PROVIDERS.resource_api.get_project(
self.project_id
)
return self.__project
@property
def project_scoped(self):
return self.project_id is not None
@property
def project_domain(self):
if not self.__project_domain:
if self.project and self.project.get('domain_id'):
self.__project_domain = PROVIDERS.resource_api.get_domain(
self.project['domain_id']
)
return self.__project_domain
@property
def application_credential(self):
if not self.__application_credential:
if self.application_credential_id:
app_cred_api = PROVIDERS.application_credential_api
self.__application_credential = (
app_cred_api.get_application_credential(
self.application_credential_id
)
)
return self.__application_credential
@property
def oauth_scoped(self):
return self.access_token_id is not None
@property
def access_token(self):
if not self.__access_token:
if self.access_token_id:
self.__access_token = PROVIDERS.oauth_api.get_access_token(
self.access_token_id
)
return self.__access_token
@property
def trust_scoped(self):
return self.trust_id is not None
@property
def trust(self):
if not self.__trust:
if self.trust_id:
self.__trust = PROVIDERS.trust_api.get_trust(self.trust_id)
return self.__trust
@property
def trustor(self):
if not self.__trustor:
if self.trust:
self.__trustor = PROVIDERS.identity_api.get_user(
self.trust['trustor_user_id']
)
return self.__trustor
@property
def trustee(self):
if not self.__trustee:
if self.trust:
self.__trustee = PROVIDERS.identity_api.get_user(
self.trust['trustee_user_id']
)
return self.__trustee
@property
def trust_project(self):
if not self.__trust_project:
if self.trust:
self.__trust_project = PROVIDERS.resource_api.get_project(
self.trust['project_id']
)
return self.__trust_project
@property
def trust_project_domain(self):
if not self.__trust_project_domain:
if self.trust:
self.__trust_project_domain = (
PROVIDERS.resource_api.get_domain(
self.trust_project['domain_id']
)
)
return self.__trust_project_domain
def _get_system_roles(self):
roles = []
groups = PROVIDERS.identity_api.list_groups_for_user(self.user_id)
all_group_roles = []
assignments = []
for group in groups:
group_roles = (
PROVIDERS.assignment_api.list_system_grants_for_group(
group['id']
)
)
for role in group_roles:
all_group_roles.append(role)
assignment = {'group_id': group['id'], 'role_id': role['id']}
assignments.append(assignment)
user_roles = PROVIDERS.assignment_api.list_system_grants_for_user(
self.user_id
)
for role in user_roles:
assignment = {'user_id': self.user_id, 'role_id': role['id']}
assignments.append(assignment)
# NOTE(lbragstad): The whole reason we need to build out a list of
# "assignments" as opposed to just using the nice list of roles we
# already have is because the add_implied_roles() method operates on a
# list of assignment dictionaries (containing role_id,
# user_id/group_id, project_id, et cetera). That method could probably
# be fixed to be more clear by operating on actual roles instead of
# just assignments.
assignments = PROVIDERS.assignment_api.add_implied_roles(assignments)
for assignment in assignments:
role = PROVIDERS.role_api.get_role(assignment['role_id'])
roles.append({'id': role['id'], 'name': role['name']})
return roles
def _get_trust_roles(self):
roles = []
# If redelegated_trust_id is set, then we must traverse the trust_chain
# in order to determine who the original trustor is. We need to do this
# because the user ID of the original trustor helps us determine scope
# in the redelegated context.
if self.trust.get('redelegated_trust_id'):
trust_chain = PROVIDERS.trust_api.get_trust_pedigree(
self.trust_id
)
original_trustor_id = trust_chain[-1]['trustor_user_id']
else:
original_trustor_id = self.trustor['id']
trust_roles = [
{'role_id': role['id']} for role in self.trust['roles']
]
effective_trust_roles = (
PROVIDERS.assignment_api.add_implied_roles(trust_roles)
)
effective_trust_role_ids = (
set([r['role_id'] for r in effective_trust_roles])
)
current_effective_trustor_roles = (
PROVIDERS.assignment_api.get_roles_for_trustor_and_project(
original_trustor_id, self.trust.get('project_id')
)
)
for trust_role_id in effective_trust_role_ids:
if trust_role_id in current_effective_trustor_roles:
role = PROVIDERS.role_api.get_role(trust_role_id)
if role['domain_id'] is None:
roles.append(role)
else:
raise exception.Forbidden(
_('Trustee has no delegated roles.'))
return roles
def _get_oauth_roles(self):
roles = []
access_token_roles = self.access_token['role_ids']
access_token_roles = [
{'role_id': r} for r in jsonutils.loads(access_token_roles)]
effective_access_token_roles = (
PROVIDERS.assignment_api.add_implied_roles(access_token_roles)
)
user_roles = [r['id'] for r in self._get_project_roles()]
for role in effective_access_token_roles:
if role['role_id'] in user_roles:
role = PROVIDERS.role_api.get_role(role['role_id'])
roles.append({'id': role['id'], 'name': role['name']})
return roles
def _get_federated_roles(self):
roles = []
group_ids = [group['id'] for group in self.federated_groups]
federated_roles = PROVIDERS.assignment_api.get_roles_for_groups(
group_ids, self.project_id, self.domain_id
)
for group_id in group_ids:
group_roles = (
PROVIDERS.assignment_api.list_system_grants_for_group(
group_id
)
)
for role in group_roles:
federated_roles.append(role)
user_roles = PROVIDERS.assignment_api.list_system_grants_for_user(
self.user_id
)
for role in user_roles:
federated_roles.append(role)
if self.domain_id:
domain_roles = (
PROVIDERS.assignment_api.get_roles_for_user_and_domain(
self.user_id, self.domain_id
)
)
for role in domain_roles:
federated_roles.append(role)
if self.project_id:
project_roles = (
PROVIDERS.assignment_api.get_roles_for_user_and_project(
self.user_id, self.project_id
)
)
for role in project_roles:
federated_roles.append(role)
# NOTE(lbragstad): Remove duplicate role references from a list of
# roles. It is often suggested that this be done with:
#
# roles = [dict(t) for t in set([tuple(d.items()) for d in roles])]
#
# But that doesn't actually remove duplicates in all cases and
# causes transient failures because dictionaries are unordered
# objects. This means {'id': 1, 'foo': 'bar'} and {'foo': 'bar',
# 'id': 1} won't actually resolve to a single entity in the above
# logic since they are both considered unique. By using `in` we're
# performing a containment check, which also does a deep comparison
# of the objects, which is what we want.
for role in federated_roles:
if not isinstance(role, dict):
role = PROVIDERS.role_api.get_role(role)
if role not in roles:
roles.append(role)
return roles
def _get_domain_roles(self):
roles = []
domain_roles = (
PROVIDERS.assignment_api.get_roles_for_user_and_domain(
self.user_id, self.domain_id
)
)
for role_id in domain_roles:
role = PROVIDERS.role_api.get_role(role_id)
roles.append({'id': role['id'], 'name': role['name']})
return roles
def _get_project_roles(self):
roles = []
project_roles = (
PROVIDERS.assignment_api.get_roles_for_user_and_project(
self.user_id, self.project_id
)
)
for role_id in project_roles:
r = PROVIDERS.role_api.get_role(role_id)
roles.append({'id': r['id'], 'name': r['name']})
return roles
def _get_application_credential_roles(self):
roles = []
app_cred_roles = self.application_credential['roles']
assignment_list = PROVIDERS.assignment_api.list_role_assignments(
user_id=self.user_id,
project_id=self.project_id,
domain_id=self.domain_id,
effective=True)
user_roles = list(set([x['role_id'] for x in assignment_list]))
for role in app_cred_roles:
if role['id'] in user_roles:
roles.append({'id': role['id'], 'name': role['name']})
return roles
@property
def roles(self):
if self.system_scoped:
roles = self._get_system_roles()
elif self.trust_scoped:
roles = self._get_trust_roles()
elif self.oauth_scoped:
roles = self._get_oauth_roles()
elif self.is_federated and not self.unscoped:
roles = self._get_federated_roles()
elif self.domain_scoped:
roles = self._get_domain_roles()
elif self.application_credential_id and self.project_id:
roles = self._get_application_credential_roles()
elif self.project_scoped:
roles = self._get_project_roles()
else:
roles = []
return roles
def _validate_token_resources(self):
if self.project and not self.project.get('enabled'):
msg = ('Unable to validate token because project %(id)s is '
'disabled') % {'id': self.project_id}
tr_msg = _('Unable to validate token because project %(id)s is '
'disabled') % {'id': self.project_id}
LOG.warning(msg)
raise exception.ProjectNotFound(tr_msg)
if self.project and not self.project_domain.get('enabled'):
msg = ('Unable to validate token because domain %(id)s is '
'disabled') % {'id': self.project_domain['id']}
tr_msg = _('Unable to validate token because domain %(id)s is '
'disabled') % {'id': self.project_domain['id']}
LOG.warning(msg)
raise exception.DomainNotFound(tr_msg)
def _validate_token_user(self):
if self.trust_scoped:
if self.user_id != self.trustee['id']:
raise exception.Forbidden(_('User is not a trustee.'))
try:
PROVIDERS.resource_api.assert_domain_enabled(
self.trustor['domain_id']
)
except AssertionError:
raise exception.TokenNotFound(_('Trustor domain is disabled.'))
try:
PROVIDERS.resource_api.assert_domain_enabled(
self.trustee['domain_id']
)
except AssertionError:
raise exception.TokenNotFound(_('Trustee domain is disabled.'))
try:
PROVIDERS.identity_api.assert_user_enabled(
self.trustor['id']
)
except AssertionError:
raise exception.Forbidden(_('Trustor is disabled.'))
if not self.user_domain.get('enabled'):
msg = ('Unable to validate token because domain %(id)s is '
'disabled') % {'id': self.user_domain['id']}
tr_msg = _('Unable to validate token because domain %(id)s is '
'disabled') % {'id': self.user_domain['id']}
LOG.warning(msg)
raise exception.DomainNotFound(tr_msg)
def _validate_system_scope(self):
if self.system_scoped and not self.roles:
msg = ('User %(user_id)s has no access to the system'
) % {'user_id': self.user_id}
tr_msg = _('User %(user_id)s has no access to the system'
) % {'user_id': self.user_id}
LOG.debug(msg)
raise exception.Unauthorized(tr_msg)
def _validate_domain_scope(self):
if self.domain_scoped and not self.roles:
msg = (
'User %(user_id)s has no access to domain %(domain_id)s'
) % {'user_id': self.user_id, 'domain_id': self.domain_id}
tr_msg = _(
'User %(user_id)s has no access to domain %(domain_id)s'
) % {'user_id': self.user_id, 'domain_id': self.domain_id}
LOG.debug(msg)
raise exception.Unauthorized(tr_msg)
def _validate_project_scope(self):
if self.project_scoped and not self.roles:
msg = (
'User %(user_id)s has no access to project %(project_id)s'
) % {'user_id': self.user_id, 'project_id': self.project_id}
tr_msg = _(
'User %(user_id)s has no access to project %(project_id)s'
) % {'user_id': self.user_id, 'project_id': self.project_id}
LOG.debug(msg)
raise exception.Unauthorized(tr_msg)
def _validate_trust_scope(self):
trust_roles = []
if self.trust_id:
refs = [{'role_id': role['id']} for role in self.trust['roles']]
effective_trust_roles = PROVIDERS.assignment_api.add_implied_roles(
refs
)
effective_trust_role_ids = (
set([r['role_id'] for r in effective_trust_roles])
)
current_effective_trustor_roles = (
PROVIDERS.assignment_api.get_roles_for_trustor_and_project(
self.trustor['id'], self.trust.get('project_id')
)
)
# Go through each of the effective trust roles, making sure the
# trustor still has them, if any have been removed, then we
# will treat the trust as invalid
for trust_role_id in effective_trust_role_ids:
if trust_role_id in current_effective_trustor_roles:
role = PROVIDERS.role_api.get_role(trust_role_id)
if role['domain_id'] is None:
trust_roles.append(role)
else:
raise exception.Forbidden(
_('Trustee has no delegated roles.'))
def mint(self, token_id, issued_at):
"""Set the ``id`` and ``issued_at`` attributes of a token.
The process of building a token requires setting attributes about the
authentication and authorization context, like ``user_id`` and
``project_id`` for example. Once a Token object accurately represents
this information it should be "minted". Tokens are minted when they get
an ``id`` attribute and their creation time is recorded.
"""
self._validate_token_resources()
self._validate_token_user()
self._validate_system_scope()
self._validate_domain_scope()
self._validate_project_scope()
self._validate_trust_scope()
self.id = token_id
self.issued_at = issued_at
class _TokenModelHandler(object):
identity = 126
handles = (TokenModel,)
def __init__(self, registry):
self._registry = registry
def serialize(self, obj):
serialized = msgpackutils.dumps(obj.__dict__, registry=self._registry)
return serialized
def deserialize(self, data):
token_data = msgpackutils.loads(data, registry=self._registry)
try:
token_model = TokenModel()
for k, v in iter(token_data.items()):
setattr(token_model, k, v)
except Exception:
LOG.debug(
"Failed to deserialize TokenModel. Data is %s", token_data
)
raise exception.CacheDeserializationError(
TokenModel.__name__, token_data
)
return token_model
cache.register_model_handler(_TokenModelHandler)