python-apmecclient/apmecclient/tests/unit/test_auth.py

371 lines
14 KiB
Python

# Copyright 2012 NEC Corporation
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import json
import uuid
from keystoneclient import exceptions as k_exceptions
import mock
import requests
import testtools
from apmecclient import client
from apmecclient.common import exceptions
USERNAME = 'testuser'
USER_ID = 'testuser_id'
TENANT_NAME = 'testtenant'
TENANT_ID = 'testtenant_id'
PASSWORD = 'password'
AUTH_URL = 'authurl'
ENDPOINT_URL = 'localurl'
ENDPOINT_OVERRIDE = 'otherurl'
TOKEN = 'tokentoken'
REGION = 'RegionTest'
NOAUTH = 'noauth'
KS_TOKEN_RESULT = {
'access': {
'token': {'id': TOKEN,
'expires': '2012-08-11T07:49:01Z',
'tenant': {'id': str(uuid.uuid1())}},
'user': {'id': str(uuid.uuid1())},
'serviceCatalog': [
{'endpoints_links': [],
'endpoints': [{'adminURL': ENDPOINT_URL,
'internalURL': ENDPOINT_URL,
'publicURL': ENDPOINT_URL,
'region': REGION}],
'type': 'mec-orchestration',
'name': 'Apmec Service'}
]
}
}
ENDPOINTS_RESULT = {
'endpoints': [{
'type': 'mec-orchestration',
'name': 'Apmec Service',
'region': REGION,
'adminURL': ENDPOINT_URL,
'internalURL': ENDPOINT_URL,
'publicURL': ENDPOINT_URL
}]
}
def get_response(status_code, headers=None):
response = mock.Mock().CreateMock(requests.Response)
response.headers = headers or {}
response.status_code = status_code
return response
resp_200 = get_response(200)
resp_401 = get_response(401)
headers = {'X-Auth-Token': '',
'User-Agent': 'python-apmecclient'}
expected_headers = {'X-Auth-Token': TOKEN,
'User-Agent': 'python-apmecclient'}
agent_header = {'User-Agent': 'python-apmecclient'}
class CLITestAuthNoAuth(testtools.TestCase):
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthNoAuth, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
endpoint_url=ENDPOINT_URL,
auth_strategy=NOAUTH,
region_name=REGION)
self.addCleanup(mock.patch.stopall)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_noauth(self, mock_request):
mock_request.return_value = (resp_200, '')
self.client.do_request('/resource', 'GET',
headers=headers)
mock_request.assert_called_once_with(
ENDPOINT_URL + '/resource',
'GET',
headers=headers)
self.assertEqual(self.client.endpoint_url, ENDPOINT_URL)
class CLITestAuthKeystone(testtools.TestCase):
# Auth Body expected
auth_body = ('{"auth": {"tenantName": "testtenant", '
'"passwordCredentials": '
'{"username": "testuser", "password": "password"}}}')
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystone, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
self.addCleanup(mock.patch.stopall)
def test_reused_token_get_auth_info(self):
"""Test Client.get_auth_info().
Test that Client.get_auth_info() works even if client was
instantiated with predefined token.
"""
client_ = client.HTTPClient(username=USERNAME,
tenant_name=TENANT_NAME,
token=TOKEN,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
expected = {'auth_token': TOKEN,
'auth_tenant_id': None,
'auth_user_id': None,
'endpoint_url': self.client.endpoint_url}
self.assertEqual(client_.get_auth_info(), expected)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_token(self, mock_request):
mock_request.return_value = (resp_200, json.dumps(KS_TOKEN_RESULT))
self.client.do_request('/resource', 'GET')
mock_request.assert_called_with(
ENDPOINT_URL + '/resource', 'GET',
headers=expected_headers)
self.assertEqual(self.client.endpoint_url, ENDPOINT_URL)
self.assertEqual(self.client.auth_token, TOKEN)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_refresh_token(self, mock_request):
self.client.auth_token = TOKEN
self.client.endpoint_url = ENDPOINT_URL
# If a token is expired, apmec server retruns 401
mock_request.return_value = (resp_401, '')
self.assertRaises(exceptions.Unauthorized,
self.client.do_request,
'/resource',
'GET')
mock_request.return_value = (resp_200, json.dumps(KS_TOKEN_RESULT))
self.client.do_request('/resource', 'GET')
mock_request.assert_called_with(
ENDPOINT_URL + '/resource', 'GET',
headers=expected_headers)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_refresh_token_no_auth_url(self, mock_request):
self.client.auth_url = None
self.client.auth_token = TOKEN
self.client.endpoint_url = ENDPOINT_URL
# If a token is expired, apmec server retruns 401
mock_request.return_value = (resp_401, '')
self.assertRaises(exceptions.NoAuthURLProvided,
self.client.do_request,
'/resource',
'GET')
expected_url = ENDPOINT_URL + '/resource'
mock_request.assert_called_with(expected_url, 'GET',
headers=expected_headers)
def test_get_endpoint_url_with_invalid_auth_url(self):
# Handle the case when auth_url is not provided
self.client.auth_url = None
self.assertRaises(exceptions.NoAuthURLProvided,
self.client._get_endpoint_url)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_endpoint_url(self, mock_request):
self.client.auth_token = TOKEN
mock_request.return_value = (resp_200, json.dumps(ENDPOINTS_RESULT))
self.client.do_request('/resource', 'GET')
mock_request.assert_called_with(
ENDPOINT_URL + '/resource', 'GET',
headers=expected_headers)
mock_request.return_value = (resp_200, '')
self.client.do_request('/resource', 'GET',
headers=headers)
mock_request.assert_called_with(
ENDPOINT_URL + '/resource', 'GET',
headers=headers)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_use_given_endpoint_url(self, mock_request):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION,
endpoint_url=ENDPOINT_OVERRIDE)
self.assertEqual(self.client.endpoint_url, ENDPOINT_OVERRIDE)
self.client.auth_token = TOKEN
mock_request.return_value = (resp_200, '')
self.client.do_request('/resource', 'GET',
headers=headers)
mock_request.assert_called_with(
ENDPOINT_OVERRIDE + '/resource', 'GET',
headers=headers)
self.assertEqual(self.client.endpoint_url, ENDPOINT_OVERRIDE)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_endpoint_url_other(self, mock_request):
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='otherURL')
self.client.auth_token = TOKEN
mock_request.return_value = (resp_200, json.dumps(ENDPOINTS_RESULT))
self.assertRaises(exceptions.EndpointTypeNotFound,
self.client.do_request,
'/resource',
'GET')
expected_url = AUTH_URL + '/tokens/%s/endpoints' % TOKEN
headers = {'User-Agent': 'python-apmecclient'}
mock_request.assert_called_with(expected_url, 'GET',
headers=headers)
@mock.patch('apmecclient.client.HTTPClient.request')
def test_get_endpoint_url_failed(self, mock_request):
self.client.auth_token = TOKEN
self.client.auth_url = AUTH_URL + '/tokens/%s/endpoints' % TOKEN
mock_request.return_value = (resp_401, '')
self.assertRaises(exceptions.Unauthorized,
self.client.do_request,
'/resource',
'GET')
def test_endpoint_type(self):
resources = copy.deepcopy(KS_TOKEN_RESULT)
endpoints = resources['access']['serviceCatalog'][0]['endpoints'][0]
endpoints['internalURL'] = 'internal'
endpoints['adminURL'] = 'admin'
endpoints['publicURL'] = 'public'
# Test default behavior is to choose public.
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION)
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'public')
# Test admin url
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='adminURL')
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'admin')
# Test public url
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='publicURL')
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'public')
# Test internal url
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='internalURL')
self.client._extract_service_catalog(resources)
self.assertEqual(self.client.endpoint_url, 'internal')
# Test url that isn't found in the service catalog
self.client = client.HTTPClient(
username=USERNAME, tenant_name=TENANT_NAME, password=PASSWORD,
auth_url=AUTH_URL, region_name=REGION, endpoint_type='privateURL')
self.assertRaises(k_exceptions.EndpointNotFound,
self.client._extract_service_catalog,
resources)
@mock.patch('apmecclient.client.HTTPClient.request')
@mock.patch('apmecclient.common.utils.http_log_req')
def test_strip_credentials_from_log(self, mock_http_log_req,
mock_request,):
body = ('{"auth": {"tenantId": "testtenant_id",'
'"passwordCredentials": {"password": "password",'
'"userId": "testuser_id"}}}')
expected_body = ('{"auth": {"tenantId": "testtenant_id",'
'"REDACTEDCredentials": {"REDACTED": "REDACTED",'
'"userId": "testuser_id"}}}')
_headers = {'headers': expected_headers, 'body': expected_body}
mock_request.return_value = (resp_200, json.dumps(KS_TOKEN_RESULT))
self.client.do_request('/resource', 'GET', body=body)
args, kwargs = mock_http_log_req.call_args
# Check that credentials are stripped while logging.
self.assertEqual(_headers, args[2])
class CLITestAuthKeystoneWithId(CLITestAuthKeystone):
# Auth Body expected
auth_body = ('{"auth": {"passwordCredentials": '
'{"password": "password", "userId": "testuser_id"}, '
'"tenantId": "testtenant_id"}}')
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystoneWithId, self).setUp()
self.client = client.HTTPClient(user_id=USER_ID,
tenant_id=TENANT_ID,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)
class CLITestAuthKeystoneWithIdandName(CLITestAuthKeystone):
# Auth Body expected
auth_body = ('{"auth": {"passwordCredentials": '
'{"password": "password", "userId": "testuser_id"}, '
'"tenantId": "testtenant_id"}}')
def setUp(self):
"""Prepare the test environment."""
super(CLITestAuthKeystoneWithIdandName, self).setUp()
self.client = client.HTTPClient(username=USERNAME,
user_id=USER_ID,
tenant_id=TENANT_ID,
tenant_name=TENANT_NAME,
password=PASSWORD,
auth_url=AUTH_URL,
region_name=REGION)