
Add a new optional tenant_id keyword argument to the client classes cinderclient.client.HTTPClient and cinderclient.v1.client.Client to support authentication with tenant_id instead of projectid (which is actually used as "tenantName" in the auth request body). Keystone can provide tokens without specifying the tenant in any form, but a tenantName _or_ tenantId is required to generate the catalog (the keystone service code seems to prefer tenantName if both are specified). When using cinderclient programmatically, it may be more convenient, depending on the context, to authenticate without specifying the tenant, or by tenant_id instead of tenant_name. Either way it's impractical to make the requirement in the client for projectid (tenantName) if the auth system has no such limitation. The new client signature is backwards compatible. There is no change in behavior for the shell client. Change-Id: I0c1bdbdabd9acc29133a48a798750323011f1f18
367 lines
14 KiB
Python
367 lines
14 KiB
Python
import httplib2
|
|
import json
|
|
import mock
|
|
|
|
from cinderclient.v1 import client
|
|
from cinderclient import exceptions
|
|
from tests import utils
|
|
|
|
|
|
def to_http_response(resp_dict):
    """Build an ``httplib2.Response`` from a dict of response attributes.

    The whole dict seeds the response object; each entry of its
    ``'headers'`` mapping is then also stored as an item on the response.
    """
    response = httplib2.Response(resp_dict)
    headers = resp_dict['headers']
    for name in headers:
        response[name] = headers[name]
    return response
