# Copyright 2012 OpenStack LLC # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import json import uuid import nose.exc from keystone import auth from keystone.common import cms from keystone import config from keystone import exception import test_v3 CONF = config.CONF class TestAuthInfo(test_v3.RestfulTestCase): # TDOD(henry-nash) These tests are somewhat inefficient, since by # using the test_v3.RestfulTestCase class to gain access to the auth # building helper functions, they cause backend databases and fixtures # to be loaded unnecessarily. Separating out the helper functions from # this base class would improve efficiency (Bug #1134836) def setUp(self, load_sample_data=False): super(TestAuthInfo, self).setUp(load_sample_data=load_sample_data) def test_missing_auth_methods(self): auth_data = {'identity': {}} auth_data['identity']['token'] = {'id': uuid.uuid4().hex} self.assertRaises(exception.ValidationError, auth.controllers.AuthInfo, None, auth_data) def test_unsupported_auth_method(self): auth_data = {'methods': ['abc']} auth_data['abc'] = {'test': 'test'} auth_data = {'identity': auth_data} self.assertRaises(exception.AuthMethodNotSupported, auth.controllers.AuthInfo, None, auth_data) def test_missing_auth_method_data(self): auth_data = {'methods': ['password']} auth_data = {'identity': auth_data} self.assertRaises(exception.ValidationError, auth.controllers.AuthInfo, None, auth_data) def test_project_name_no_domain(self): auth_data = self.build_authentication_request( username='test', password='test', project_name='abc')['auth'] self.assertRaises(exception.ValidationError, auth.controllers.AuthInfo, None, auth_data) def test_both_project_and_domain_in_scope(self): auth_data = self.build_authentication_request( user_id='test', password='test', project_name='test', domain_name='test')['auth'] self.assertRaises(exception.ValidationError, auth.controllers.AuthInfo, None, auth_data) def test_get_method_data_invalid_method(self): auth_data = self.build_authentication_request( user_id='test', password='test')['auth'] context = None auth_info = auth.controllers.AuthInfo(context, auth_data) method_name = uuid.uuid4().hex self.assertRaises(exception.ValidationError, auth_info.get_method_data, method_name) class TestTokenAPIs(test_v3.RestfulTestCase): def setUp(self): super(TestTokenAPIs, self).setUp() auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain_id, password=self.user['password']) resp = self.post('/auth/tokens', body=auth_data) self.token_data = resp.result self.token = resp.headers.get('X-Subject-Token') self.headers = {'X-Subject-Token': resp.headers.get('X-Subject-Token')} def test_default_fixture_scope_token(self): self.assertIsNotNone(self.get_scoped_token()) def test_v3_pki_token_id(self): self.opt_in_group('signing', token_format='PKI') auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) resp = self.post('/auth/tokens', body=auth_data) token_data = resp.result token_id = resp.headers.get('X-Subject-Token') self.assertIn('expires_at', token_data['token']) token_signed = cms.cms_sign_token(json.dumps(token_data), CONF.signing.certfile, CONF.signing.keyfile) self.assertEqual(token_signed, token_id) def test_v3_v2_intermix_non_default_domain_failed(self): self.opt_in_group('signing', token_format='UUID') auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) resp = self.post('/auth/tokens', body=auth_data) token = resp.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) resp = self.admin_request(path=path, token='ADMIN', method='GET', expected_status=401) def test_v3_v2_intermix_domain_scoped_token_failed(self): self.opt_in_group('signing', token_format='UUID') # grant the domain role to user path = '/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) resp = self.post('/auth/tokens', body=auth_data) token = resp.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) resp = self.admin_request(path=path, token='ADMIN', method='GET', expected_status=401) def test_v3_v2_intermix_non_default_project_failed(self): auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.project['id']) resp = self.post('/auth/tokens', body=auth_data) token = resp.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) resp = self.admin_request(path=path, token='ADMIN', method='GET', expected_status=401) def test_v3_v2_unscoped_uuid_token_intermix(self): self.opt_in_group('signing', token_format='UUID') auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password']) resp = self.post('/auth/tokens', body=auth_data) token_data = resp.result token = resp.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) resp = self.admin_request(path=path, token='ADMIN', method='GET') v2_token = resp.result self.assertEqual(v2_token['access']['user']['id'], token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token['access']['token']['expires'][:-1], token_data['token']['expires_at']) def test_v3_v2_unscoped_pki_token_intermix(self): self.opt_in_group('signing', token_format='PKI') auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password']) resp = self.post('/auth/tokens', body=auth_data) token_data = resp.result token = resp.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) resp = self.admin_request(path=path, token='ADMIN', method='GET') v2_token = resp.result self.assertEqual(v2_token['access']['user']['id'], token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token['access']['token']['expires'][:-1], token_data['token']['expires_at']) def test_v3_v2_uuid_token_intermix(self): # FIXME(gyee): PKI tokens are not interchangeable because token # data is baked into the token itself. self.opt_in_group('signing', token_format='UUID') auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project['id']) resp = self.post('/auth/tokens', body=auth_data) token_data = resp.result token = resp.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) resp = self.admin_request(path=path, token='ADMIN', method='GET') v2_token = resp.result self.assertEqual(v2_token['access']['user']['id'], token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token['access']['token']['expires'][:-1], token_data['token']['expires_at']) self.assertEqual(v2_token['access']['user']['roles'][0]['id'], token_data['token']['roles'][0]['id']) def test_v3_v2_pki_token_intermix(self): # FIXME(gyee): PKI tokens are not interchangeable because token # data is baked into the token itself. self.opt_in_group('signing', token_format='PKI') auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project['id']) resp = self.post('/auth/tokens', body=auth_data) token_data = resp.result token = resp.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) resp = self.admin_request(path=path, token='ADMIN', method='GET') v2_token = resp.result self.assertEqual(v2_token['access']['user']['id'], token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token['access']['token']['expires'][-1], token_data['token']['expires_at']) self.assertEqual(v2_token['access']['user']['roles'][0]['id'], token_data['token']['roles'][0]['id']) def test_v2_v3_unscoped_uuid_token_intermix(self): self.opt_in_group('signing', token_format='UUID') body = { 'auth': { 'passwordCredentials': { 'userId': self.user['id'], 'password': self.user['password'] } }} resp = self.admin_request(path='/v2.0/tokens', method='POST', body=body) v2_token_data = resp.result v2_token = v2_token_data['access']['token']['id'] headers = {'X-Subject-Token': v2_token} resp = self.get('/auth/tokens', headers=headers) token_data = resp.result self.assertEqual(v2_token_data['access']['user']['id'], token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token_data['access']['token']['expires'][-1], token_data['token']['expires_at']) def test_v2_v3_unscoped_pki_token_intermix(self): self.opt_in_group('signing', token_format='PKI') body = { 'auth': { 'passwordCredentials': { 'userId': self.user['id'], 'password': self.user['password'] } }} resp = self.admin_request(path='/v2.0/tokens', method='POST', body=body) v2_token_data = resp.result v2_token = v2_token_data['access']['token']['id'] headers = {'X-Subject-Token': v2_token} resp = self.get('/auth/tokens', headers=headers) token_data = resp.result self.assertEqual(v2_token_data['access']['user']['id'], token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token_data['access']['token']['expires'][-1], token_data['token']['expires_at']) def test_v2_v3_uuid_token_intermix(self): self.opt_in_group('signing', token_format='UUID') body = { 'auth': { 'passwordCredentials': { 'userId': self.user['id'], 'password': self.user['password'] }, 'tenantId': self.project['id'] }} resp = self.admin_request(path='/v2.0/tokens', method='POST', body=body) v2_token_data = resp.result v2_token = v2_token_data['access']['token']['id'] headers = {'X-Subject-Token': v2_token} resp = self.get('/auth/tokens', headers=headers) token_data = resp.result self.assertEqual(v2_token_data['access']['user']['id'], token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token_data['access']['token']['expires'][-1], token_data['token']['expires_at']) self.assertEqual(v2_token_data['access']['user']['roles'][0]['name'], token_data['token']['roles'][0]['name']) def test_v2_v3_pki_token_intermix(self): self.opt_in_group('signing', token_format='PKI') body = { 'auth': { 'passwordCredentials': { 'userId': self.user['id'], 'password': self.user['password'] }, 'tenantId': self.project['id'] }} resp = self.admin_request(path='/v2.0/tokens', method='POST', body=body) v2_token_data = resp.result v2_token = v2_token_data['access']['token']['id'] headers = {'X-Subject-Token': v2_token} resp = self.get('/auth/tokens', headers=headers) token_data = resp.result self.assertEqual(v2_token_data['access']['user']['id'], token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token_data['access']['token']['expires'][-1], token_data['token']['expires_at']) self.assertEqual(v2_token_data['access']['user']['roles'][0]['name'], token_data['token']['roles'][0]['name']) def test_rescoping_token(self): expires = self.token_data['token']['expires_at'] auth_data = self.build_authentication_request( token=self.token, project_id=self.project_id) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectScopedTokenResponse(r) # make sure expires stayed the same self.assertEqual(expires, r.result['token']['expires_at']) def test_check_token(self): self.head('/auth/tokens', headers=self.headers, expected_status=204) def test_validate_token(self): r = self.get('/auth/tokens', headers=self.headers) self.assertValidUnscopedTokenResponse(r) def test_revoke_token(self): headers = {'X-Subject-Token': self.get_scoped_token()} self.delete('/auth/tokens', headers=headers, expected_status=204) self.head('/auth/tokens', headers=headers, expected_status=401) # make sure we have a CRL r = self.get('/auth/tokens/OS-PKI/revoked') self.assertIn('signed', r.result) class TestTokenRevoking(test_v3.RestfulTestCase): """Test token revocation on the v3 Identity API.""" def setUp(self): """Setup for Token Revoking Test Cases. As well as the usual housekeeping, create a set of domains, users, groups, roles and projects for the subsequent tests: - Two domains: A & B - DomainA has user1, domainB has user2 and user3 - DomainA has group1 and group2, domainB has group3 - User1 has a role on domainA - Two projects: A & B, both in domainA - All users have a role on projectA - Two groups: 1 & 2 - User1 and user2 are members of group1 - User3 is a member of group2 """ super(TestTokenRevoking, self).setUp() # Start by creating a couple of domains and projects self.domainA = self.new_domain_ref() self.identity_api.create_domain(self.domainA['id'], self.domainA) self.domainB = self.new_domain_ref() self.identity_api.create_domain(self.domainB['id'], self.domainB) self.projectA = self.new_project_ref(domain_id=self.domainA['id']) self.identity_api.create_project(self.projectA['id'], self.projectA) self.projectB = self.new_project_ref(domain_id=self.domainA['id']) self.identity_api.create_project(self.projectB['id'], self.projectB) # Now create some users, one in domainA and two of them in domainB self.user1 = self.new_user_ref( domain_id=self.domainA['id']) self.user1['password'] = uuid.uuid4().hex self.identity_api.create_user(self.user1['id'], self.user1) self.user2 = self.new_user_ref( domain_id=self.domainB['id']) self.user2['password'] = uuid.uuid4().hex self.identity_api.create_user(self.user2['id'], self.user2) self.user3 = self.new_user_ref( domain_id=self.domainB['id']) self.user3['password'] = uuid.uuid4().hex self.identity_api.create_user(self.user3['id'], self.user3) self.group1 = self.new_group_ref( domain_id=self.domainA['id']) self.identity_api.create_group(self.group1['id'], self.group1) self.group2 = self.new_group_ref( domain_id=self.domainA['id']) self.identity_api.create_group(self.group2['id'], self.group2) self.group3 = self.new_group_ref( domain_id=self.domainB['id']) self.identity_api.create_group(self.group3['id'], self.group3) self.identity_api.add_user_to_group(self.user1['id'], self.group1['id']) self.identity_api.add_user_to_group(self.user2['id'], self.group1['id']) self.identity_api.add_user_to_group(self.user3['id'], self.group2['id']) self.role1 = self.new_role_ref() self.identity_api.create_role(self.role1['id'], self.role1) self.role2 = self.new_role_ref() self.identity_api.create_role(self.role2['id'], self.role2) self.identity_api.create_grant(self.role1['id'], user_id=self.user1['id'], domain_id=self.domainA['id']) self.identity_api.create_grant(self.role1['id'], user_id=self.user1['id'], project_id=self.projectA['id']) self.identity_api.create_grant(self.role1['id'], user_id=self.user2['id'], project_id=self.projectA['id']) self.identity_api.create_grant(self.role1['id'], user_id=self.user3['id'], project_id=self.projectA['id']) self.identity_api.create_grant(self.role1['id'], group_id=self.group1['id'], project_id=self.projectA['id']) def test_unscoped_token_remains_valid_after_role_assignment(self): r = self.post( '/auth/tokens', body=self.build_authentication_request( user_id=self.user1['id'], password=self.user1['password'])) unscoped_token = r.headers.get('X-Subject-Token') r = self.post( '/auth/tokens', body=self.build_authentication_request( token=unscoped_token, project_id=self.projectA['id'])) scoped_token = r.headers.get('X-Subject-Token') # confirm both tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, expected_status=204) self.head('/auth/tokens', headers={'X-Subject-Token': scoped_token}, expected_status=204) # create a new role role = self.new_role_ref() self.identity_api.create_role(role['id'], role) # assign a new role self.put( '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % { 'project_id': self.projectA['id'], 'user_id': self.user1['id'], 'role_id': role['id']}) # both tokens should remain valid self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, expected_status=204) self.head('/auth/tokens', headers={'X-Subject-Token': scoped_token}, expected_status=204) def test_deleting_user_grant_revokes_token(self): """Test deleting a user grant revokes token. Test Plan: - Get a token for user1, scoped to ProjectA - Delete the grant user1 has on ProjectA - Check token is no longer valid """ auth_data = self.build_authentication_request( user_id=self.user1['id'], password=self.user1['password'], project_id=self.projectA['id']) resp = self.post('/auth/tokens', body=auth_data) token = resp.headers.get('X-Subject-Token') # Confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, expected_status=204) # Delete the grant, which should invalidate the token grant_url = ( '/projects/%(project_id)s/users/%(user_id)s/' 'roles/%(role_id)s' % { 'project_id': self.projectA['id'], 'user_id': self.user1['id'], 'role_id': self.role1['id']}) self.delete(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token}, expected_status=401) def test_domain_user_role_assignment_maintains_token(self): """Test user-domain role assignment maintains existing token. Test Plan: - Get a token for user1, scoped to ProjectA - Create a grant for user1 on DomainB - Check token is still valid """ auth_data = self.build_authentication_request( user_id=self.user1['id'], password=self.user1['password'], project_id=self.projectA['id']) resp = self.post('/auth/tokens', body=auth_data) token = resp.headers.get('X-Subject-Token') # Confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, expected_status=204) # Assign a role, which should not affect the token grant_url = ( '/domains/%(domain_id)s/users/%(user_id)s/' 'roles/%(role_id)s' % { 'domain_id': self.domainB['id'], 'user_id': self.user1['id'], 'role_id': self.role1['id']}) self.put(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token}, expected_status=204) def test_deleting_group_grant_revokes_tokens(self): """Test deleting a group grant revokes tokens. Test Plan: - Get a token for user1, scoped to ProjectA - Get a token for user2, scoped to ProjectA - Get a token for user3, scoped to ProjectA - Delete the grant group1 has on ProjectA - Check tokens for user1 & user2 are no longer valid, since user1 and user2 are members of group1 - Check token for user3 is still valid """ auth_data = self.build_authentication_request( user_id=self.user1['id'], password=self.user1['password'], project_id=self.projectA['id']) resp = self.post('/auth/tokens', body=auth_data) token1 = resp.headers.get('X-Subject-Token') auth_data = self.build_authentication_request( user_id=self.user2['id'], password=self.user2['password'], project_id=self.projectA['id']) resp = self.post('/auth/tokens', body=auth_data) token2 = resp.headers.get('X-Subject-Token') auth_data = self.build_authentication_request( user_id=self.user3['id'], password=self.user3['password'], project_id=self.projectA['id']) resp = self.post('/auth/tokens', body=auth_data) token3 = resp.headers.get('X-Subject-Token') # Confirm tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': token1}, expected_status=204) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, expected_status=204) self.head('/auth/tokens', headers={'X-Subject-Token': token3}, expected_status=204) # Delete the group grant, which should invalidate the # tokens for user1 and user2 grant_url = ( '/projects/%(project_id)s/groups/%(group_id)s/' 'roles/%(role_id)s' % { 'project_id': self.projectA['id'], 'group_id': self.group1['id'], 'role_id': self.role1['id']}) self.delete(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token1}, expected_status=401) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, expected_status=401) # But user3's token should still be valid self.head('/auth/tokens', headers={'X-Subject-Token': token3}, expected_status=204) def test_domain_group_role_assignment_maintains_token(self): """Test domain-group role assignment maintains existing token. Test Plan: - Get a token for user1, scoped to ProjectA - Create a grant for group1 on DomainB - Check token is still longer valid """ auth_data = self.build_authentication_request( user_id=self.user1['id'], password=self.user1['password'], project_id=self.projectA['id']) resp = self.post('/auth/tokens', body=auth_data) token = resp.headers.get('X-Subject-Token') # Confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, expected_status=204) # Delete the grant, which should invalidate the token grant_url = ( '/domains/%(domain_id)s/groups/%(group_id)s/' 'roles/%(role_id)s' % { 'domain_id': self.domainB['id'], 'group_id': self.group1['id'], 'role_id': self.role1['id']}) self.put(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token}, expected_status=204) def test_group_membership_changes_revokes_token(self): """Test add/removal to/from group revokes token. Test Plan: - Get a token for user1, scoped to ProjectA - Get a token for user2, scoped to ProjectA - Remove user1 from group1 - Check token for user1 is no longer valid - Check token for user2 is still valid, even though user2 is also part of group1 - Add user2 to group2 - Check token for user2 is now no longer valid """ auth_data = self.build_authentication_request( user_id=self.user1['id'], password=self.user1['password'], project_id=self.projectA['id']) resp = self.post('/auth/tokens', body=auth_data) token1 = resp.headers.get('X-Subject-Token') auth_data = self.build_authentication_request( user_id=self.user2['id'], password=self.user2['password'], project_id=self.projectA['id']) resp = self.post('/auth/tokens', body=auth_data) token2 = resp.headers.get('X-Subject-Token') # Confirm tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': token1}, expected_status=204) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, expected_status=204) # Remove user1 from group1, which should invalidate # the token self.delete('/groups/%(group_id)s/users/%(user_id)s' % { 'group_id': self.group1['id'], 'user_id': self.user1['id']}) self.head('/auth/tokens', headers={'X-Subject-Token': token1}, expected_status=401) # But user2's token should still be valid self.head('/auth/tokens', headers={'X-Subject-Token': token2}, expected_status=204) # Adding user2 to a group should invalidate token self.put('/groups/%(group_id)s/users/%(user_id)s' % { 'group_id': self.group2['id'], 'user_id': self.user2['id']}) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, expected_status=401) class TestAuthJSON(test_v3.RestfulTestCase): content_type = 'json' def test_unscoped_token_with_user_id(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) r = self.post('/auth/tokens', body=auth_data) self.assertValidUnscopedTokenResponse(r) def test_unscoped_token_with_user_domain_id(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain['id'], password=self.user['password']) r = self.post('/auth/tokens', body=auth_data) self.assertValidUnscopedTokenResponse(r) def test_unscoped_token_with_user_domain_name(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=self.domain['name'], password=self.user['password']) r = self.post('/auth/tokens', body=auth_data) self.assertValidUnscopedTokenResponse(r) def test_project_id_scoped_token_with_user_id(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], project_id=self.project['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectScopedTokenResponse(r) def test_default_project_id_scoped_token_with_user_id(self): # create a second project to work with ref = self.new_project_ref(domain_id=self.domain_id) r = self.post('/projects', body={'project': ref}) project = self.assertValidProjectResponse(r, ref) # grant the user a role on the project self.put( '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % { 'user_id': self.user['id'], 'project_id': project['id'], 'role_id': self.role['id']}) # set the user's preferred project body = {'user': {'default_project_id': project['id']}} r = self.patch('/users/%(user_id)s' % { 'user_id': self.user['id']}, body=body) self.assertValidUserResponse(r) # attempt to authenticate without requesting a project auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectScopedTokenResponse(r) self.assertEqual(r.result['token']['project']['id'], project['id']) def test_default_project_id_scoped_token_with_user_id_401(self): # create a second project to work with ref = self.new_project_ref(domain_id=self.domain['id']) del ref['id'] r = self.post('/projects', body={'project': ref}) project = self.assertValidProjectResponse(r, ref) # set the user's preferred project without having authz on that project body = {'user': {'default_project_id': project['id']}} r = self.patch('/users/%(user_id)s' % { 'user_id': self.user['id']}, body=body) self.assertValidUserResponse(r) # attempt to authenticate without requesting a project # the default_project_id should be the assumed scope of the request, # and fail because the user doesn't have explicit authz on that scope auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_project_id_scoped_token_with_user_id_401(self): project_id = uuid.uuid4().hex project = self.new_project_ref(domain_id=self.domain_id) self.identity_api.create_project(project_id, project) auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], project_id=project['id']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_project_id_scoped_token_with_user_domain_id(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain['id'], password=self.user['password'], project_id=self.project['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectScopedTokenResponse(r) def test_project_id_scoped_token_with_user_domain_name(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=self.domain['name'], password=self.user['password'], project_id=self.project['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectScopedTokenResponse(r) def test_domain_id_scoped_token_with_user_id(self): path = '/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_id_scoped_token_with_user_domain_id(self): path = '/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain['id'], password=self.user['password'], domain_id=self.domain['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_id_scoped_token_with_user_domain_name(self): path = '/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=self.domain['name'], password=self.user['password'], domain_id=self.domain['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_name_scoped_token_with_user_id(self): path = '/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_name=self.domain['name']) r = self.post('/auth/tokens', body=auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_name_scoped_token_with_user_domain_id(self): path = '/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain['id'], password=self.user['password'], domain_name=self.domain['name']) r = self.post('/auth/tokens', body=auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_name_scoped_token_with_user_domain_name(self): path = '/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=self.domain['name'], password=self.user['password'], domain_name=self.domain['name']) r = self.post('/auth/tokens', body=auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_token_with_group_role(self): group_id = uuid.uuid4().hex group = self.new_group_ref( domain_id=self.domain_id) group['id'] = group_id self.identity_api.create_group(group_id, group) # add user to group self.identity_api.add_user_to_group(self.user['id'], group['id']) # grant the domain role to group path = '/domains/%s/groups/%s/roles/%s' % ( self.domain['id'], group['id'], self.role['id']) self.put(path=path) # now get a domain-scoped token auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_token_with_name(self): # grant the domain role to user path = '/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id']) self.put(path=path) # now get a domain-scoped token auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_name=self.domain['name']) r = self.post('/auth/tokens', body=auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_failed(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_auth_with_id(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) r = self.post('/auth/tokens', body=auth_data) self.assertValidUnscopedTokenResponse(r) token = r.headers.get('X-Subject-Token') # test token auth auth_data = self.build_authentication_request(token=token) r = self.post('/auth/tokens', body=auth_data) self.assertValidUnscopedTokenResponse(r) def test_invalid_user_id(self): auth_data = self.build_authentication_request( user_id=uuid.uuid4().hex, password=self.user['password']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_invalid_user_name(self): auth_data = self.build_authentication_request( username=uuid.uuid4().hex, user_domain_id=self.domain['id'], password=self.user['password']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_invalid_domain_id(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=uuid.uuid4().hex, password=self.user['password']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_invalid_domain_name(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=uuid.uuid4().hex, password=self.user['password']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_invalid_password(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=uuid.uuid4().hex) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_remote_user(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'])['auth'] api = auth.controllers.Auth() context = {'REMOTE_USER': self.user['name']} auth_info = auth.controllers.AuthInfo(None, auth_data) auth_context = {'extras': {}, 'method_names': []} api.authenticate(context, auth_info, auth_context) self.assertEqual(auth_context['user_id'], self.user['id']) def test_remote_user_no_domain(self): auth_data = self.build_authentication_request( username=self.user['name'], password=self.user['password'])['auth'] api = auth.controllers.Auth() context = {'REMOTE_USER': self.user['name']} auth_info = auth.controllers.AuthInfo(None, auth_data) auth_context = {'extras': {}, 'method_names': []} self.assertRaises(exception.ValidationError, api.authenticate, context, auth_info, auth_context) class TestAuthXML(TestAuthJSON): content_type = 'xml' class TestTrustOptional(test_v3.RestfulTestCase): def setUp(self, *args, **kwargs): self.opt_in_group('trust', enabled=False) super(TestTrustOptional, self).setUp(*args, **kwargs) def test_trusts_404(self): self.get('/OS-TRUST/trusts', body={'trust': {}}, expected_status=404) self.post('/OS-TRUST/trusts', body={'trust': {}}, expected_status=404) def test_auth_with_scope_in_trust_403(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], trust_id=uuid.uuid4().hex) self.post('/auth/tokens', body=auth_data, expected_status=403) class TestTrustAuth(TestAuthInfo): def setUp(self): self.opt_in_group('trust', enabled=True) super(TestTrustAuth, self).setUp(load_sample_data=True) # create a trustee to delegate stuff to self.trustee_user_id = uuid.uuid4().hex self.trustee_user = self.new_user_ref(domain_id=self.domain_id) self.trustee_user['id'] = self.trustee_user_id self.identity_api.create_user(self.trustee_user_id, self.trustee_user) def test_create_trust_400(self): raise nose.exc.SkipTest('Blocked by bug 1133435') self.post('/OS-TRUST/trusts', body={'trust': {}}, expected_status=400) def test_create_unscoped_trust(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id) del ref['id'] r = self.post('/OS-TRUST/trusts', body={'trust': ref}) self.assertValidTrustResponse(r, ref) def test_trust_crud(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_ids=[self.role_id]) del ref['id'] r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r, ref) r = self.get( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, expected_status=200) self.assertValidTrustResponse(r, ref) # validate roles on the trust r = self.get( '/OS-TRUST/trusts/%(trust_id)s/roles' % { 'trust_id': trust['id']}, expected_status=200) roles = self.assertValidRoleListResponse(r, self.role) self.assertIn(self.role['id'], [x['id'] for x in roles]) self.head( '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { 'trust_id': trust['id'], 'role_id': self.role['id']}, expected_status=204) r = self.get( '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { 'trust_id': trust['id'], 'role_id': self.role['id']}, expected_status=200) self.assertValidRoleResponse(r, self.role) r = self.get('/OS-TRUST/trusts', expected_status=200) self.assertValidTrustListResponse(r, trust) # trusts are immutable self.patch( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, body={'trust': ref}, expected_status=404) self.delete( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, expected_status=204) self.get( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, expected_status=404) def test_create_trust_trustee_404(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=uuid.uuid4().hex) del ref['id'] self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404) def test_create_trust_trustor_trustee_backwards(self): ref = self.new_trust_ref( trustor_user_id=self.trustee_user_id, trustee_user_id=self.user_id) del ref['id'] self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=403) def test_create_trust_project_404(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=uuid.uuid4().hex, role_ids=[self.role_id]) del ref['id'] self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404) def test_create_trust_role_id_404(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_ids=[uuid.uuid4().hex]) del ref['id'] self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404) def test_create_trust_role_name_404(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, role_names=[uuid.uuid4().hex]) del ref['id'] self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404) def test_create_expired_trust(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, expires=dict(seconds=-1), role_ids=[self.role_id]) del ref['id'] r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r, ref) self.get('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': trust['id']}, expected_status=404) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_v3_v2_intermix_trustor_not_in_default_domain_failed(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.default_domain_user_id, project_id=self.project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r) auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], trust_id=trust['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectTrustScopedTokenResponse( r, self.default_domain_user) token = r.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token='ADMIN', method='GET', expected_status=401) def test_v3_v2_intermix_trustor_not_in_default_domaini_failed(self): ref = self.new_trust_ref( trustor_user_id=self.default_domain_user_id, trustee_user_id=self.trustee_user_id, project_id=self.default_domain_project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project_id) r = self.post('/auth/tokens', body=auth_data) token = r.headers.get('X-Subject-Token') r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token) trust = self.assertValidTrustResponse(r) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectTrustScopedTokenResponse( r, self.trustee_user) token = r.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token='ADMIN', method='GET', expected_status=401) def test_v3_v2_intermix_project_not_in_default_domaini_failed(self): # create a trustee in default domain to delegate stuff to trustee_user_id = uuid.uuid4().hex trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID) trustee_user['id'] = trustee_user_id self.identity_api.create_user(trustee_user_id, trustee_user) ref = self.new_trust_ref( trustor_user_id=self.default_domain_user_id, trustee_user_id=trustee_user_id, project_id=self.project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project_id) r = self.post('/auth/tokens', body=auth_data) token = r.headers.get('X-Subject-Token') r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token) trust = self.assertValidTrustResponse(r) auth_data = self.build_authentication_request( user_id=trustee_user['id'], password=trustee_user['password'], trust_id=trust['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectTrustScopedTokenResponse( r, trustee_user) token = r.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token='ADMIN', method='GET', expected_status=401) def test_v3_v2_intermix(self): # create a trustee in default domain to delegate stuff to trustee_user_id = uuid.uuid4().hex trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID) trustee_user['id'] = trustee_user_id self.identity_api.create_user(trustee_user_id, trustee_user) ref = self.new_trust_ref( trustor_user_id=self.default_domain_user_id, trustee_user_id=trustee_user_id, project_id=self.default_domain_project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] auth_data = self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project_id) r = self.post('/auth/tokens', body=auth_data) token = r.headers.get('X-Subject-Token') r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token) trust = self.assertValidTrustResponse(r) auth_data = self.build_authentication_request( user_id=trustee_user['id'], password=trustee_user['password'], trust_id=trust['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectTrustScopedTokenResponse( r, trustee_user) token = r.headers.get('X-Subject-Token') # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token='ADMIN', method='GET', expected_status=200) def test_exercise_trust_scoped_token_without_impersonation(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectTrustScopedTokenResponse(r, self.trustee_user) self.assertEqual(r.result['token']['user']['id'], self.trustee_user['id']) self.assertEqual(r.result['token']['user']['name'], self.trustee_user['name']) self.assertEqual(r.result['token']['user']['domain']['id'], self.domain['id']) self.assertEqual(r.result['token']['user']['domain']['name'], self.domain['name']) self.assertEqual(r.result['token']['project']['id'], self.project['id']) self.assertEqual(r.result['token']['project']['name'], self.project['name']) def test_exercise_trust_scoped_token_with_impersonation(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=True, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectTrustScopedTokenResponse(r, self.user) self.assertEqual(r.result['token']['user']['id'], self.user['id']) self.assertEqual(r.result['token']['user']['name'], self.user['name']) self.assertEqual(r.result['token']['user']['domain']['id'], self.domain['id']) self.assertEqual(r.result['token']['user']['domain']['name'], self.domain['name']) self.assertEqual(r.result['token']['project']['id'], self.project['id']) self.assertEqual(r.result['token']['project']['name'], self.project['name']) def test_delete_trust(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r, ref) self.delete('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': trust['id']}, expected_status=204) self.get('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': trust['id']}, expected_status=404) self.get('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': trust['id']}, expected_status=404) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) self.post('/auth/tokens', body=auth_data, expected_status=401) def test_list_trusts(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=False, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] for i in range(0, 3): r = self.post('/OS-TRUST/trusts', body={'trust': ref}) self.assertValidTrustResponse(r, ref) r = self.get('/OS-TRUST/trusts?trustor_user_id=%s' % self.user_id, expected_status=200) trusts = r.result['trusts'] self.assertEqual(len(trusts), 3) r = self.get('/OS-TRUST/trusts?trustee_user_id=%s' % self.user_id, expected_status=200) trusts = r.result['trusts'] self.assertEqual(len(trusts), 0) def test_change_password_invalidates_trust_tokens(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user_id, project_id=self.project_id, impersonation=True, expires=dict(minutes=1), role_ids=[self.role_id]) del ref['id'] r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) r = self.post('/auth/tokens', body=auth_data) self.assertValidProjectTrustScopedTokenResponse(r, self.user) trust_token = r.headers.get('X-Subject-Token') self.get('/OS-TRUST/trusts?trustor_user_id=%s' % self.user_id, expected_status=200, token=trust_token) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password']) self.assertValidUserResponse( self.patch('/users/%s' % self.trustee_user['id'], body={'user': {'password': uuid.uuid4().hex}}, auth=auth_data, expected_status=200)) self.get('/OS-TRUST/trusts?trustor_user_id=%s' % self.user_id, expected_status=401, token=trust_token)