From 550973b64a64a546ae0c0e94c49af05bd2d64175 Mon Sep 17 00:00:00 2001 From: Guang Yee Date: Tue, 19 Mar 2013 19:14:47 -0700 Subject: [PATCH] Prohibit V3 V2 token intermix for resource in non-default domain (bug 1157430) Change-Id: Ibe9019684b45651a9679311a3bacdad41b4116f5 --- keystone/token/controllers.py | 45 ++++++- tests/test_v3.py | 22 ++++ tests/test_v3_auth.py | 220 ++++++++++++++++++++++++++++++++-- 3 files changed, 277 insertions(+), 10 deletions(-) diff --git a/keystone/token/controllers.py b/keystone/token/controllers.py index 06a1fe643b..ca7ef346cf 100644 --- a/keystone/token/controllers.py +++ b/keystone/token/controllers.py @@ -473,6 +473,46 @@ class Auth(controller.V2Controller): _('Token does not belong to specified tenant.')) return data + def _assert_default_domain(self, context, token_ref): + """ Make sure we are operating on default domain only. """ + if token_ref.get('token_data'): + # this is a V3 token + msg = _('Non-default domain is not supported') + # user in a non-default is prohibited + if (token_ref['token_data']['token']['user']['domain']['id'] != + DEFAULT_DOMAIN_ID): + raise exception.Unauthorized(msg) + # domain scoping is prohibited + if token_ref['token_data']['token'].get('domain'): + raise exception.Unauthorized( + _('Domain scoped token is not supported')) + # project in non-default domain is prohibited + if token_ref['token_data']['token'].get('project'): + project = token_ref['token_data']['token']['project'] + project_domain_id = project['domain']['id'] + # scoped to project in non-default domain is prohibited + if project_domain_id != DEFAULT_DOMAIN_ID: + raise exception.Unauthorized(msg) + # if token is scoped to trust, both trustor and trustee must + # be in the default domain. Furthermore, the delegated project + # must also be in the default domain + metadata_ref = token_ref['metadata'] + if 'trust_id' in metadata_ref: + trust_ref = self.trust_api.get_trust(context, + metadata_ref['trust_id']) + trustee_user_ref = self.identity_api.get_user( + context, trust_ref['trustee_user_id']) + if trustee_user_ref['domain_id'] != DEFAULT_DOMAIN_ID: + raise exception.Unauthorized(msg) + trustor_user_ref = self.identity_api.get_user( + context, trust_ref['trustor_user_id']) + if trustor_user_ref['domain_id'] != DEFAULT_DOMAIN_ID: + raise exception.Unauthorized(msg) + project_ref = self.identity_api.get_project( + context, trust_ref['project_id']) + if project_ref['domain_id'] != DEFAULT_DOMAIN_ID: + raise exception.Unauthorized(msg) + # admin only def validate_token_head(self, context, token_id): """Check that a token is valid. @@ -483,7 +523,9 @@ class Auth(controller.V2Controller): """ belongs_to = context['query_string'].get('belongsTo') - assert self._get_token_ref(context, token_id, belongs_to) + token_ref = self._get_token_ref(context, token_id, belongs_to) + assert token_ref + self._assert_default_domain(context, token_ref) # admin only def validate_token(self, context, token_id): @@ -496,6 +538,7 @@ class Auth(controller.V2Controller): """ belongs_to = context['query_string'].get('belongsTo') token_ref = self._get_token_ref(context, token_id, belongs_to) + self._assert_default_domain(context, token_ref) # TODO(termie): optimize this call at some point and put it into the # the return for metadata diff --git a/tests/test_v3.py b/tests/test_v3.py index 1d9855e8f6..f8bdd8ec03 100644 --- a/tests/test_v3.py +++ b/tests/test_v3.py @@ -15,6 +15,7 @@ import test_content_types CONF = config.CONF +DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' @@ -56,6 +57,21 @@ class RestfulTestCase(test_content_types.RestfulTestCase): self.user['id'] = self.user_id self.identity_api.create_user(self.user_id, self.user) + self.default_domain_project_id = uuid.uuid4().hex + self.default_domain_project = self.new_project_ref( + domain_id=DEFAULT_DOMAIN_ID) + self.default_domain_project['id'] = self.default_domain_project_id + self.identity_api.create_project(self.default_domain_project_id, + self.default_domain_project) + + self.default_domain_user_id = uuid.uuid4().hex + self.default_domain_user = self.new_user_ref( + domain_id=DEFAULT_DOMAIN_ID, + project_id=self.default_domain_project_id) + self.default_domain_user['id'] = self.default_domain_user_id + self.identity_api.create_user(self.default_domain_user_id, + self.default_domain_user) + # create & grant policy.json's default role for admin_required self.role_id = uuid.uuid4().hex self.role = self.new_role_ref() @@ -64,6 +80,12 @@ class RestfulTestCase(test_content_types.RestfulTestCase): self.identity_api.create_role(self.role_id, self.role) self.identity_api.add_role_to_user_and_project( self.user_id, self.project_id, self.role_id) + self.identity_api.add_role_to_user_and_project( + self.default_domain_user_id, self.default_domain_project_id, + self.role_id) + self.identity_api.add_role_to_user_and_project( + self.default_domain_user_id, self.project_id, + self.role_id) self.public_server = self.serveapp('keystone', name='main') self.admin_server = self.serveapp('keystone', name='admin') diff --git a/tests/test_v3_auth.py b/tests/test_v3_auth.py index 092980e735..b2602bdc48 100644 --- a/tests/test_v3_auth.py +++ b/tests/test_v3_auth.py @@ -113,7 +113,7 @@ class TestTokenAPIs(test_v3.RestfulTestCase): CONF.signing.keyfile) self.assertEqual(token_signed, token_id) - def test_v3_v2_unscoped_uuid_token_intermix(self): + def test_v3_v2_intermix_non_default_domain_failed(self): self.opt_in_group('signing', token_format='UUID') auth_data = self.build_authentication_request( user_id=self.user['id'], @@ -122,6 +122,59 @@ class TestTokenAPIs(test_v3.RestfulTestCase): token_data = resp.body token = resp.getheader('X-Subject-Token') + # now validate the v3 token with v2 API + path = '/v2.0/tokens/%s' % (token) + resp = self.admin_request(path=path, + token='ADMIN', + method='GET', + expected_status=401) + + def test_v3_v2_intermix_domain_scoped_token_failed(self): + self.opt_in_group('signing', token_format='UUID') + # grant the domain role to user + path = '/domains/%s/users/%s/roles/%s' % ( + self.domain['id'], self.user['id'], self.role['id']) + self.put(path=path) + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + domain_id=self.domain['id']) + resp = self.post('/auth/tokens', body=auth_data) + token_data = resp.body + token = resp.getheader('X-Subject-Token') + + # now validate the v3 token with v2 API + path = '/v2.0/tokens/%s' % (token) + resp = self.admin_request(path=path, + token='ADMIN', + method='GET', + expected_status=401) + + def test_v3_v2_intermix_non_default_project_failed(self): + auth_data = self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + project_id=self.project['id']) + resp = self.post('/auth/tokens', body=auth_data) + token_data = resp.body + token = resp.getheader('X-Subject-Token') + + # now validate the v3 token with v2 API + path = '/v2.0/tokens/%s' % (token) + resp = self.admin_request(path=path, + token='ADMIN', + method='GET', + expected_status=401) + + def test_v3_v2_unscoped_uuid_token_intermix(self): + self.opt_in_group('signing', token_format='UUID') + auth_data = self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password']) + resp = self.post('/auth/tokens', body=auth_data) + token_data = resp.body + token = resp.getheader('X-Subject-Token') + # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) resp = self.admin_request(path=path, @@ -138,8 +191,8 @@ class TestTokenAPIs(test_v3.RestfulTestCase): def test_v3_v2_unscoped_pki_token_intermix(self): self.opt_in_group('signing', token_format='PKI') auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password']) + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password']) resp = self.post('/auth/tokens', body=auth_data) token_data = resp.body token = resp.getheader('X-Subject-Token') @@ -162,9 +215,9 @@ class TestTokenAPIs(test_v3.RestfulTestCase): # data is baked into the token itself. self.opt_in_group('signing', token_format='UUID') auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - project_id=self.project['id']) + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + project_id=self.default_domain_project['id']) resp = self.post('/auth/tokens', body=auth_data) token_data = resp.body token = resp.getheader('X-Subject-Token') @@ -189,9 +242,9 @@ class TestTokenAPIs(test_v3.RestfulTestCase): # data is baked into the token itself. self.opt_in_group('signing', token_format='PKI') auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - project_id=self.project['id']) + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + project_id=self.default_domain_project['id']) resp = self.post('/auth/tokens', body=auth_data) token_data = resp.body token = resp.getheader('X-Subject-Token') @@ -1091,6 +1144,155 @@ class TestTrustAuth(TestAuthInfo): trust_id=trust['id']) self.post('/auth/tokens', body=auth_data, expected_status=401) + def test_v3_v2_intermix_trustor_not_in_default_domain_failed(self): + ref = self.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=self.default_domain_user_id, + project_id=self.project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + r = self.post('/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectTrustScopedTokenResponse( + r, self.default_domain_user) + + token = r.getheader('X-Subject-Token') + + # now validate the v3 token with v2 API + path = '/v2.0/tokens/%s' % (token) + resp = self.admin_request(path=path, + token='ADMIN', + method='GET', + expected_status=401) + + def test_v3_v2_intermix_trustor_not_in_default_domaini_failed(self): + ref = self.new_trust_ref( + trustor_user_id=self.default_domain_user_id, + trustee_user_id=self.trustee_user_id, + project_id=self.default_domain_project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + auth_data = self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + project_id=self.default_domain_project_id) + r = self.post('/auth/tokens', body=auth_data) + token = r.getheader('X-Subject-Token') + + r = self.post('/trusts', body={'trust': ref}, token=token) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectTrustScopedTokenResponse( + r, self.trustee_user) + token = r.getheader('X-Subject-Token') + + # now validate the v3 token with v2 API + path = '/v2.0/tokens/%s' % (token) + resp = self.admin_request(path=path, + token='ADMIN', + method='GET', + expected_status=401) + + def test_v3_v2_intermix_project_not_in_default_domaini_failed(self): + # create a trustee in default domain to delegate stuff to + trustee_user_id = uuid.uuid4().hex + trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID) + trustee_user['id'] = trustee_user_id + self.identity_api.create_user(trustee_user_id, trustee_user) + + ref = self.new_trust_ref( + trustor_user_id=self.default_domain_user_id, + trustee_user_id=trustee_user_id, + project_id=self.project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + + auth_data = self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + project_id=self.default_domain_project_id) + r = self.post('/auth/tokens', body=auth_data) + token = r.getheader('X-Subject-Token') + + r = self.post('/trusts', body={'trust': ref}, token=token) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=trustee_user['id'], + password=trustee_user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectTrustScopedTokenResponse( + r, trustee_user) + token = r.getheader('X-Subject-Token') + + # now validate the v3 token with v2 API + path = '/v2.0/tokens/%s' % (token) + resp = self.admin_request(path=path, + token='ADMIN', + method='GET', + expected_status=401) + + def test_v3_v2_intermix(self): + # create a trustee in default domain to delegate stuff to + trustee_user_id = uuid.uuid4().hex + trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID) + trustee_user['id'] = trustee_user_id + self.identity_api.create_user(trustee_user_id, trustee_user) + + ref = self.new_trust_ref( + trustor_user_id=self.default_domain_user_id, + trustee_user_id=trustee_user_id, + project_id=self.default_domain_project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id]) + del ref['id'] + auth_data = self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + project_id=self.default_domain_project_id) + r = self.post('/auth/tokens', body=auth_data) + token = r.getheader('X-Subject-Token') + + r = self.post('/trusts', body={'trust': ref}, token=token) + trust = self.assertValidTrustResponse(r) + + auth_data = self.build_authentication_request( + user_id=trustee_user['id'], + password=trustee_user['password'], + trust_id=trust['id']) + r = self.post('/auth/tokens', body=auth_data) + self.assertValidProjectTrustScopedTokenResponse( + r, trustee_user) + token = r.getheader('X-Subject-Token') + + # now validate the v3 token with v2 API + path = '/v2.0/tokens/%s' % (token) + resp = self.admin_request(path=path, + token='ADMIN', + method='GET', + expected_status=200) + def test_exercise_trust_scoped_token_without_impersonation(self): ref = self.new_trust_ref( trustor_user_id=self.user_id,