Merge "Remove get_user_id in trust controller"

This commit is contained in:
Jenkins 2016-07-20 20:35:51 +00:00 committed by Gerrit Code Review
commit c70e69b317
2 changed files with 37 additions and 45 deletions

View File

@ -850,6 +850,20 @@ class AuthWithTrust(AuthTest):
req = self.make_request(environ=environ)
req.context_dict['token_id'] = token_id
# NOTE(jamielennox): This wouldn't be necessary if these were calls via
# the wsgi interface instead of directly creating a request to pass to
# a controller.
req.context.auth_token = token_id
req.context.user_id = auth_context.get('user_id')
req.context.project_id = auth_context.get('project_id')
req.context.domain_id = auth_context.get('domain_id')
req.context.domain_name = auth_context.get('domain_name')
req.context.user_domain_id = auth_context.get('user_domain_id')
req.context.roles = auth_context.get('roles')
req.context.trust_id = auth_context.get('trust_id')
req.context.trustor_id = auth_context.get('trustor_id')
req.context.trustee_id = auth_context.get('trustee_id')
return req
def create_trust(self, trust_data, trustor_name, expires_at=None,
@ -970,19 +984,6 @@ class AuthWithTrust(AuthTest):
for role in new_trust['roles']:
self.assertIn(role['id'], role_ids)
def test_get_trust_without_auth_context(self):
"""Verify a trust cannot be retrieved if auth context is missing."""
unscoped_token = self.get_unscoped_token(self.trustor['name'])
request = self._create_auth_request(
unscoped_token['access']['token']['id'])
new_trust = self.trust_controller.create_trust(
request, trust=self.sample_data)['trust']
# Delete the auth context before calling get_trust().
del request.context_dict['environment'][authorization.AUTH_CONTEXT_ENV]
self.assertRaises(exception.Forbidden,
self.trust_controller.get_trust, request,
new_trust['id'])
def test_create_trust_no_impersonation(self):
new_trust = self.create_trust(self.sample_data, self.trustor['name'],
expires_at=None, impersonation=False)

View File

@ -48,17 +48,9 @@ class TrustV3(controller.V3Controller):
path = '/OS-TRUST/' + cls.collection_name
return super(TrustV3, cls).base_url(context, path=path)
def _get_user_id(self, context):
try:
token_ref = utils.get_token_ref(context)
except exception.Unauthorized:
return None
return token_ref.user_id
def get_trust(self, request, trust_id):
user_id = self._get_user_id(request.context_dict)
trust = self.trust_api.get_trust(trust_id)
_trustor_trustee_only(trust, user_id)
_trustor_trustee_only(trust, request.context.user_id)
self._fill_in_roles(request.context_dict, trust,
self.role_api.list_roles())
return TrustV3.wrap_member(request.context_dict, trust)
@ -130,10 +122,8 @@ class TrustV3(controller.V3Controller):
msg = _('At least one role should be specified.')
raise exception.Forbidden(msg)
user_id = self._get_user_id(request.context_dict)
# the creating user must be the trustor
if user_id != trust.get('trustor_user_id'):
if request.context.user_id != trust.get('trustor_user_id'):
msg = _("The authenticated user should match the trustor.")
raise exception.Forbidden(msg)
@ -197,40 +187,43 @@ class TrustV3(controller.V3Controller):
@controller.protected()
def list_trusts(self, request):
trusts = []
trustor_user_id = request.params.get('trustor_user_id')
trustee_user_id = request.params.get('trustee_user_id')
if not request.params:
self.assert_admin(request)
trusts += self.trust_api.list_trusts()
if 'trustor_user_id' in request.params:
user_id = request.params['trustor_user_id']
calling_user_id = self._get_user_id(request.context_dict)
if user_id != calling_user_id:
if trustor_user_id:
if trustor_user_id != request.context.user_id:
raise exception.Forbidden()
trusts += (self.trust_api.
list_trusts_for_trustor(user_id))
if 'trustee_user_id' in request.params:
user_id = request.params['trustee_user_id']
calling_user_id = self._get_user_id(request.context_dict)
if user_id != calling_user_id:
trusts += self.trust_api.list_trusts_for_trustor(trustor_user_id)
if trustee_user_id:
if trustee_user_id != request.context.user_id:
raise exception.Forbidden()
trusts += self.trust_api.list_trusts_for_trustee(user_id)
trusts += self.trust_api.list_trusts_for_trustee(trustee_user_id)
for trust in trusts:
# get_trust returns roles, list_trusts does not
# It seems in some circumstances, roles does not
# exist in the query response, so check first
if 'roles' in trust:
del trust['roles']
if trust.get('expires_at') is not None:
trust['expires_at'] = (utils.isotime
(trust['expires_at'],
subsecond=True))
trust['expires_at'] = utils.isotime(trust['expires_at'],
subsecond=True)
return TrustV3.wrap_collection(request.context_dict, trusts)
@controller.protected()
def delete_trust(self, request, trust_id):
trust = self.trust_api.get_trust(trust_id)
user_id = self._get_user_id(request.context_dict)
if (user_id != trust.get('trustor_user_id') and
if (request.context.user_id != trust.get('trustor_user_id') and
not request.context.is_admin):
raise exception.Forbidden()
@ -240,8 +233,7 @@ class TrustV3(controller.V3Controller):
@controller.protected()
def list_roles_for_trust(self, request, trust_id):
trust = self.get_trust(request, trust_id)['trust']
user_id = self._get_user_id(request.context_dict)
_trustor_trustee_only(trust, user_id)
_trustor_trustee_only(trust, request.context.user_id)
return {'roles': trust['roles'],
'links': trust['roles_links']}
@ -249,8 +241,7 @@ class TrustV3(controller.V3Controller):
def get_role_for_trust(self, request, trust_id, role_id):
"""Get a role that has been assigned to a trust."""
trust = self.trust_api.get_trust(trust_id)
user_id = self._get_user_id(request.context_dict)
_trustor_trustee_only(trust, user_id)
_trustor_trustee_only(trust, request.context.user_id)
if not any(role['id'] == role_id for role in trust['roles']):
raise exception.RoleNotFound(role_id=role_id)