From b3e065efaaaecf3a2fe83f45631567d63e19f47b Mon Sep 17 00:00:00 2001 From: Jamie Lennox Date: Thu, 14 Jul 2016 15:53:09 +1000 Subject: [PATCH] Remove get_user_id in trust controller The get_user_id function relied on the token model in auth_context - which amongst other things means it would fail with a tokenless auth context. This can be replaced with user_id from the request.context. Normally I try not to mix in cleanups, but whilst doing this and changing variables in list_trusts a small reshuffle and some whitespace just made the whole thing actually readable. Change-Id: I5b64210fc797961b422a0ab9a1b4cee078fe6a0f --- keystone/tests/unit/test_auth.py | 27 ++++++++-------- keystone/trust/controllers.py | 55 +++++++++++++------------------- 2 files changed, 37 insertions(+), 45 deletions(-) diff --git a/keystone/tests/unit/test_auth.py b/keystone/tests/unit/test_auth.py index 1fdc642876..3ac5511ce0 100644 --- a/keystone/tests/unit/test_auth.py +++ b/keystone/tests/unit/test_auth.py @@ -850,6 +850,20 @@ class AuthWithTrust(AuthTest): req = self.make_request(environ=environ) req.context_dict['token_id'] = token_id + # NOTE(jamielennox): This wouldn't be necessary if these were calls via + # the wsgi interface instead of directly creating a request to pass to + # a controller. + req.context.auth_token = token_id + req.context.user_id = auth_context.get('user_id') + req.context.project_id = auth_context.get('project_id') + req.context.domain_id = auth_context.get('domain_id') + req.context.domain_name = auth_context.get('domain_name') + req.context.user_domain_id = auth_context.get('user_domain_id') + req.context.roles = auth_context.get('roles') + req.context.trust_id = auth_context.get('trust_id') + req.context.trustor_id = auth_context.get('trustor_id') + req.context.trustee_id = auth_context.get('trustee_id') + return req def create_trust(self, trust_data, trustor_name, expires_at=None, @@ -970,19 +984,6 @@ class AuthWithTrust(AuthTest): for role in new_trust['roles']: self.assertIn(role['id'], role_ids) - def test_get_trust_without_auth_context(self): - """Verify a trust cannot be retrieved if auth context is missing.""" - unscoped_token = self.get_unscoped_token(self.trustor['name']) - request = self._create_auth_request( - unscoped_token['access']['token']['id']) - new_trust = self.trust_controller.create_trust( - request, trust=self.sample_data)['trust'] - # Delete the auth context before calling get_trust(). - del request.context_dict['environment'][authorization.AUTH_CONTEXT_ENV] - self.assertRaises(exception.Forbidden, - self.trust_controller.get_trust, request, - new_trust['id']) - def test_create_trust_no_impersonation(self): new_trust = self.create_trust(self.sample_data, self.trustor['name'], expires_at=None, impersonation=False) diff --git a/keystone/trust/controllers.py b/keystone/trust/controllers.py index 32ca2c60db..509cb5b3f4 100644 --- a/keystone/trust/controllers.py +++ b/keystone/trust/controllers.py @@ -48,17 +48,9 @@ class TrustV3(controller.V3Controller): path = '/OS-TRUST/' + cls.collection_name return super(TrustV3, cls).base_url(context, path=path) - def _get_user_id(self, context): - try: - token_ref = utils.get_token_ref(context) - except exception.Unauthorized: - return None - return token_ref.user_id - def get_trust(self, request, trust_id): - user_id = self._get_user_id(request.context_dict) trust = self.trust_api.get_trust(trust_id) - _trustor_trustee_only(trust, user_id) + _trustor_trustee_only(trust, request.context.user_id) self._fill_in_roles(request.context_dict, trust, self.role_api.list_roles()) return TrustV3.wrap_member(request.context_dict, trust) @@ -130,10 +122,8 @@ class TrustV3(controller.V3Controller): msg = _('At least one role should be specified.') raise exception.Forbidden(msg) - user_id = self._get_user_id(request.context_dict) - # the creating user must be the trustor - if user_id != trust.get('trustor_user_id'): + if request.context.user_id != trust.get('trustor_user_id'): msg = _("The authenticated user should match the trustor.") raise exception.Forbidden(msg) @@ -197,40 +187,43 @@ class TrustV3(controller.V3Controller): @controller.protected() def list_trusts(self, request): trusts = [] + trustor_user_id = request.params.get('trustor_user_id') + trustee_user_id = request.params.get('trustee_user_id') + if not request.params: self.assert_admin(request) trusts += self.trust_api.list_trusts() - if 'trustor_user_id' in request.params: - user_id = request.params['trustor_user_id'] - calling_user_id = self._get_user_id(request.context_dict) - if user_id != calling_user_id: + + if trustor_user_id: + if trustor_user_id != request.context.user_id: raise exception.Forbidden() - trusts += (self.trust_api. - list_trusts_for_trustor(user_id)) - if 'trustee_user_id' in request.params: - user_id = request.params['trustee_user_id'] - calling_user_id = self._get_user_id(request.context_dict) - if user_id != calling_user_id: + + trusts += self.trust_api.list_trusts_for_trustor(trustor_user_id) + + if trustee_user_id: + if trustee_user_id != request.context.user_id: raise exception.Forbidden() - trusts += self.trust_api.list_trusts_for_trustee(user_id) + + trusts += self.trust_api.list_trusts_for_trustee(trustee_user_id) + for trust in trusts: # get_trust returns roles, list_trusts does not # It seems in some circumstances, roles does not # exist in the query response, so check first if 'roles' in trust: del trust['roles'] + if trust.get('expires_at') is not None: - trust['expires_at'] = (utils.isotime - (trust['expires_at'], - subsecond=True)) + trust['expires_at'] = utils.isotime(trust['expires_at'], + subsecond=True) + return TrustV3.wrap_collection(request.context_dict, trusts) @controller.protected() def delete_trust(self, request, trust_id): trust = self.trust_api.get_trust(trust_id) - user_id = self._get_user_id(request.context_dict) - if (user_id != trust.get('trustor_user_id') and + if (request.context.user_id != trust.get('trustor_user_id') and not request.context.is_admin): raise exception.Forbidden() @@ -240,8 +233,7 @@ class TrustV3(controller.V3Controller): @controller.protected() def list_roles_for_trust(self, request, trust_id): trust = self.get_trust(request, trust_id)['trust'] - user_id = self._get_user_id(request.context_dict) - _trustor_trustee_only(trust, user_id) + _trustor_trustee_only(trust, request.context.user_id) return {'roles': trust['roles'], 'links': trust['roles_links']} @@ -249,8 +241,7 @@ class TrustV3(controller.V3Controller): def get_role_for_trust(self, request, trust_id, role_id): """Get a role that has been assigned to a trust.""" trust = self.trust_api.get_trust(trust_id) - user_id = self._get_user_id(request.context_dict) - _trustor_trustee_only(trust, user_id) + _trustor_trustee_only(trust, request.context.user_id) if not any(role['id'] == role_id for role in trust['roles']): raise exception.RoleNotFound(role_id=role_id)