Files
python-keystoneclient/tests/v2_0/test_auth.py
Joe Heck f1cc3cfc42 removing repeat attempt at authorization in client
blueprint solidify-python-api

* extended and updated documentation strings
* updated README.rst with latest options
* made debug a pass-through value, optionally set on client (instead of
  just being pulled from environment variable)
* adding AccessInfo object and associated tests
  (access.AccessInfo meant to be a cacheable object external to client
  and ultimately to replace service_catalog and it's existing functionality)
* extending authtoken to support lists of endpoints
* maintaining a single entity for client.management_url with first from
  list of possible endpoints
* create project_name and project_id synonyms to match tenant_name and
  tenant_id
* replacing authenticate call to a pure method, not overloading the
  resource/manager path that confuses base URL concepts.
* throw AuthorizationFailure if client attempts to access keystone
  resources before it has a management url
* special case listing tenant using auth_url for unscoped tokens authorized
  through client
* special case listing tokens.authenticate for Dashboard to allow unscoped
  tokens to hand back parity information to dashboard

Change-Id: I4bb3a1b6a5ce2c4b3fbcebeb59116286cac8b2e3
2012-11-09 00:02:41 +00:00

221 lines
8.4 KiB
Python

import httplib2
import json
from keystoneclient.v2_0 import client
from keystoneclient import exceptions
from tests import utils
def to_http_response(resp_dict):
"""
Utility function to convert a python dictionary
(e.g. {'status':status, 'body': body, 'headers':headers}
to an httplib2 response.
"""
resp = httplib2.Response(resp_dict)
for k, v in resp_dict['headers'].items():
resp[k] = v
return resp
class AuthenticateAgainstKeystoneTests(utils.TestCase):
def setUp(self):
super(AuthenticateAgainstKeystoneTests, self).setUp()
self.TEST_RESPONSE_DICT = {
"access": {
"token": {
"expires": "12345",
"id": self.TEST_TOKEN,
"tenant": {
"id": self.TEST_TENANT_ID
},
},
"user": {
"id": self.TEST_USER
},
"serviceCatalog": self.TEST_SERVICE_CATALOG,
},
}
self.TEST_REQUEST_BODY = {
"auth": {
"passwordCredentials": {
"username": self.TEST_USER,
"password": self.TEST_TOKEN,
},
"tenantId": self.TEST_TENANT_ID,
},
}
self.TEST_REQUEST_HEADERS = {
'Content-Type': 'application/json',
'User-Agent': 'python-keystoneclient',
}
def test_authenticate_failure(self):
_auth = 'auth'
_cred = 'passwordCredentials'
_pass = 'password'
self.TEST_REQUEST_BODY[_auth][_cred][_pass] = 'bad_key'
resp = httplib2.Response({
"status": 401,
"body": json.dumps({
"unauthorized": {
"message": "Unauthorized",
"code": "401",
},
}),
})
httplib2.Http.request(self.TEST_URL + "/tokens",
'POST',
body=json.dumps(self.TEST_REQUEST_BODY),
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
# Workaround for issue with assertRaises on python2.6
# where with assertRaises(exceptions.Unauthorized): doesn't work
# right
def client_create_wrapper():
client.Client(username=self.TEST_USER,
password="bad_key",
tenant_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertRaises(exceptions.Unauthorized, client_create_wrapper)
def test_auth_redirect(self):
correct_response = json.dumps(self.TEST_RESPONSE_DICT)
dict_responses = [
{
"headers": {
'location': self.TEST_ADMIN_URL + "/tokens",
},
"status": 305,
"body": "Use proxy",
},
{
"headers": {},
"status": 200,
"body": correct_response,
},
]
responses = [(to_http_response(resp), resp['body'])
for resp in dict_responses]
httplib2.Http.request(self.TEST_URL + "/tokens",
'POST',
body=json.dumps(self.TEST_REQUEST_BODY),
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn(responses[0])
httplib2.Http.request(self.TEST_ADMIN_URL + "/tokens",
'POST',
body=json.dumps(self.TEST_REQUEST_BODY),
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn(responses[1])
self.mox.ReplayAll()
cs = client.Client(username=self.TEST_USER,
password=self.TEST_TOKEN,
tenant_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.management_url,
self.TEST_RESPONSE_DICT["access"]["serviceCatalog"][3]
['endpoints'][0]["adminURL"])
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_DICT["access"]["token"]["id"])
def test_authenticate_success_password_scoped(self):
resp = httplib2.Response({
"status": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT),
})
httplib2.Http.request(self.TEST_URL + "/tokens",
'POST',
body=json.dumps(self.TEST_REQUEST_BODY),
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
cs = client.Client(username=self.TEST_USER,
password=self.TEST_TOKEN,
tenant_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.management_url,
self.TEST_RESPONSE_DICT["access"]["serviceCatalog"][3]
['endpoints'][0]["adminURL"])
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_DICT["access"]["token"]["id"])
def test_authenticate_success_password_unscoped(self):
del self.TEST_RESPONSE_DICT['access']['serviceCatalog']
del self.TEST_REQUEST_BODY['auth']['tenantId']
resp = httplib2.Response({
"status": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT),
})
httplib2.Http.request(self.TEST_URL + "/tokens",
'POST',
body=json.dumps(self.TEST_REQUEST_BODY),
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
cs = client.Client(username=self.TEST_USER,
password=self.TEST_TOKEN,
auth_url=self.TEST_URL)
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_DICT["access"]["token"]["id"])
self.assertFalse('serviceCatalog' in cs.service_catalog.catalog)
def test_authenticate_success_token_scoped(self):
del self.TEST_REQUEST_BODY['auth']['passwordCredentials']
self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}
self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
resp = httplib2.Response({
"status": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT),
})
httplib2.Http.request(self.TEST_URL + "/tokens",
'POST',
body=json.dumps(self.TEST_REQUEST_BODY),
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
cs = client.Client(token=self.TEST_TOKEN,
tenant_id=self.TEST_TENANT_ID,
auth_url=self.TEST_URL)
self.assertEqual(cs.management_url,
self.TEST_RESPONSE_DICT["access"]["serviceCatalog"][3]
['endpoints'][0]["adminURL"])
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_DICT["access"]["token"]["id"])
def test_authenticate_success_token_unscoped(self):
del self.TEST_REQUEST_BODY['auth']['passwordCredentials']
del self.TEST_REQUEST_BODY['auth']['tenantId']
del self.TEST_RESPONSE_DICT['access']['serviceCatalog']
self.TEST_REQUEST_BODY['auth']['token'] = {'id': self.TEST_TOKEN}
self.TEST_REQUEST_HEADERS['X-Auth-Token'] = self.TEST_TOKEN
resp = httplib2.Response({
"status": 200,
"body": json.dumps(self.TEST_RESPONSE_DICT),
})
httplib2.Http.request(self.TEST_URL + "/tokens",
'POST',
body=json.dumps(self.TEST_REQUEST_BODY),
headers=self.TEST_REQUEST_HEADERS) \
.AndReturn((resp, resp['body']))
self.mox.ReplayAll()
cs = client.Client(token=self.TEST_TOKEN,
auth_url=self.TEST_URL)
self.assertEqual(cs.auth_token,
self.TEST_RESPONSE_DICT["access"]["token"]["id"])
self.assertFalse('serviceCatalog' in cs.service_catalog.catalog)