tacker/tacker/tests/unit/test_auth.py

886 lines
33 KiB
Python

# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import ddt
from oslo_config import cfg
from oslo_middleware import request_id
import requests
from requests_mock.contrib import fixture as requests_mock_fixture
import tacker.api.vnflcm.v1.router as vnflcm_router
import tacker.api.vnfpkgm.v1.router as vnfpkgm_router
from tacker import auth
from tacker.tests import base as test_base
from tacker.tests.unit import base as unit_base
from tacker.tests.unit import fake_auth
import threading
from tacker.tests import uuidsentinel
from oslo_log import log as logging
from unittest import mock
import webob
LOG = logging.getLogger(__name__)
def _count_mock_history(history, *url):
req_count = 0
for mock_history in history:
actual_url = '{}://{}'.format(mock_history.scheme,
mock_history.hostname)
if actual_url in url:
req_count += 1
return req_count
class TackerKeystoneContextTestCase(test_base.BaseTestCase):
def setUp(self):
super(TackerKeystoneContextTestCase, self).setUp()
@webob.dec.wsgify
def fake_app(req):
self.context = req.environ['tacker.context']
return webob.Response()
self.context = None
self.middleware = auth.TackerKeystoneContext(fake_app)
self.request = webob.Request.blank('/')
self.request.headers['X_AUTH_TOKEN'] = 'testauthtoken'
def test_no_user_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
response = self.request.get_response(self.middleware)
self.assertEqual('401 Unauthorized', response.status)
def test_with_user_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status)
self.assertEqual('testuserid', self.context.user_id)
def test_with_tenant_id(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'test_user_id'
response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status)
self.assertEqual('testtenantid', self.context.tenant_id)
self.assertEqual('testtenantid', self.context.project_id)
def test_roles_no_admin(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
self.request.headers['X_ROLES'] = 'role1, role2 , role3,role4,role5'
response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status)
self.assertEqual(['role1', 'role2', 'role3', 'role4', 'role5'],
self.context.roles)
self.assertFalse(self.context.is_admin)
def test_roles_with_admin(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
self.request.headers['X_ROLES'] = ('role1, role2 , role3,role4,role5,'
'AdMiN')
response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status)
self.assertEqual(['role1', 'role2', 'role3', 'role4', 'role5',
'AdMiN'],
self.context.roles)
self.assertTrue(self.context.is_admin)
def test_with_user_tenant_name(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
self.request.headers['X_PROJECT_NAME'] = 'testtenantname'
self.request.headers['X_USER_NAME'] = 'testusername'
response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status)
self.assertEqual('testuserid', self.context.user_id)
self.assertEqual('testusername', self.context.user_name)
self.assertEqual('testtenantid', self.context.tenant_id)
self.assertEqual('testtenantname', self.context.tenant_name)
def test_request_id_extracted_from_env(self):
req_id = 'dummy-request-id'
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
self.request.environ[request_id.ENV_REQUEST_ID] = req_id
self.request.get_response(self.middleware)
self.assertEqual(req_id, self.context.request_id)
def test_with_auth_token(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status)
self.assertEqual('testauthtoken', self.context.auth_token)
def test_without_auth_token(self):
self.request.headers['X_PROJECT_ID'] = 'testtenantid'
self.request.headers['X_USER_ID'] = 'testuserid'
del self.request.headers['X_AUTH_TOKEN']
self.request.get_response(self.middleware)
self.assertIsNone(self.context.auth_token)
@ddt.ddt
class TestAuthManager(test_base.BaseTestCase):
def setUp(self):
super(TestAuthManager, self).setUp()
self.url = 'https://oauth2/tokens'
self.user_name = 'test_user'
self.password = 'test_password'
auth.auth_manager = auth._AuthManager()
self.requests_mock = self.useFixture(requests_mock_fixture.Fixture())
def tearDown(self):
super(TestAuthManager, self).tearDown()
self.addCleanup(mock.patch.stopall)
def test_init(self):
self.assertEqual(None, cfg.CONF.authentication.auth_type)
self.assertEqual(20, cfg.CONF.authentication.timeout)
self.assertEqual(None, cfg.CONF.authentication.token_endpoint)
self.assertEqual(None, cfg.CONF.authentication.client_id)
self.assertEqual(None, cfg.CONF.authentication.client_password)
self.assertEqual(None, cfg.CONF.authentication.user_name)
self.assertEqual(None, cfg.CONF.authentication.password)
def test_get_auth_client_oauth2_client_credentials_with_local(self):
cfg.CONF.set_override('auth_type', 'OAUTH2_CLIENT_CREDENTIALS',
group='authentication')
cfg.CONF.set_override('token_endpoint', self.url,
group='authentication')
cfg.CONF.set_override('client_id', self.user_name,
group='authentication')
cfg.CONF.set_override('client_password', self.password,
group='authentication')
self.requests_mock.register_uri('POST',
self.url,
json={'access_token': 'test_token3', 'token_type': 'bearer'},
headers={'Content-Type': 'application/json'},
status_code=200)
auth.auth_manager = auth._AuthManager()
client = auth.auth_manager.get_auth_client()
self.assertIsInstance(client, auth._OAuth2Session)
self.assertEqual(
self.user_name,
client.grant.client_id)
self.assertEqual(
self.password,
client.grant.client_password)
self.assertEqual(
self.url,
client.grant.token_endpoint)
history = self.requests_mock.request_history
self.assertEqual(1, len(history))
def test_get_auth_client_basic_with_local(self):
cfg.CONF.set_override('auth_type', 'BASIC',
group='authentication')
cfg.CONF.set_override('user_name', self.user_name,
group='authentication')
cfg.CONF.set_override('password', self.password,
group='authentication')
auth.auth_manager = auth._AuthManager()
client = auth.auth_manager.get_auth_client()
self.assertIsInstance(client, auth._BasicAuthSession)
self.assertEqual(self.user_name, client.user_name)
self.assertEqual(self.password, client.password)
history = self.requests_mock.request_history
self.assertEqual(0, len(history))
def test_get_auth_client_noauth_with_local(self):
cfg.CONF.set_override('auth_type', None,
group='authentication')
client = auth.auth_manager.get_auth_client()
self.assertIsInstance(client, requests.Session)
history = self.requests_mock.request_history
self.assertEqual(0, len(history))
def test_get_auth_client_oauth2_client_credentials_with_subscription(self):
self.requests_mock.register_uri('POST',
self.url,
json={'access_token': 'test_token', 'token_type': 'bearer'},
headers={'Content-Type': 'application/json'},
status_code=200)
params_oauth2_client_credentials = {
'clientId': self.user_name,
'clientPassword': self.password,
'tokenEndpoint': self.url}
auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id,
auth_type='OAUTH2_CLIENT_CREDENTIALS',
auth_params=params_oauth2_client_credentials)
client = auth.auth_manager.get_auth_client(
id=uuidsentinel.subscription_id)
self.assertIsInstance(client, auth._OAuth2Session)
self.assertEqual(
self.user_name,
client.grant.client_id)
self.assertEqual(
self.password,
client.grant.client_password)
self.assertEqual(
self.url,
client.grant.token_endpoint)
history = self.requests_mock.request_history
self.assertEqual(1, len(history))
def test_get_auth_client_basic_with_subscription(self):
params_basic = {
'userName': self.user_name,
'password': self.password}
auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id,
auth_type='BASIC',
auth_params=params_basic)
client = auth.auth_manager.get_auth_client(
id=uuidsentinel.subscription_id)
self.assertIsInstance(client, auth._BasicAuthSession)
self.assertEqual(self.user_name, client.user_name)
self.assertEqual(self.password, client.password)
history = self.requests_mock.request_history
self.assertEqual(0, len(history))
def test_set_auth_client_noauth(self):
auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id,
auth_type=None,
auth_params={})
manages = auth.auth_manager._AuthManager__manages
self.assertNotIn(uuidsentinel.subscription_id, manages)
def test_set_auth_client_basic(self):
params_basic = {
'userName': self.user_name,
'password': self.password}
auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id,
auth_type='BASIC',
auth_params=params_basic)
manages = auth.auth_manager._AuthManager__manages
self.assertIn(uuidsentinel.subscription_id, manages)
client = manages.get(uuidsentinel.subscription_id)
self.assertIsInstance(client, auth._BasicAuthSession)
self.assertEqual(self.user_name, client.user_name)
self.assertEqual(self.password, client.password)
def test_set_auth_client_oauth2_client_credentials(self):
self.requests_mock.register_uri(
'POST', self.url,
json={
'access_token': 'test_token', 'token_type': 'bearer'},
headers={
'Content-Type': 'application/json'},
status_code=200)
params_oauth2_client_credentials = {
'clientId': self.user_name,
'clientPassword': self.password,
'tokenEndpoint': self.url}
auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id,
auth_type='OAUTH2_CLIENT_CREDENTIALS',
auth_params=params_oauth2_client_credentials)
manages = auth.auth_manager._AuthManager__manages
self.assertIn(uuidsentinel.subscription_id, manages)
client = manages.get(uuidsentinel.subscription_id)
self.assertIsInstance(client, auth._OAuth2Session)
self.assertEqual(
self.user_name,
client.grant.client_id)
self.assertEqual(
self.password,
client.grant.client_password)
self.assertEqual(
self.url,
client.grant.token_endpoint)
history = self.requests_mock.request_history
self.assertEqual(1, len(history))
def test_set_auth_client_used_chahe(self):
params_basic = {
'userName': self.user_name,
'password': self.password}
auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id,
auth_type='BASIC',
auth_params=params_basic)
params_oauth2_client_credentials = {
'clientId': self.user_name,
'clientPassword': self.password,
'tokenEndpoint': self.url}
auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id,
auth_type='OAUTH2_CLIENT_CREDENTIALS',
auth_params=params_oauth2_client_credentials)
manages = auth.auth_manager._AuthManager__manages
self.assertIn(uuidsentinel.subscription_id, manages)
client = manages.get(uuidsentinel.subscription_id)
self.assertIsInstance(client, auth._BasicAuthSession)
self.assertEqual(self.user_name, client.user_name)
self.assertEqual(self.password, client.password)
@ddt.ddt
class TestBasicAuthSession(test_base.BaseTestCase):
def setUp(self):
super(TestBasicAuthSession, self).setUp()
self.url = 'https://oauth2/tokens'
self.user_name = 'test_user'
self.password = 'test_password'
self.requests_mock = self.useFixture(requests_mock_fixture.Fixture())
def tearDown(self):
super(TestBasicAuthSession, self).tearDown()
self.addCleanup(mock.patch.stopall)
@ddt.data('GET', 'PUT', 'POST', 'DELETE', 'PATCH')
def test_request(self, http_method):
client = auth._BasicAuthSession(
user_name=self.user_name,
password=self.password)
self.requests_mock.register_uri(http_method,
'https://nfvo.co.jp',
headers={'Content-Type': 'application/json'},
status_code=200)
if http_method == 'GET':
response = client.get(
'https://nfvo.co.jp',
params={
'sample_key': 'sample_value'})
elif http_method == 'PUT':
response = client.put(
'https://nfvo.co.jp',
data={
'sample_key': 'sample_value'})
elif http_method == 'POST':
response = client.post(
'https://nfvo.co.jp',
data={
'sample_key': 'sample_value'})
elif http_method == 'DELETE':
response = client.delete(
'https://nfvo.co.jp',
params={
'sample_key': 'sample_value'})
elif http_method == 'PATCH':
response = client.patch(
'https://nfvo.co.jp',
data={
'sample_key': 'sample_value'})
self.assertEqual(200, response.status_code)
history = self.requests_mock.request_history
self.assertEqual(1, len(history))
@ddt.ddt
class TestOAuth2Session(test_base.BaseTestCase):
class MockThread(threading.Timer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def start(self):
super().start()
super().join(60)
def setUp(self):
super(TestOAuth2Session, self).setUp()
self.url = 'https://oauth2/tokens'
self.user_name = 'test_user'
self.password = 'test_password'
self.requests_mock = self.useFixture(requests_mock_fixture.Fixture())
def tearDown(self):
super(TestOAuth2Session, self).tearDown()
self.addCleanup(mock.patch.stopall)
def test_apply_access_token_info(self):
res_mock = {
'json': {
'access_token': 'test_token',
'token_type': 'bearer',
'expires_in': '1'},
'headers': {'Content-Type': 'application/json'},
'status_code': 200}
res_mock2 = {
'json': {
'access_token': 'test_token2',
'token_type': 'bearer'},
'headers': {'Content-Type': 'application/json'},
'status_code': 200}
self.requests_mock.register_uri(
'POST',
self.url, [res_mock, res_mock2])
grant = auth._ClientCredentialsGrant(
client_id=self.user_name,
client_password=self.password,
token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
client = auth._OAuth2Session(grant)
client.apply_access_token_info()
history = self.requests_mock.request_history
self.assertEqual(2, len(history))
self.assertEqual(1, m.call_count)
def test_apply_access_token_info_fail_error_response(self):
error_description = """
Either your username or password is incorrect
or you are not an active user.
Please try again or contact your administrator.
"""
self.requests_mock.register_uri(
'POST',
self.url,
headers={
'Content-Type': 'application/json;charset=UTF-8',
'Cache-Control': 'no-store',
'Pragma': 'no-store',
'WWW-Authenticate': 'Basic realm="example"'},
json={
'error': 'invalid_client',
'error_description': error_description},
status_code=401)
grant = auth._ClientCredentialsGrant(
client_id=self.user_name,
client_password=self.password,
token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
try:
client = auth._OAuth2Session(grant)
client.apply_access_token_info()
except requests.exceptions.RequestException as e:
self.assertEqual(401, e.response.status_code)
history = self.requests_mock.request_history
self.assertEqual(1, len(history))
self.assertEqual(0, m.call_count)
def test_apply_access_token_info_fail_timeout(self):
self.requests_mock.register_uri(
'POST',
self.url,
exc=requests.exceptions.ConnectTimeout)
grant = auth._ClientCredentialsGrant(
client_id=self.user_name,
client_password=self.password,
token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
try:
client = auth._OAuth2Session(grant)
client.apply_access_token_info()
except requests.exceptions.RequestException as e:
self.assertIsNone(e.response)
history = self.requests_mock.request_history
self.assertEqual(1, len(history))
self.assertEqual(0, m.call_count)
def test_schedule_refrash_token_expaire(self):
self.requests_mock.register_uri(
'GET',
self.url,
headers={'Content-Type': 'application/json'},
json={
'access_token': 'test_token',
'token_type': 'bearer'},
status_code=200)
grant = auth._ClientCredentialsGrant(
client_id=self.user_name,
client_password=self.password,
token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
client = auth._OAuth2Session(grant)
client._OAuth2Session__access_token_info.update({
'access_token': 'test_token',
'token_type': 'bearer',
'expires_in': '1'})
client.schedule_refrash_token()
history = self.requests_mock.request_history
self.assertEqual(1, len(history))
self.assertEqual(1, m.call_count)
def test_schedule_refrash_token_non_expaire(self):
grant = auth._ClientCredentialsGrant(
client_id=self.user_name,
client_password=self.password,
token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
client = auth._OAuth2Session(grant)
client._OAuth2Session__access_token_info.update({
'access_token': 'test_token',
'token_type': 'bearer'})
client.schedule_refrash_token()
history = self.requests_mock.request_history
self.assertEqual(0, len(history))
self.assertEqual(0, m.call_count)
@ddt.data(None, "")
def test_schedule_refrash_token_invalid_value(self, invalid_value):
grant = auth._ClientCredentialsGrant(
client_id=self.user_name,
client_password=self.password,
token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
client = auth._OAuth2Session(grant)
client._OAuth2Session__access_token_info.update({
'access_token': 'test_token',
'token_type': 'bearer',
'expires_in': invalid_value})
client.schedule_refrash_token()
history = self.requests_mock.request_history
self.assertEqual(0, len(history))
self.assertEqual(0, m.call_count)
@ddt.data('GET', 'PUT', 'POST', 'DELETE', 'PATCH')
def test_request_client_credentials(self, http_method):
self.requests_mock.register_uri('POST',
self.url,
json={'access_token': 'test_token3', 'token_type': 'bearer'},
headers={'Content-Type': 'application/json'},
status_code=200)
grant = auth._ClientCredentialsGrant(
client_id=self.user_name,
client_password=self.password,
token_endpoint=self.url)
client = auth._OAuth2Session(grant)
client.apply_access_token_info()
self.requests_mock.register_uri(http_method,
'https://nfvo.co.jp',
headers={'Content-Type': 'application/json'},
status_code=200)
if http_method == 'GET':
response = client.get(
'https://nfvo.co.jp',
params={
'sample_key': 'sample_value'})
elif http_method == 'PUT':
response = client.put(
'https://nfvo.co.jp',
data={
'sample_key': 'sample_value'})
elif http_method == 'POST':
response = client.post(
'https://nfvo.co.jp',
data={
'sample_key': 'sample_value'})
elif http_method == 'DELETE':
response = client.delete(
'https://nfvo.co.jp',
params={
'sample_key': 'sample_value'})
elif http_method == 'PATCH':
response = client.patch(
'https://nfvo.co.jp',
data={
'sample_key': 'sample_value'})
self.assertEqual(200, response.status_code)
history = self.requests_mock.request_history
self.assertEqual(2, len(history))
def test_request_client_credentials_auth_error(self):
self.requests_mock.register_uri('POST',
self.url,
json={'access_token': 'test_token3', 'token_type': 'bearer'},
headers={'Content-Type': 'application/json'},
status_code=200)
self.requests_mock.register_uri('GET',
"https://nfvo.co.jp",
text="error.",
status_code=401)
grant = auth._ClientCredentialsGrant(
client_id=self.user_name,
client_password=self.password,
token_endpoint=self.url)
client = auth._OAuth2Session(grant)
client.apply_access_token_info()
response = client.get('https://nfvo.co.jp')
self.assertEqual(401, response.status_code)
history = self.requests_mock.request_history
self.assertEqual(3, len(history))
class TestAuthValidateBearer(unit_base.FixturedTestCase):
def setUp(self):
super(TestAuthValidateBearer, self).setUp()
token_type = 'Bearer'
api_name = 'dummy'
token_value = 'SampleAccessToken'
application_type = vnflcm_router.VnflcmAPIRouter
self.auth_opts = [cfg.ListOpt('vnflcm_dummy_scope',
default='test_api',
help="OAuth2.0 api token scope for create")]
cfg.CONF.register_opts(self.auth_opts, group='authentication')
self.requests_mock = self.useFixture(requests_mock_fixture.Fixture())
self.url = 'http://auth/authorize/'
self.auth_bearer = auth._AuthValidateBearer(
application_type, api_name, token_type, token_value)
auth._AuthValidateManager()
def tearDown(self):
super(TestAuthValidateBearer, self).tearDown()
@mock.patch.object(auth._AuthValidateBearer, 'request')
def test_do_auth_no_response(self, mock_request):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
mock_request.return_value = None
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_bearer.do_auth)
def test_do_auth_no_token_value_in_response(self):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
cfg.CONF.set_override('auth_url', 'http://auth/authorize/',
group='authentication')
update = {'access_token': None}
json = fake_auth.fake_response(**update)
self.requests_mock.register_uri('GET',
self.url,
json=json,
headers={'Content-Type': 'application/json'},
status_code=200)
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_bearer.do_auth)
history = self.requests_mock.request_history
req_count = _count_mock_history(history, 'http://auth')
self.assertEqual(1, req_count)
def test_do_auth_no_token_type_in_response(self):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
cfg.CONF.set_override('auth_url', 'http://auth/authorize/',
group='authentication')
update = {'token_type': None}
json = fake_auth.fake_response(**update)
self.requests_mock.register_uri('GET',
self.url,
json=json,
headers={'Content-Type': 'application/json'},
status_code=200)
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_bearer.do_auth)
history = self.requests_mock.request_history
req_count = _count_mock_history(history, 'http://auth')
self.assertEqual(1, req_count)
def test_do_auth_invalid_token_value(self):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
cfg.CONF.set_override('auth_url', 'http://auth/authorize/',
group='authentication')
update = {'access_token': 'Test'}
json = fake_auth.fake_response(**update)
self.requests_mock.register_uri('GET',
self.url,
json=json,
headers={'Content-Type': 'application/json'},
status_code=200)
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_bearer.do_auth)
history = self.requests_mock.request_history
req_count = _count_mock_history(history, 'http://auth')
self.assertEqual(1, req_count)
def test_do_auth_invalid_scope(self):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
cfg.CONF.set_override('auth_url', 'http://auth/authorize/',
group='authentication')
json = fake_auth.fake_response()
self.requests_mock.register_uri('GET',
self.url,
json=json,
headers={'Content-Type': 'application/json'},
status_code=200)
self.assertRaises(webob.exc.HTTPForbidden, self.auth_bearer.do_auth)
history = self.requests_mock.request_history
req_count = _count_mock_history(history, 'http://auth')
self.assertEqual(1, req_count)
class TestAuthValidateBasic(unit_base.FixturedTestCase):
def setUp(self):
super(TestAuthValidateBasic, self).setUp()
self.api_name = 'test'
self.user_name = 'test_user'
self.password = 'test_pass'
self.token_type = 'Basic'
self.token_value = self._encode_base64(self.user_name + self.password)
self.auth_basic = auth._AuthValidateBasic(
self.api_name, self.token_type, self.token_value)
auth._AuthValidateManager()
def _encode_base64(self, info):
encode = base64.b64encode(info.encode())
return encode
def test_do_auth(self):
cfg.CONF.set_override('token_type', 'Basic',
group='authentication')
cfg.CONF.set_override('user_name', self.user_name,
group='authentication')
cfg.CONF.set_override('password', self.password,
group='authentication')
auth._AuthValidateBasic(self.api_name, self.token_type,
self.token_value)
self.auth_basic.do_auth()
def test_do_auth_invalid_token_value(self):
cfg.CONF.set_override('token_type', 'Basic',
group='authentication')
cfg.CONF.set_override('user_name', 'test',
group='authentication')
cfg.CONF.set_override('password', self.password,
group='authentication')
auth._AuthValidateBasic(
self.api_name,
self.token_type,
self.token_value)
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_basic.do_auth)
def test_do_auth_invalid_token_type(self):
cfg.CONF.set_override('token_type', 'Basic',
group='authentication')
self.auth_basic = auth._AuthValidateBasic(
'test', 'test_type', 'test_val')
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_basic.do_auth)
@ddt.ddt
class TestAuthValidateManager(unit_base.FixturedTestCase):
def setUp(self):
super(TestAuthValidateManager, self).setUp()
self.auth_validate = auth._AuthValidateManager()
@ddt.data(vnflcm_router.VnflcmAPIRouter, vnfpkgm_router.VnfpkgmAPIRouter)
def test_auth_main_bearer(self, obj):
mock_response = mock.MagicMock()
mock_response.request = mock.MagicMock()
mock_response.request.headers = {
'Authorization': 'Bearer 123456abc'}
mock_response.application.return_value = obj
ret = self.auth_validate._get_auth_type(
mock_response.request, mock_response.application)
self.assertIsInstance(ret, auth._AuthValidateBearer)
def test_auth_main_basic(self):
mock_response = mock.MagicMock()
mock_response.request = mock.MagicMock()
mock_response.request.headers = {
'Authorization': 'Basic 123456abc'}
mock_response.application = mock.MagicMock()
mock_response.application.return_value = vnflcm_router.VnflcmAPIRouter
ret = self.auth_validate._get_auth_type(
mock_response.request, mock_response.application)
self.assertIsInstance(ret, auth._AuthValidateBasic)
def test_auth_main_none(self):
mock_response = mock.MagicMock()
mock_response.request = mock.MagicMock()
mock_response.request.headers = {
'Authorization': 'Test 123456abc'}
mock_response.application.return_value = vnflcm_router.VnflcmAPIRouter
ret = self.auth_validate._get_auth_type(
mock_response.request, mock_response.application)
self.assertIsInstance(ret, auth._AuthValidateIgnore)
class TestAuthValidatorExecution(test_base.BaseTestCase):
def setUp(self):
super(TestAuthValidatorExecution, self).setUp()
@webob.dec.wsgify
def fake_app(req):
# self.context = req.environ['tacker.context']
return webob.Response()
self.context = None
self.middleware = auth.AuthValidatorExecution(fake_app)
self.request = webob.Request.blank('/')
self.request.headers['X_AUTH_TOKEN'] = 'testauthtoken'
@mock.patch.object(auth.auth_validator_manager, "auth_main")
def test_called(self, mock_auth_main):
response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status)