|
|
|
|
|
|
class AuthenticateAgainstKeystoneTests(utils.TestCase):
    """Authentication tests that exchange JSON bodies with the auth URL.

    Each test replaces ``httplib2.Http.request`` with a mock returning
    canned responses, then checks what the client sent and what it stored.
    """

    def test_authenticate_success(self):
        """A 200 reply with one 'compute' entry sets the URL and token."""
        cs = client.Client("username", "password", "project_id",
                           "auth_url/v2.0", service_type='compute')
        endpoint = {
            "region": "RegionOne",
            "adminURL": "http://localhost:8774/v1",
            "internalURL": "http://localhost:8774/v1",
            "publicURL": "http://localhost:8774/v1/",
        }
        token = {"expires": "12345", "id": "FAKE_ID"}
        resp = {
            "access": {
                "token": token,
                "serviceCatalog": [
                    {"type": "compute", "endpoints": [endpoint]},
                ],
            },
        }
        payload = json.dumps(resp)
        auth_response = httplib2.Response({"status": 200, "body": payload})
        mock_request = mock.Mock(return_value=(auth_response, payload))

        with mock.patch.object(httplib2.Http, "request", mock_request):
            cs.client.authenticate()

            expected_headers = {
                'User-Agent': cs.client.USER_AGENT,
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            }
            # Key order is kept as-is: the body is compared as a JSON string.
            expected_body = {
                'auth': {
                    'passwordCredentials': {
                        'username': cs.client.user,
                        'password': cs.client.password,
                    },
                    'tenantName': cs.client.projectid,
                },
            }
            mock_request.assert_called_with(
                cs.client.auth_url + "/tokens", "POST",
                headers=expected_headers,
                body=json.dumps(expected_body))

            # The stored management URL is the public URL minus the
            # trailing slash.
            self.assertEqual(cs.client.management_url,
                             endpoint["publicURL"].rstrip('/'))
            self.assertEqual(cs.client.auth_token, token["id"])

    def test_authenticate_tenant_id(self):
        """With only ``tenant_id`` given, the request carries 'tenantId'."""
        cs = client.Client("username", "password", auth_url="auth_url/v2.0",
                           tenant_id='tenant_id', service_type='compute')
        # Tenant associated with the token.
        tenant = {
            "description": None,
            "enabled": True,
            "id": "tenant_id",
            "name": "demo",
        }
        token = {"expires": "12345", "id": "FAKE_ID", "tenant": tenant}
        endpoint = {
            "region": "RegionOne",
            "adminURL": "http://localhost:8774/v1",
            "internalURL": "http://localhost:8774/v1",
            "publicURL": "http://localhost:8774/v1/",
        }
        resp = {
            "access": {
                "token": token,
                "serviceCatalog": [
                    {"type": "compute", "endpoints": [endpoint]},
                ],
            },
        }
        payload = json.dumps(resp)
        auth_response = httplib2.Response({"status": 200, "body": payload})
        mock_request = mock.Mock(return_value=(auth_response, payload))

        with mock.patch.object(httplib2.Http, "request", mock_request):
            cs.client.authenticate()

            expected_headers = {
                'User-Agent': cs.client.USER_AGENT,
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            }
            # Key order is kept as-is: the body is compared as a JSON string.
            expected_body = {
                'auth': {
                    'passwordCredentials': {
                        'username': cs.client.user,
                        'password': cs.client.password,
                    },
                    'tenantId': cs.client.tenant_id,
                },
            }
            mock_request.assert_called_with(
                cs.client.auth_url + "/tokens", "POST",
                headers=expected_headers,
                body=json.dumps(expected_body))

            self.assertEqual(cs.client.management_url,
                             endpoint["publicURL"].rstrip('/'))
            self.assertEqual(cs.client.auth_token, token["id"])
            self.assertEqual(cs.client.tenant_id, tenant["id"])

    def test_authenticate_failure(self):
        """A 401 reply makes ``authenticate`` raise ``Unauthorized``."""
        cs = client.Client("username", "password", "project_id",
                           "auth_url/v2.0")
        payload = json.dumps(
            {"unauthorized": {"message": "Unauthorized", "code": "401"}})
        auth_response = httplib2.Response({"status": 401, "body": payload})
        mock_request = mock.Mock(return_value=(auth_response, payload))

        with mock.patch.object(httplib2.Http, "request", mock_request):
            self.assertRaises(exceptions.Unauthorized,
                              cs.client.authenticate)

    def test_auth_redirect(self):
        """A 305 reply followed by a 200 still ends in a successful auth."""
        cs = client.Client("username", "password", "project_id",
                           "auth_url/v1", service_type='compute')
        endpoint = {
            "adminURL": "http://localhost:8774/v1",
            "region": "RegionOne",
            "internalURL": "http://localhost:8774/v1",
            "publicURL": "http://localhost:8774/v1/",
        }
        token = {"expires": "12345", "id": "FAKE_ID"}
        dict_correct_response = {
            "access": {
                "token": token,
                "serviceCatalog": [
                    {"type": "compute", "endpoints": [endpoint]},
                ],
            },
        }
        correct_response = json.dumps(dict_correct_response)

        # Note carried over from the original author: when configured on
        # the admin port, cinder redirects to the v2.0 port. Connecting
        # there, keystone auth succeeds via the v1.0 protocol (through
        # headers) but the tokens come back in the body, which looks like a
        # keystone bug. Kept for compatibility.
        queued = [
            {"headers": {'location': 'http://127.0.0.1:5001'},
             "status": 305,
             "body": "Use proxy"},
            {"headers": {},
             "status": 200,
             "body": correct_response},
            {"headers": {},
             "status": 200,
             "body": correct_response},
        ]
        responses = [(to_http_response(item), item['body'])
                     for item in queued]

        # Each call to the mocked request consumes the next queued reply.
        mock_request = mock.Mock(
            side_effect=lambda *args, **kwargs: responses.pop(0))

        with mock.patch.object(httplib2.Http, "request", mock_request):
            cs.client.authenticate()

            expected_headers = {
                'User-Agent': cs.client.USER_AGENT,
                'Content-Type': 'application/json',
                'Accept': 'application/json',
            }
            # Key order is kept as-is: the body is compared as a JSON string.
            expected_body = {
                'auth': {
                    'passwordCredentials': {
                        'username': cs.client.user,
                        'password': cs.client.password,
                    },
                    'tenantName': cs.client.projectid,
                },
            }
            mock_request.assert_called_with(
                cs.client.auth_url + "/tokens", "POST",
                headers=expected_headers,
                body=json.dumps(expected_body))

            self.assertEqual(cs.client.management_url,
                             endpoint["publicURL"].rstrip('/'))
            self.assertEqual(cs.client.auth_token, token["id"])

    def test_ambiguous_endpoints(self):
        """Two 'compute' catalog entries raise ``AmbiguousEndpoints``."""
        cs = client.Client("username", "password", "project_id",
                           "auth_url/v2.0", service_type='compute')
        first_service = {
            "adminURL": "http://localhost:8774/v1",
            "type": "compute",
            "name": "Compute CLoud",
            "endpoints": [
                {
                    "region": "RegionOne",
                    "internalURL": "http://localhost:8774/v1",
                    "publicURL": "http://localhost:8774/v1/",
                },
            ],
        }
        second_service = {
            "adminURL": "http://localhost:8774/v1",
            "type": "compute",
            "name": "Hyper-compute Cloud",
            "endpoints": [
                {
                    "internalURL": "http://localhost:8774/v1",
                    "publicURL": "http://localhost:8774/v1/",
                },
            ],
        }
        resp = {
            "access": {
                "token": {
                    "expires": "12345",
                    "id": "FAKE_ID",
                },
                "serviceCatalog": [first_service, second_service],
            },
        }
        payload = json.dumps(resp)
        auth_response = httplib2.Response({"status": 200, "body": payload})
        mock_request = mock.Mock(return_value=(auth_response, payload))

        with mock.patch.object(httplib2.Http, "request", mock_request):
            self.assertRaises(exceptions.AmbiguousEndpoints,
                              cs.client.authenticate)
