Added support for domain scoping. Enhancement on AccessInfo to support reading v2/v3 token information. Enhancement on ServiceCatalog for reading/filtering v2/v3 service catalog information. Change-Id: Ibb678b9933d3673e37d0fba857a152a3c5d2b4f4
		
			
				
	
	
		
			409 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			409 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import copy
 | 
						|
import json
 | 
						|
import requests
 | 
						|
 | 
						|
from keystoneclient import exceptions
 | 
						|
from keystoneclient.v3 import client
 | 
						|
 | 
						|
from tests.v3 import utils
 | 
						|
 | 
						|
 | 
						|
class AuthenticateAgainstKeystoneTests(utils.TestCase):
 | 
						|
    def setUp(self):
        """Rebuild the v3 auth request/response fixtures for each test.

        Several tests mutate these dictionaries in place (deleting the
        scope, swapping the identity method, ...), so fresh copies are
        created here instead of being shared between tests.
        """
        super(AuthenticateAgainstKeystoneTests, self).setUp()

        # Pieces of the token body returned by the faked /auth/tokens call.
        # The two "domain" dicts are deliberately separate objects.
        token_project = {
            "domain": {
                "id": self.TEST_DOMAIN_ID,
                "name": self.TEST_DOMAIN_NAME,
            },
            "id": self.TEST_TENANT_ID,
            "name": self.TEST_TENANT_NAME,
        }
        token_user = {
            "domain": {
                "id": self.TEST_DOMAIN_ID,
                "name": self.TEST_DOMAIN_NAME,
            },
            "id": self.TEST_USER,
            "name": self.TEST_USER,
        }
        self.TEST_RESPONSE_DICT = {
            "token": {
                "methods": ["token", "password"],
                "expires_at": "2020-01-01T00:00:10.000123Z",
                "project": token_project,
                "user": token_user,
                "issued_at": "2013-05-29T16:55:21.468960Z",
                "catalog": self.TEST_SERVICE_CATALOG,
            },
        }

        # Default expected request: password auth by domain name + user
        # name, scoped to a project id.
        password_identity = {
            "methods": ["password"],
            "password": {
                "user": {
                    "domain": {"name": self.TEST_DOMAIN_NAME},
                    "name": self.TEST_USER,
                    "password": self.TEST_TOKEN,
                },
            },
        }
        project_scope = {
            "project": {"id": self.TEST_TENANT_ID},
        }
        self.TEST_REQUEST_BODY = {
            "auth": {
                "identity": password_identity,
                "scope": project_scope,
            },
        }

        self.TEST_REQUEST_HEADERS = {
            'Content-Type': 'application/json',
            'User-Agent': 'python-keystoneclient',
        }
        self.TEST_RESPONSE_HEADERS = {
            'X-Subject-Token': self.TEST_TOKEN,
        }
 | 
						|
 | 
						|
    def test_authenticate_success(self):
        """Authenticate with user id + project id and read back the token.

        The response carries a custom ``X-Subject-Token`` header value and
        the client is expected to expose it as ``auth_token``.
        """
        subject_token = "abcdef"
        self.TEST_RESPONSE_HEADERS['X-Subject-Token'] = subject_token

        # The client is built with user_id, so the expected request body
        # identifies the user by id rather than by domain + name.
        user = self.TEST_REQUEST_BODY['auth']['identity']['password']['user']
        del user['domain']
        del user['name']
        user['id'] = self.TEST_USER

        response = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(self.TEST_RESPONSE_DICT),
            "headers": self.TEST_RESPONSE_HEADERS,
        })

        # Record the single POST the client is expected to issue.
        expected_body = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
        request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
        request_kwargs['data'] = expected_body
        token_url = self.TEST_URL + "/auth/tokens"
        requests.request('POST', token_url, **request_kwargs).AndReturn(
            response)
        self.mox.ReplayAll()

        keystone = client.Client(user_id=self.TEST_USER,
                                 password=self.TEST_TOKEN,
                                 project_id=self.TEST_TENANT_ID,
                                 auth_url=self.TEST_URL)
        self.assertEqual(keystone.auth_token, subject_token)
 | 
						|
 | 
						|
    def test_authenticate_failure(self):
        """A 401 reply to the token request raises ``Unauthorized``."""
        # The client will send the wrong password, so expect it in the body.
        user = self.TEST_REQUEST_BODY['auth']['identity']['password']['user']
        user['password'] = 'bad_key'

        error_payload = {
            "unauthorized": {
                "message": "Unauthorized",
                "code": "401",
            },
        }
        response = utils.TestResponse({
            "status_code": 401,
            "text": json.dumps(error_payload),
        })

        expected_body = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
        request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
        request_kwargs['data'] = expected_body
        token_url = self.TEST_URL + "/auth/tokens"
        requests.request('POST', token_url, **request_kwargs).AndReturn(
            response)
        self.mox.ReplayAll()

        # The context-manager form of assertRaises does not work right on
        # python2.6, so the constructor is handed over as a callable and
        # invoked by assertRaises with these keyword arguments.
        self.assertRaises(exceptions.Unauthorized,
                          client.Client,
                          user_domain_name=self.TEST_DOMAIN_NAME,
                          username=self.TEST_USER,
                          password="bad_key",
                          project_id=self.TEST_TENANT_ID,
                          auth_url=self.TEST_URL)
 | 
						|
 | 
						|
    def test_auth_redirect(self):
        """Authenticate through a 305 redirect to the admin URL.

        The first POST is answered with status 305 and a ``location``
        header pointing at the admin endpoint; an identical POST is then
        expected against that location and answered with the token.
        """
        token_json = json.dumps(self.TEST_RESPONSE_DICT, sort_keys=True)
        redirect_reply = utils.TestResponse({
            "headers": {
                'location': self.TEST_ADMIN_URL + "/auth/tokens",
                'X-Subject-Token': self.TEST_TOKEN,
            },
            "status_code": 305,
            "text": "Use proxy",
        })
        success_reply = utils.TestResponse({
            "headers": {'X-Subject-Token': self.TEST_TOKEN},
            "status_code": 200,
            "text": token_json,
        })

        # Record both POSTs in order: the original URL, then the redirect
        # target. Each expectation gets its own copy of the base kwargs.
        expected_calls = (
            (self.TEST_URL, redirect_reply),
            (self.TEST_ADMIN_URL, success_reply),
        )
        for base_url, reply in expected_calls:
            request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
            request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
            request_kwargs['data'] = json.dumps(self.TEST_REQUEST_BODY,
                                                sort_keys=True)
            requests.request('POST',
                             base_url + "/auth/tokens",
                             **request_kwargs).AndReturn(reply)
        self.mox.ReplayAll()

        keystone = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
                                 username=self.TEST_USER,
                                 password=self.TEST_TOKEN,
                                 project_id=self.TEST_TENANT_ID,
                                 auth_url=self.TEST_URL)

        catalog = self.TEST_RESPONSE_DICT["token"]["catalog"]
        expected_management_url = catalog[3]['endpoints'][2]["url"]
        self.assertEqual(keystone.management_url, expected_management_url)
        self.assertEqual(keystone.auth_token,
                         self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
 | 
						|
 | 
						|
    def test_authenticate_success_domain_username_password_scoped(self):
        """Password auth by domain name + username, scoped to a project.

        Uses the request/response fixtures from setUp unchanged.
        """
        response = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(self.TEST_RESPONSE_DICT),
            "headers": self.TEST_RESPONSE_HEADERS,
        })

        expected_body = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
        request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
        request_kwargs['data'] = expected_body
        token_url = self.TEST_URL + "/auth/tokens"
        requests.request('POST', token_url, **request_kwargs).AndReturn(
            response)
        self.mox.ReplayAll()

        keystone = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
                                 username=self.TEST_USER,
                                 password=self.TEST_TOKEN,
                                 project_id=self.TEST_TENANT_ID,
                                 auth_url=self.TEST_URL)

        catalog = self.TEST_RESPONSE_DICT["token"]["catalog"]
        expected_management_url = catalog[3]['endpoints'][2]["url"]
        self.assertEqual(keystone.management_url, expected_management_url)
        self.assertEqual(keystone.auth_token,
                         self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
 | 
						|
 | 
						|
    def test_authenticate_success_userid_password_domain_scoped(self):
        """Password auth by user id, scoped to a domain instead of a project.

        Both fixtures are rewritten: the request scope and the returned
        token refer to a domain, and the user is identified by id.
        """
        # Request: identify the user by id only.
        user = self.TEST_REQUEST_BODY['auth']['identity']['password']['user']
        del user['domain']
        del user['name']
        user['id'] = self.TEST_USER

        # Request: swap the project scope for a domain scope.
        request_scope = self.TEST_REQUEST_BODY['auth']['scope']
        del request_scope['project']
        request_scope['domain'] = {'id': self.TEST_DOMAIN_ID}

        # Response: the token is bound to the domain, not to a project.
        token_body = self.TEST_RESPONSE_DICT['token']
        del token_body['project']
        token_body['domain'] = {
            'id': self.TEST_DOMAIN_ID,
            'name': self.TEST_DOMAIN_NAME,
        }

        response = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(self.TEST_RESPONSE_DICT),
            "headers": self.TEST_RESPONSE_HEADERS,
        })

        expected_body = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
        request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
        request_kwargs['data'] = expected_body
        token_url = self.TEST_URL + "/auth/tokens"
        requests.request('POST', token_url, **request_kwargs).AndReturn(
            response)
        self.mox.ReplayAll()

        keystone = client.Client(user_id=self.TEST_USER,
                                 password=self.TEST_TOKEN,
                                 domain_id=self.TEST_DOMAIN_ID,
                                 auth_url=self.TEST_URL)

        self.assertEqual(keystone.auth_domain_id, self.TEST_DOMAIN_ID)
        catalog = self.TEST_RESPONSE_DICT["token"]["catalog"]
        expected_management_url = catalog[3]['endpoints'][2]["url"]
        self.assertEqual(keystone.management_url, expected_management_url)
        self.assertEqual(keystone.auth_token,
                         self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
 | 
						|
 | 
						|
    def test_authenticate_success_userid_password_project_scoped(self):
        """Password auth by user id, scoped to a project.

        Only the identity part of the expected request changes; the
        project scope and the response fixture from setUp are kept.
        """
        # Request: identify the user by id rather than by domain + name.
        user = self.TEST_REQUEST_BODY['auth']['identity']['password']['user']
        del user['domain']
        del user['name']
        user['id'] = self.TEST_USER

        response = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(self.TEST_RESPONSE_DICT),
            "headers": self.TEST_RESPONSE_HEADERS,
        })

        expected_body = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
        request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
        request_kwargs['data'] = expected_body
        token_url = self.TEST_URL + "/auth/tokens"
        requests.request('POST', token_url, **request_kwargs).AndReturn(
            response)
        self.mox.ReplayAll()

        keystone = client.Client(user_id=self.TEST_USER,
                                 password=self.TEST_TOKEN,
                                 project_id=self.TEST_TENANT_ID,
                                 auth_url=self.TEST_URL)

        self.assertEqual(keystone.auth_tenant_id, self.TEST_TENANT_ID)
        catalog = self.TEST_RESPONSE_DICT["token"]["catalog"]
        expected_management_url = catalog[3]['endpoints'][2]["url"]
        self.assertEqual(keystone.management_url, expected_management_url)
        self.assertEqual(keystone.auth_token,
                         self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
 | 
						|
 | 
						|
    def test_authenticate_success_password_unscoped(self):
        """Password auth without a scope yields a token with no catalog.

        The expected request carries no ``scope`` and the faked response
        carries no ``catalog``.
        """
        # Strip the catalog from the response and the scope from the request.
        del self.TEST_RESPONSE_DICT['token']['catalog']
        del self.TEST_REQUEST_BODY['auth']['scope']

        response = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(self.TEST_RESPONSE_DICT),
            "headers": self.TEST_RESPONSE_HEADERS,
        })

        expected_body = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
        request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
        request_kwargs['data'] = expected_body
        token_url = self.TEST_URL + "/auth/tokens"
        requests.request('POST', token_url, **request_kwargs).AndReturn(
            response)
        self.mox.ReplayAll()

        keystone = client.Client(user_domain_name=self.TEST_DOMAIN_NAME,
                                 username=self.TEST_USER,
                                 password=self.TEST_TOKEN,
                                 auth_url=self.TEST_URL)

        self.assertEqual(keystone.auth_token,
                         self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
        parsed_catalog = keystone.service_catalog.catalog
        self.assertFalse('catalog' in parsed_catalog)
 | 
						|
 | 
						|
    def test_authenticate_success_token_domain_scoped(self):
        """Token auth scoped to a domain.

        The expected request uses the ``token`` identity method, carries a
        domain scope and sends the token in the ``X-Auth-Token`` header;
        the faked response token is bound to the domain.
        """
        # Request: replace password identity with token identity.
        identity = self.TEST_REQUEST_BODY['auth']['identity']
        del identity['password']
        identity['methods'] = ['token']
        identity['token'] = {'id': self.TEST_TOKEN}

        # Request: swap the project scope for a domain scope.
        request_scope = self.TEST_REQUEST_BODY['auth']['scope']
        del request_scope['project']
        request_scope['domain'] = {'id': self.TEST_DOMAIN_ID}

        # Response: the token is bound to the domain, not to a project.
        token_body = self.TEST_RESPONSE_DICT['token']
        del token_body['project']
        token_body['domain'] = {
            'id': self.TEST_DOMAIN_ID,
            'name': self.TEST_DOMAIN_NAME,
        }

        self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
        response = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(self.TEST_RESPONSE_DICT),
            "headers": self.TEST_RESPONSE_HEADERS,
        })

        expected_body = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
        request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
        request_kwargs['data'] = expected_body
        token_url = self.TEST_URL + "/auth/tokens"
        requests.request('POST', token_url, **request_kwargs).AndReturn(
            response)
        self.mox.ReplayAll()

        keystone = client.Client(token=self.TEST_TOKEN,
                                 domain_id=self.TEST_DOMAIN_ID,
                                 auth_url=self.TEST_URL)

        self.assertEqual(keystone.auth_domain_id, self.TEST_DOMAIN_ID)
        catalog = self.TEST_RESPONSE_DICT["token"]["catalog"]
        expected_management_url = catalog[3]['endpoints'][2]["url"]
        self.assertEqual(keystone.management_url, expected_management_url)
        self.assertEqual(keystone.auth_token,
                         self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
 | 
						|
 | 
						|
    def test_authenticate_success_token_project_scoped(self):
        """Token auth scoped to a project.

        The expected request uses the ``token`` identity method and sends
        the token in the ``X-Auth-Token`` header; the project scope and the
        response fixture from setUp are kept.
        """
        # Request: replace password identity with token identity.
        identity = self.TEST_REQUEST_BODY['auth']['identity']
        del identity['password']
        identity['methods'] = ['token']
        identity['token'] = {'id': self.TEST_TOKEN}

        self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
        response = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(self.TEST_RESPONSE_DICT),
            "headers": self.TEST_RESPONSE_HEADERS,
        })

        expected_body = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
        request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
        request_kwargs['data'] = expected_body
        token_url = self.TEST_URL + "/auth/tokens"
        requests.request('POST', token_url, **request_kwargs).AndReturn(
            response)
        self.mox.ReplayAll()

        keystone = client.Client(token=self.TEST_TOKEN,
                                 project_id=self.TEST_TENANT_ID,
                                 auth_url=self.TEST_URL)

        self.assertEqual(keystone.auth_tenant_id, self.TEST_TENANT_ID)
        catalog = self.TEST_RESPONSE_DICT["token"]["catalog"]
        expected_management_url = catalog[3]['endpoints'][2]["url"]
        self.assertEqual(keystone.management_url, expected_management_url)
        self.assertEqual(keystone.auth_token,
                         self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
 | 
						|
 | 
						|
    def test_authenticate_success_token_unscoped(self):
        """Token auth without a scope yields a token with no catalog.

        The expected request uses the ``token`` identity method with no
        ``scope``; the faked response carries no ``catalog``.
        """
        # Request: replace password identity with token identity.
        identity = self.TEST_REQUEST_BODY['auth']['identity']
        del identity['password']
        identity['methods'] = ['token']
        identity['token'] = {'id': self.TEST_TOKEN}

        # Strip the scope from the request and the catalog from the response.
        del self.TEST_REQUEST_BODY['auth']['scope']
        del self.TEST_RESPONSE_DICT['token']['catalog']

        self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
        response = utils.TestResponse({
            "status_code": 200,
            "text": json.dumps(self.TEST_RESPONSE_DICT),
            "headers": self.TEST_RESPONSE_HEADERS,
        })

        expected_body = json.dumps(self.TEST_REQUEST_BODY, sort_keys=True)
        request_kwargs = copy.copy(self.TEST_REQUEST_BASE)
        request_kwargs['headers'] = self.TEST_REQUEST_HEADERS
        request_kwargs['data'] = expected_body
        token_url = self.TEST_URL + "/auth/tokens"
        requests.request('POST', token_url, **request_kwargs).AndReturn(
            response)
        self.mox.ReplayAll()

        keystone = client.Client(token=self.TEST_TOKEN,
                                 auth_url=self.TEST_URL)

        self.assertEqual(keystone.auth_token,
                         self.TEST_RESPONSE_HEADERS["X-Subject-Token"])
        parsed_catalog = keystone.service_catalog.catalog
        self.assertFalse('catalog' in parsed_catalog)
 |