|
|
|
|
|
|
class AuthenticationTests(utils.TestCase):
    """Authentication tests where credentials travel in request headers.

    The client is built with a bare ``"auth_url"``; the expected request
    is a GET carrying ``X-Auth-*`` headers, and the reply delivers the
    management URL and token as response headers.
    """

    def test_authenticate_success(self):
        """A 204 reply's headers populate the management URL and token."""
        cs = client.Client("username", "password", "project_id", "auth_url")
        management_url = 'https://servers.api.rackspacecloud.com/v1.1/443470'
        auth_response = httplib2.Response({
            'status': 204,
            'x-server-management-url': management_url,
            'x-auth-token': '1b751d74-de0c-46ae-84f0-915744b582d1',
        })
        mock_request = mock.Mock(return_value=(auth_response, None))

        @mock.patch.object(httplib2.Http, "request", mock_request)
        def test_auth_call():
            cs.client.authenticate()
            headers = {
                'Accept': 'application/json',
                'X-Auth-User': 'username',
                'X-Auth-Key': 'password',
                'X-Auth-Project-Id': 'project_id',
                'User-Agent': cs.client.USER_AGENT
            }
            mock_request.assert_called_with(cs.client.auth_url, 'GET',
                                            headers=headers)
            self.assertEqual(cs.client.management_url,
                             auth_response['x-server-management-url'])
            self.assertEqual(cs.client.auth_token,
                             auth_response['x-auth-token'])

        test_auth_call()

    def test_authenticate_failure(self):
        """A 401 reply makes ``authenticate`` raise ``Unauthorized``."""
        cs = client.Client("username", "password", "project_id", "auth_url")
        auth_response = httplib2.Response({'status': 401})
        mock_request = mock.Mock(return_value=(auth_response, None))

        @mock.patch.object(httplib2.Http, "request", mock_request)
        def test_auth_call():
            self.assertRaises(exceptions.Unauthorized, cs.client.authenticate)

        test_auth_call()

    def test_auth_automatic(self):
        """A GET with an empty management URL calls ``authenticate``."""
        cs = client.Client("username", "password", "project_id", "auth_url")
        http_client = cs.client
        http_client.management_url = ''
        mock_request = mock.Mock(return_value=(None, None))

        @mock.patch.object(http_client, 'request', mock_request)
        @mock.patch.object(http_client, 'authenticate')
        def test_auth_call(m):
            http_client.get('/')
            # Check ``.called`` rather than ``assert_called()``: on mock
            # releases before 2.0 (stdlib before Python 3.6) the latter is
            # not a real assertion. It is auto-created as a child mock and
            # passes even when no call happened.
            self.assertTrue(m.called)
            self.assertTrue(mock_request.called)

        test_auth_call()

    def test_auth_manual(self):
        """``Client.authenticate`` calls the HTTP client's ``authenticate``."""
        cs = client.Client("username", "password", "project_id", "auth_url")

        @mock.patch.object(cs.client, 'authenticate')
        def test_auth_call(m):
            cs.authenticate()
            # ``.called`` is a real check on every mock release, unlike
            # ``assert_called()`` (see mock >= 2.0 / Python >= 3.6).
            self.assertTrue(m.called)

        test_auth_call()
|