Support of Server/Basic certification in OAuth2.0

Supported Server/Basic certification in OAuth2.0 as the part of
the feature of branching VNFM and NFVO in Tacker.

Implements: blueprint support-vnfm-operations
Spec: https://specs.openstack.org/openstack/tacker-specs/specs/victoria/support-sol003-vnfm-operations.html
Change-Id: Iba9dbe73c76e12603f80f95fe3093b8a7238cd7d
This commit is contained in:
Koichi Edagawa 2020-08-31 20:05:47 +09:00 committed by Aldinson C. Esto
parent a7dc3ab6b5
commit df399e54a5
5 changed files with 552 additions and 87 deletions

View File

@ -16,6 +16,7 @@
import routes import routes
from oslo_config import cfg
from tacker.api.vnflcm.v1 import controller as vnf_lcm_controller from tacker.api.vnflcm.v1 import controller as vnf_lcm_controller
from tacker import wsgi from tacker import wsgi
@ -32,13 +33,20 @@ class VnflcmAPIRouter(wsgi.Router):
all_methods = ['HEAD', 'GET', 'POST', 'PUT', 'PATCH', 'DELETE'] all_methods = ['HEAD', 'GET', 'POST', 'PUT', 'PATCH', 'DELETE']
missing_methods = [m for m in all_methods if m not in methods.keys()] missing_methods = [m for m in all_methods if m not in methods.keys()]
allowed_methods_str = ",".join(methods.keys()) allowed_methods_str = ",".join(methods.keys())
scope_opts = []
for method, action in methods.items(): for method, action in methods.items():
mapper.connect(url, mapper.connect(url,
controller=controller, controller=controller,
action=action, action=action,
conditions={'method': [method]}) conditions={'method': [method]})
add = cfg.ListOpt('vnflcm_' + action + '_scope',
default=[],
help="OAuth2.0 api token scope for" + action)
scope_opts.append(add)
cfg.CONF.register_opts(scope_opts, group='authentication')
if missing_methods: if missing_methods:
mapper.connect(url, mapper.connect(url,
controller=default_resource, controller=default_resource,

View File

@ -17,6 +17,7 @@
import routes import routes
from oslo_config import cfg
from tacker.api.vnfpkgm.v1 import controller as vnf_pkgm_controller from tacker.api.vnfpkgm.v1 import controller as vnf_pkgm_controller
from tacker import wsgi from tacker import wsgi
@ -32,13 +33,21 @@ class VnfpkgmAPIRouter(wsgi.Router):
all_methods = ['HEAD', 'GET', 'POST', 'PUT', 'PATCH', 'DELETE'] all_methods = ['HEAD', 'GET', 'POST', 'PUT', 'PATCH', 'DELETE']
missing_methods = [m for m in all_methods if m not in methods] missing_methods = [m for m in all_methods if m not in methods]
allowed_methods_str = ",".join(methods.keys()) allowed_methods_str = ",".join(methods.keys())
scope_opts = []
for method, action in methods.items(): for method, action in methods.items():
mapper.connect(url, mapper.connect(url,
controller=controller, controller=controller,
action=action, action=action,
conditions={'method': [method]}) conditions={'method': [method]})
add = cfg.ListOpt('vnfpkgm_'
+ action + '_scope',
default=[],
help="OAuth2.0 api token scope for" + action)
scope_opts.append(add)
cfg.CONF.register_opts(scope_opts, group='authentication')
if missing_methods: if missing_methods:
mapper.connect(url, mapper.connect(url,
controller=default_resource, controller=default_resource,

View File

@ -13,10 +13,13 @@
# under the License. # under the License.
import abc import abc
import base64
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_middleware import base from oslo_middleware import base
import requests import requests
from tacker.api.vnflcm.v1 import router as vnflcm_router
from tacker.api.vnfpkgm.v1 import router as vnfpkgm_router
import threading import threading
import webob.dec import webob.dec
import webob.exc import webob.exc
@ -286,6 +289,227 @@ class _AuthManager:
return self.__manages.get(id, self.__DEFAULT_CLIENT) return self.__manages.get(id, self.__DEFAULT_CLIENT)
class _AuthBase(metaclass=abc.ABCMeta):
def __init__(self, api_name, token_type, token_value):
self.api_name = api_name
self.token_type = token_type
self.token_value = token_value
@abc.abstractmethod
def do_auth(self):
pass
def _check_token_type(self):
if self.token_type == cfg.CONF.authentication.token_type:
return
msg = ("Auth Scheme is not expected token_type: expect={%s},\
actual={%s}" % (cfg.CONF.authentication.token_type, self.token_type))
raise webob.exc.HTTPUnauthorized(msg)
class _AuthValidateIgnore(_AuthBase):
def __init__(self, api_name, token_type, token_value):
super().__init__(api_name, token_type, token_value)
def do_auth(self):
conf_auth_type = cfg.CONF.authentication.token_type
if (conf_auth_type is not None and self.token_type is None):
msg = ("Request header is None but tacker conf\
has auth information.")
raise webob.exc.HTTPUnauthorized(msg)
return None
class _AuthValidateBearer(_AuthBase):
def __init__(self, application_type, api_name, token_type, token_value):
super().__init__(api_name, token_type, token_value)
self.application_type = application_type
self.expires_in = 0
self.res_access_token = None
self.__access_token_info = {}
def request(self):
auth_url = cfg.CONF.authentication.auth_url
response = requests.Session().get(auth_url)
if response.status_code == 401:
return None
return response
def do_auth(self):
self._check_token_type()
if self.res_access_token is None:
response = self.request()
if response is None:
msg = "No responce from Authorization Server."
raise webob.exc.HTTPUnauthorized(msg)
response_body = response.json()
if (response_body.get('access_token') is None or
response_body.get('token_type') is None):
msg = "No access_token or token_type exist."
raise webob.exc.HTTPUnauthorized(msg)
if self.token_value != response_body.get('access_token'):
msg = "access_token is invalid."
raise webob.exc.HTTPUnauthorized(msg)
self.expires_in = response_body.get('expires_in')
self.res_access_token = response_body.get('access_token')
self._validate_scope(response_body.get('scope'))
self._scheduler_delete_token(response_body)
def _scheduler_delete_token(self, response_body):
if not (response_body.get('expires_in')):
LOG.debug("'expires_in' does not exist in the response body.")
return
try:
expires_in = int(response_body.get('expires_in'))
expires_in_timer = threading.Timer(
expires_in, self._delete_access_token)
expires_in_timer.start()
LOG.info(
"expires_in=<{}> exist, scheduler regist.".format(expires_in))
except (ValueError, TypeError):
pass
def _delete_access_token(self):
self.access_token = None
self.expires_in = 0
def _generate_api_scope_name(self):
if self.application_type == vnflcm_router.VnflcmAPIRouter:
scope_prefix = 'vnflcm_'
return scope_prefix + self.api_name
elif self.application_type == vnfpkgm_router.VnfpkgmAPIRouter:
scope_prefix = 'vnfpkgm_'
return scope_prefix + self.api_name
return ''
def _validate_scope(self, res_scope):
if res_scope is None:
return
try:
api_scope_name = self._generate_api_scope_name() + '_scope'
scopes = cfg.CONF.authentication.__getitem__(api_scope_name)
except Exception:
msg = ("Getting scope error."
"api_scope_name={%s}, scopes={%s}",
api_scope_name, scopes)
raise webob.exc.HTTPUnauthorized(msg)
if len(scopes) == 0:
return
if res_scope in scopes:
return
raise webob.exc.HTTPForbidden("scope is undefined in tacker.conf")
class _AuthValidateBasic(_AuthBase):
def __init__(self, api_name, token_type, token_value):
super().__init__(api_name, token_type, token_value)
def do_auth(self):
self._check_token_type()
base = cfg.CONF.authentication.user_name +\
cfg.CONF.authentication.password
base64_encode = base64.b64encode(base.encode())
if self.token_value != base64_encode:
msg = "access_token and encoded base64 are not same."
raise webob.exc.HTTPUnauthorized(msg)
class _AuthValidateManager:
atuh_opts = [
cfg.StrOpt('token_type',
default=None,
choices=['Bearer', 'Basic'],
help="authentication type"),
cfg.StrOpt('user_name',
default=None,
help="user_name used in basic authentication"),
cfg.StrOpt('password',
default=None,
help="password used in basic authentication"),
cfg.StrOpt('auth_url',
default=None,
help="URL of the authorization server")
]
cfg.CONF.register_opts(atuh_opts, group='authentication')
def __init__(self):
self.__manages = {}
self.__lock = threading.RLock()
def __add_manages(self, token_value, auth_obj):
with self.__lock:
self.__manages[token_value] = auth_obj
def _parse_request_header(self, request):
auth_req = request.headers.get('Authorization')
if auth_req is None:
return (None, None)
auth_req_arry = auth_req.split(' ')
if len(auth_req_arry) > 3:
msg = "Invalid Authorization header."
raise webob.exc.HTTPUnauthorized(msg)
return (auth_req_arry[0], auth_req_arry[1])
def _get_auth_type(self, request, application):
token_type, token_value = self._parse_request_header(request)
if token_value in self.__manages:
return self.__manages.get(token_value)
match = application.map.match(request.path_info)
api_name = match[0].get("action")
if token_type == 'Bearer':
auth_obj = _AuthValidateBearer(
type(application), api_name, token_type, token_value)
self.__add_manages(token_value, auth_obj)
return auth_obj
elif token_type == 'Basic':
auth_obj = _AuthValidateBasic(api_name, token_type, token_value)
self.__add_manages(token_value, auth_obj)
return auth_obj
return _AuthValidateIgnore(api_name, token_type, token_value)
def auth_main(self, request, application):
auth_validator = self._get_auth_type(request, application)
if auth_validator:
auth_validator.do_auth()
class AuthValidatorExecution(base.ConfigurableMiddleware):
@ webob.dec.wsgify
def __call__(self, req):
auth_validator_manager.auth_main(req, self.application)
return self.application
def pipeline_factory(loader, global_conf, **local_conf): def pipeline_factory(loader, global_conf, **local_conf):
"""Create a paste pipeline based on the 'auth_strategy' config option.""" """Create a paste pipeline based on the 'auth_strategy' config option."""
pipeline = local_conf[cfg.CONF.auth_strategy] pipeline = local_conf[cfg.CONF.auth_strategy]
@ -299,3 +523,4 @@ def pipeline_factory(loader, global_conf, **local_conf):
auth_manager = _AuthManager() auth_manager = _AuthManager()
auth_validator_manager = _AuthValidateManager()

View File

@ -0,0 +1,23 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def fake_response(**update):
response = {"access_token": "SampleAccessToken",
"token_type": "SampleScheme",
"expires_in": 3600,
"scope": "create"
}
if update:
response.update(update)
return response

View File

@ -13,14 +13,18 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import base64
import ddt import ddt
from oslo_config import cfg from oslo_config import cfg
from oslo_middleware import request_id from oslo_middleware import request_id
import requests import requests
from requests_mock.contrib import fixture as requests_mock_fixture from requests_mock.contrib import fixture as requests_mock_fixture
import tacker.api.vnflcm.v1.router as vnflcm_router
import tacker.api.vnfpkgm.v1.router as vnfpkgm_router
from tacker import auth from tacker import auth
from tacker.tests import base from tacker.tests import base as test_base
import tacker.tests.unit.vnfm.test_nfvo_client as nfvo_client from tacker.tests.unit import base as unit_base
from tacker.tests.unit import fake_auth
import threading import threading
@ -33,7 +37,17 @@ import webob
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class TackerKeystoneContextTestCase(base.BaseTestCase): def _count_mock_history(history, *url):
req_count = 0
for mock_history in history:
actual_url = '{}://{}'.format(mock_history.scheme,
mock_history.hostname)
if actual_url in url:
req_count += 1
return req_count
class TackerKeystoneContextTestCase(test_base.BaseTestCase):
def setUp(self): def setUp(self):
super(TackerKeystoneContextTestCase, self).setUp() super(TackerKeystoneContextTestCase, self).setUp()
@ -126,12 +140,11 @@ class TackerKeystoneContextTestCase(base.BaseTestCase):
@ddt.ddt @ddt.ddt
class TestAuthManager(base.BaseTestCase): class TestAuthManager(test_base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestAuthManager, self).setUp() super(TestAuthManager, self).setUp()
self.token_endpoint_url = 'https://oauth2/tokens' self.url = 'https://oauth2/tokens'
self.oauth_url = 'https://oauth2'
self.user_name = 'test_user' self.user_name = 'test_user'
self.password = 'test_password' self.password = 'test_password'
auth.auth_manager = auth._AuthManager() auth.auth_manager = auth._AuthManager()
@ -153,7 +166,7 @@ class TestAuthManager(base.BaseTestCase):
def test_get_auth_client_oauth2_client_credentials_with_local(self): def test_get_auth_client_oauth2_client_credentials_with_local(self):
cfg.CONF.set_override('auth_type', 'OAUTH2_CLIENT_CREDENTIALS', cfg.CONF.set_override('auth_type', 'OAUTH2_CLIENT_CREDENTIALS',
group='authentication') group='authentication')
cfg.CONF.set_override('token_endpoint', self.token_endpoint_url, cfg.CONF.set_override('token_endpoint', self.url,
group='authentication') group='authentication')
cfg.CONF.set_override('client_id', self.user_name, cfg.CONF.set_override('client_id', self.user_name,
group='authentication') group='authentication')
@ -161,7 +174,7 @@ class TestAuthManager(base.BaseTestCase):
group='authentication') group='authentication')
self.requests_mock.register_uri('GET', self.requests_mock.register_uri('GET',
self.token_endpoint_url, self.url,
json={'access_token': 'test_token3', 'token_type': 'bearer'}, json={'access_token': 'test_token3', 'token_type': 'bearer'},
headers={'Content-Type': 'application/json'}, headers={'Content-Type': 'application/json'},
status_code=200) status_code=200)
@ -177,12 +190,11 @@ class TestAuthManager(base.BaseTestCase):
self.password, self.password,
client.grant.client_password) client.grant.client_password)
self.assertEqual( self.assertEqual(
self.token_endpoint_url, self.url,
client.grant.token_endpoint) client.grant.token_endpoint)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.oauth_url) self.assertEqual(1, len(history))
self.assertEqual(1, req_count)
def test_get_auth_client_basic_with_local(self): def test_get_auth_client_basic_with_local(self):
cfg.CONF.set_override('auth_type', 'BASIC', cfg.CONF.set_override('auth_type', 'BASIC',
@ -200,8 +212,7 @@ class TestAuthManager(base.BaseTestCase):
self.assertEqual(self.password, client.password) self.assertEqual(self.password, client.password)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.oauth_url) self.assertEqual(0, len(history))
self.assertEqual(0, req_count)
def test_get_auth_client_noauth_with_local(self): def test_get_auth_client_noauth_with_local(self):
cfg.CONF.set_override('auth_type', None, cfg.CONF.set_override('auth_type', None,
@ -211,12 +222,11 @@ class TestAuthManager(base.BaseTestCase):
self.assertIsInstance(client, requests.Session) self.assertIsInstance(client, requests.Session)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.oauth_url) self.assertEqual(0, len(history))
self.assertEqual(0, req_count)
def test_get_auth_client_oauth2_client_credentials_with_subscription(self): def test_get_auth_client_oauth2_client_credentials_with_subscription(self):
self.requests_mock.register_uri('GET', self.requests_mock.register_uri('GET',
self.token_endpoint_url, self.url,
json={'access_token': 'test_token', 'token_type': 'bearer'}, json={'access_token': 'test_token', 'token_type': 'bearer'},
headers={'Content-Type': 'application/json'}, headers={'Content-Type': 'application/json'},
status_code=200) status_code=200)
@ -224,7 +234,7 @@ class TestAuthManager(base.BaseTestCase):
params_oauth2_client_credentials = { params_oauth2_client_credentials = {
'clientId': self.user_name, 'clientId': self.user_name,
'clientPassword': self.password, 'clientPassword': self.password,
'tokenEndpoint': self.token_endpoint_url} 'tokenEndpoint': self.url}
auth.auth_manager.set_auth_client( auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id, id=uuidsentinel.subscription_id,
@ -241,12 +251,11 @@ class TestAuthManager(base.BaseTestCase):
self.password, self.password,
client.grant.client_password) client.grant.client_password)
self.assertEqual( self.assertEqual(
self.token_endpoint_url, self.url,
client.grant.token_endpoint) client.grant.token_endpoint)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.oauth_url) self.assertEqual(1, len(history))
self.assertEqual(1, req_count)
def test_get_auth_client_basic_with_subscription(self): def test_get_auth_client_basic_with_subscription(self):
params_basic = { params_basic = {
@ -265,8 +274,7 @@ class TestAuthManager(base.BaseTestCase):
self.assertEqual(self.password, client.password) self.assertEqual(self.password, client.password)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.oauth_url) self.assertEqual(0, len(history))
self.assertEqual(0, req_count)
def test_set_auth_client_noauth(self): def test_set_auth_client_noauth(self):
auth.auth_manager.set_auth_client( auth.auth_manager.set_auth_client(
@ -297,7 +305,7 @@ class TestAuthManager(base.BaseTestCase):
def test_set_auth_client_oauth2_client_credentials(self): def test_set_auth_client_oauth2_client_credentials(self):
self.requests_mock.register_uri( self.requests_mock.register_uri(
'GET', self.token_endpoint_url, 'GET', self.url,
json={ json={
'access_token': 'test_token', 'token_type': 'bearer'}, 'access_token': 'test_token', 'token_type': 'bearer'},
headers={ headers={
@ -307,7 +315,7 @@ class TestAuthManager(base.BaseTestCase):
params_oauth2_client_credentials = { params_oauth2_client_credentials = {
'clientId': self.user_name, 'clientId': self.user_name,
'clientPassword': self.password, 'clientPassword': self.password,
'tokenEndpoint': self.token_endpoint_url} 'tokenEndpoint': self.url}
auth.auth_manager.set_auth_client( auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id, id=uuidsentinel.subscription_id,
@ -326,12 +334,11 @@ class TestAuthManager(base.BaseTestCase):
self.password, self.password,
client.grant.client_password) client.grant.client_password)
self.assertEqual( self.assertEqual(
self.token_endpoint_url, self.url,
client.grant.token_endpoint) client.grant.token_endpoint)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.oauth_url) self.assertEqual(1, len(history))
self.assertEqual(1, req_count)
def test_set_auth_client_used_chahe(self): def test_set_auth_client_used_chahe(self):
params_basic = { params_basic = {
@ -346,7 +353,7 @@ class TestAuthManager(base.BaseTestCase):
params_oauth2_client_credentials = { params_oauth2_client_credentials = {
'clientId': self.user_name, 'clientId': self.user_name,
'clientPassword': self.password, 'clientPassword': self.password,
'tokenEndpoint': self.token_endpoint_url} 'tokenEndpoint': self.url}
auth.auth_manager.set_auth_client( auth.auth_manager.set_auth_client(
id=uuidsentinel.subscription_id, id=uuidsentinel.subscription_id,
@ -363,12 +370,11 @@ class TestAuthManager(base.BaseTestCase):
@ddt.ddt @ddt.ddt
class TestBasicAuthSession(base.BaseTestCase): class TestBasicAuthSession(test_base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestBasicAuthSession, self).setUp() super(TestBasicAuthSession, self).setUp()
self.token_endpoint_url = 'https://oauth2/tokens' self.url = 'https://oauth2/tokens'
self.nfvo_url = 'http://nfvo.co.jp'
self.user_name = 'test_user' self.user_name = 'test_user'
self.password = 'test_password' self.password = 'test_password'
self.requests_mock = self.useFixture(requests_mock_fixture.Fixture()) self.requests_mock = self.useFixture(requests_mock_fixture.Fixture())
@ -384,44 +390,43 @@ class TestBasicAuthSession(base.BaseTestCase):
password=self.password) password=self.password)
self.requests_mock.register_uri(http_method, self.requests_mock.register_uri(http_method,
self.nfvo_url, 'https://nfvo.co.jp',
headers={'Content-Type': 'application/json'}, headers={'Content-Type': 'application/json'},
status_code=200) status_code=200)
if http_method == 'GET': if http_method == 'GET':
response = client.get( response = client.get(
self.nfvo_url, 'https://nfvo.co.jp',
params={ params={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
elif http_method == 'PUT': elif http_method == 'PUT':
response = client.put( response = client.put(
self.nfvo_url, 'https://nfvo.co.jp',
data={ data={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
elif http_method == 'POST': elif http_method == 'POST':
response = client.post( response = client.post(
self.nfvo_url, 'https://nfvo.co.jp',
data={ data={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
elif http_method == 'DELETE': elif http_method == 'DELETE':
response = client.delete( response = client.delete(
self.nfvo_url, 'https://nfvo.co.jp',
params={ params={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
elif http_method == 'PATCH': elif http_method == 'PATCH':
response = client.patch( response = client.patch(
self.nfvo_url, 'https://nfvo.co.jp',
data={ data={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
self.assertEqual(200, response.status_code) self.assertEqual(200, response.status_code)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.nfvo_url) self.assertEqual(1, len(history))
self.assertEqual(1, req_count)
@ddt.ddt @ddt.ddt
class TestOAuth2Session(base.BaseTestCase): class TestOAuth2Session(test_base.BaseTestCase):
class MockThread(threading.Timer): class MockThread(threading.Timer):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
@ -433,8 +438,7 @@ class TestOAuth2Session(base.BaseTestCase):
def setUp(self): def setUp(self):
super(TestOAuth2Session, self).setUp() super(TestOAuth2Session, self).setUp()
self.token_endpoint_url = 'https://oauth2/tokens' self.url = 'https://oauth2/tokens'
self.oauth_url = 'https://oauth2'
self.user_name = 'test_user' self.user_name = 'test_user'
self.password = 'test_password' self.password = 'test_password'
self.requests_mock = self.useFixture(requests_mock_fixture.Fixture()) self.requests_mock = self.useFixture(requests_mock_fixture.Fixture())
@ -460,21 +464,19 @@ class TestOAuth2Session(base.BaseTestCase):
self.requests_mock.register_uri( self.requests_mock.register_uri(
'GET', 'GET',
self.token_endpoint_url, [res_mock, res_mock2]) self.url, [res_mock, res_mock2])
grant = auth._ClientCredentialsGrant( grant = auth._ClientCredentialsGrant(
client_id=self.user_name, client_id=self.user_name,
client_password=self.password, client_password=self.password,
token_endpoint=self.token_endpoint_url) token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m: with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
client = auth._OAuth2Session(grant) client = auth._OAuth2Session(grant)
client.apply_access_token_info() client.apply_access_token_info()
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.assertEqual(2, len(history))
self.oauth_url)
self.assertEqual(2, req_count)
self.assertEqual(1, m.call_count) self.assertEqual(1, m.call_count)
def test_apply_access_token_info_fail_error_response(self): def test_apply_access_token_info_fail_error_response(self):
@ -485,7 +487,7 @@ class TestOAuth2Session(base.BaseTestCase):
""" """
self.requests_mock.register_uri( self.requests_mock.register_uri(
'GET', 'GET',
self.token_endpoint_url, self.url,
headers={ headers={
'Content-Type': 'application/json;charset=UTF-8', 'Content-Type': 'application/json;charset=UTF-8',
'Cache-Control': 'no-store', 'Cache-Control': 'no-store',
@ -499,7 +501,7 @@ class TestOAuth2Session(base.BaseTestCase):
grant = auth._ClientCredentialsGrant( grant = auth._ClientCredentialsGrant(
client_id=self.user_name, client_id=self.user_name,
client_password=self.password, client_password=self.password,
token_endpoint=self.token_endpoint_url) token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m: with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
try: try:
@ -509,21 +511,19 @@ class TestOAuth2Session(base.BaseTestCase):
self.assertEqual(401, e.response.status_code) self.assertEqual(401, e.response.status_code)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.assertEqual(1, len(history))
self.oauth_url)
self.assertEqual(1, req_count)
self.assertEqual(0, m.call_count) self.assertEqual(0, m.call_count)
def test_apply_access_token_info_fail_timeout(self): def test_apply_access_token_info_fail_timeout(self):
self.requests_mock.register_uri( self.requests_mock.register_uri(
'GET', 'GET',
self.token_endpoint_url, self.url,
exc=requests.exceptions.ConnectTimeout) exc=requests.exceptions.ConnectTimeout)
grant = auth._ClientCredentialsGrant( grant = auth._ClientCredentialsGrant(
client_id=self.user_name, client_id=self.user_name,
client_password=self.password, client_password=self.password,
token_endpoint=self.token_endpoint_url) token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m: with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
try: try:
@ -533,15 +533,13 @@ class TestOAuth2Session(base.BaseTestCase):
self.assertIsNone(e.response) self.assertIsNone(e.response)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.assertEqual(1, len(history))
self.oauth_url)
self.assertEqual(1, req_count)
self.assertEqual(0, m.call_count) self.assertEqual(0, m.call_count)
def test_schedule_refrash_token_expaire(self): def test_schedule_refrash_token_expaire(self):
self.requests_mock.register_uri( self.requests_mock.register_uri(
'GET', 'GET',
self.token_endpoint_url, self.url,
headers={'Content-Type': 'application/json'}, headers={'Content-Type': 'application/json'},
json={ json={
'access_token': 'test_token', 'access_token': 'test_token',
@ -551,7 +549,7 @@ class TestOAuth2Session(base.BaseTestCase):
grant = auth._ClientCredentialsGrant( grant = auth._ClientCredentialsGrant(
client_id=self.user_name, client_id=self.user_name,
client_password=self.password, client_password=self.password,
token_endpoint=self.token_endpoint_url) token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m: with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
client = auth._OAuth2Session(grant) client = auth._OAuth2Session(grant)
@ -562,16 +560,14 @@ class TestOAuth2Session(base.BaseTestCase):
client.schedule_refrash_token() client.schedule_refrash_token()
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.assertEqual(1, len(history))
self.oauth_url)
self.assertEqual(1, req_count)
self.assertEqual(1, m.call_count) self.assertEqual(1, m.call_count)
def test_schedule_refrash_token_non_expaire(self): def test_schedule_refrash_token_non_expaire(self):
grant = auth._ClientCredentialsGrant( grant = auth._ClientCredentialsGrant(
client_id=self.user_name, client_id=self.user_name,
client_password=self.password, client_password=self.password,
token_endpoint=self.token_endpoint_url) token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m: with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
client = auth._OAuth2Session(grant) client = auth._OAuth2Session(grant)
@ -581,9 +577,7 @@ class TestOAuth2Session(base.BaseTestCase):
client.schedule_refrash_token() client.schedule_refrash_token()
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.assertEqual(0, len(history))
self.oauth_url)
self.assertEqual(0, req_count)
self.assertEqual(0, m.call_count) self.assertEqual(0, m.call_count)
@ddt.data(None, "") @ddt.data(None, "")
@ -591,7 +585,7 @@ class TestOAuth2Session(base.BaseTestCase):
grant = auth._ClientCredentialsGrant( grant = auth._ClientCredentialsGrant(
client_id=self.user_name, client_id=self.user_name,
client_password=self.password, client_password=self.password,
token_endpoint=self.token_endpoint_url) token_endpoint=self.url)
with mock.patch("threading.Timer", side_effect=self.MockThread) as m: with mock.patch("threading.Timer", side_effect=self.MockThread) as m:
client = auth._OAuth2Session(grant) client = auth._OAuth2Session(grant)
@ -602,15 +596,13 @@ class TestOAuth2Session(base.BaseTestCase):
client.schedule_refrash_token() client.schedule_refrash_token()
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.assertEqual(0, len(history))
self.oauth_url)
self.assertEqual(0, req_count)
self.assertEqual(0, m.call_count) self.assertEqual(0, m.call_count)
@ddt.data('GET', 'PUT', 'POST', 'DELETE', 'PATCH') @ddt.data('GET', 'PUT', 'POST', 'DELETE', 'PATCH')
def test_request_client_credentials(self, http_method): def test_request_client_credentials(self, http_method):
self.requests_mock.register_uri('GET', self.requests_mock.register_uri('GET',
self.token_endpoint_url, self.url,
json={'access_token': 'test_token3', 'token_type': 'bearer'}, json={'access_token': 'test_token3', 'token_type': 'bearer'},
headers={'Content-Type': 'application/json'}, headers={'Content-Type': 'application/json'},
status_code=200) status_code=200)
@ -618,49 +610,48 @@ class TestOAuth2Session(base.BaseTestCase):
grant = auth._ClientCredentialsGrant( grant = auth._ClientCredentialsGrant(
client_id=self.user_name, client_id=self.user_name,
client_password=self.password, client_password=self.password,
token_endpoint=self.token_endpoint_url) token_endpoint=self.url)
client = auth._OAuth2Session(grant) client = auth._OAuth2Session(grant)
client.apply_access_token_info() client.apply_access_token_info()
self.requests_mock.register_uri(http_method, self.requests_mock.register_uri(http_method,
self.oauth_url, 'https://nfvo.co.jp',
headers={'Content-Type': 'application/json'}, headers={'Content-Type': 'application/json'},
status_code=200) status_code=200)
if http_method == 'GET': if http_method == 'GET':
response = client.get( response = client.get(
self.oauth_url, 'https://nfvo.co.jp',
params={ params={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
elif http_method == 'PUT': elif http_method == 'PUT':
response = client.put( response = client.put(
self.oauth_url, 'https://nfvo.co.jp',
data={ data={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
elif http_method == 'POST': elif http_method == 'POST':
response = client.post( response = client.post(
self.oauth_url, 'https://nfvo.co.jp',
data={ data={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
elif http_method == 'DELETE': elif http_method == 'DELETE':
response = client.delete( response = client.delete(
self.oauth_url, 'https://nfvo.co.jp',
params={ params={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
elif http_method == 'PATCH': elif http_method == 'PATCH':
response = client.patch( response = client.patch(
self.oauth_url, 'https://nfvo.co.jp',
data={ data={
'sample_key': 'sample_value'}) 'sample_key': 'sample_value'})
self.assertEqual(200, response.status_code) self.assertEqual(200, response.status_code)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history(history, self.oauth_url) self.assertEqual(2, len(history))
self.assertEqual(2, req_count)
def test_request_client_credentials_auth_error(self): def test_request_client_credentials_auth_error(self):
self.requests_mock.register_uri('GET', self.requests_mock.register_uri('GET',
self.token_endpoint_url, self.url,
json={'access_token': 'test_token3', 'token_type': 'bearer'}, json={'access_token': 'test_token3', 'token_type': 'bearer'},
headers={'Content-Type': 'application/json'}, headers={'Content-Type': 'application/json'},
status_code=200) status_code=200)
@ -673,7 +664,7 @@ class TestOAuth2Session(base.BaseTestCase):
grant = auth._ClientCredentialsGrant( grant = auth._ClientCredentialsGrant(
client_id=self.user_name, client_id=self.user_name,
client_password=self.password, client_password=self.password,
token_endpoint=self.token_endpoint_url) token_endpoint=self.url)
client = auth._OAuth2Session(grant) client = auth._OAuth2Session(grant)
client.apply_access_token_info() client.apply_access_token_info()
@ -681,6 +672,215 @@ class TestOAuth2Session(base.BaseTestCase):
self.assertEqual(401, response.status_code) self.assertEqual(401, response.status_code)
history = self.requests_mock.request_history history = self.requests_mock.request_history
req_count = nfvo_client._count_mock_history( self.assertEqual(3, len(history))
history, self.oauth_url, 'https://nfvo.co.jp')
self.assertEqual(3, req_count)
class TestAuthValidateBearer(unit_base.FixturedTestCase):
def setUp(self):
super(TestAuthValidateBearer, self).setUp()
token_type = 'Bearer'
api_name = 'dummy'
token_value = 'SampleAccessToken'
application_type = vnflcm_router.VnflcmAPIRouter
self.auth_opts = [cfg.ListOpt('vnflcm_dummy_scope',
default='test_api',
help="OAuth2.0 api token scope for create")]
cfg.CONF.register_opts(self.auth_opts, group='authentication')
self.requests_mock = self.useFixture(requests_mock_fixture.Fixture())
self.url = 'http://auth/authorize/'
self.auth_bearer = auth._AuthValidateBearer(
application_type, api_name, token_type, token_value)
auth._AuthValidateManager()
def tearDown(self):
super(TestAuthValidateBearer, self).tearDown()
@mock.patch.object(auth._AuthValidateBearer, 'request')
def test_do_auth_no_response(self, mock_request):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
mock_request.return_value = None
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_bearer.do_auth)
def test_do_auth_no_token_value_in_response(self):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
cfg.CONF.set_override('auth_url', 'http://auth/authorize/',
group='authentication')
update = {'access_token': None}
json = fake_auth.fake_response(**update)
self.requests_mock.register_uri('GET',
self.url,
json=json,
headers={'Content-Type': 'application/json'},
status_code=200)
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_bearer.do_auth)
history = self.requests_mock.request_history
req_count = _count_mock_history(history, 'http://auth')
self.assertEqual(1, req_count)
def test_do_auth_no_token_type_in_response(self):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
cfg.CONF.set_override('auth_url', 'http://auth/authorize/',
group='authentication')
update = {'token_type': None}
json = fake_auth.fake_response(**update)
self.requests_mock.register_uri('GET',
self.url,
json=json,
headers={'Content-Type': 'application/json'},
status_code=200)
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_bearer.do_auth)
history = self.requests_mock.request_history
req_count = _count_mock_history(history, 'http://auth')
self.assertEqual(1, req_count)
def test_do_auth_invalid_token_value(self):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
cfg.CONF.set_override('auth_url', 'http://auth/authorize/',
group='authentication')
update = {'access_token': 'Test'}
json = fake_auth.fake_response(**update)
self.requests_mock.register_uri('GET',
self.url,
json=json,
headers={'Content-Type': 'application/json'},
status_code=200)
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_bearer.do_auth)
history = self.requests_mock.request_history
req_count = _count_mock_history(history, 'http://auth')
self.assertEqual(1, req_count)
def test_do_auth_invalid_scope(self):
cfg.CONF.set_override('token_type', 'Bearer',
group='authentication')
cfg.CONF.set_override('auth_url', 'http://auth/authorize/',
group='authentication')
json = fake_auth.fake_response()
self.requests_mock.register_uri('GET',
self.url,
json=json,
headers={'Content-Type': 'application/json'},
status_code=200)
self.assertRaises(webob.exc.HTTPForbidden, self.auth_bearer.do_auth)
history = self.requests_mock.request_history
req_count = _count_mock_history(history, 'http://auth')
self.assertEqual(1, req_count)
class TestAuthValidateBasic(unit_base.FixturedTestCase):
def setUp(self):
super(TestAuthValidateBasic, self).setUp()
self.api_name = 'test'
self.user_name = 'test_user'
self.password = 'test_pass'
self.token_type = 'Basic'
self.token_value = self._encode_base64(self.user_name + self.password)
self.auth_basic = auth._AuthValidateBasic(
self.api_name, self.token_type, self.token_value)
auth._AuthValidateManager()
def _encode_base64(self, info):
encode = base64.b64encode(info.encode())
return encode
def test_do_auth(self):
cfg.CONF.set_override('token_type', 'Basic',
group='authentication')
cfg.CONF.set_override('user_name', self.user_name,
group='authentication')
cfg.CONF.set_override('password', self.password,
group='authentication')
auth._AuthValidateBasic(self.api_name, self.token_type,
self.token_value)
self.auth_basic.do_auth()
def test_do_auth_invalid_token_value(self):
cfg.CONF.set_override('token_type', 'Basic',
group='authentication')
cfg.CONF.set_override('user_name', 'test',
group='authentication')
cfg.CONF.set_override('password', self.password,
group='authentication')
auth._AuthValidateBasic(
self.api_name,
self.token_type,
self.token_value)
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_basic.do_auth)
def test_do_auth_invalid_token_type(self):
cfg.CONF.set_override('token_type', 'Basic',
group='authentication')
self.auth_basic = auth._AuthValidateBasic(
'test', 'test_type', 'test_val')
self.assertRaises(webob.exc.HTTPUnauthorized, self.auth_basic.do_auth)
@ddt.ddt
class TestAuthValidateManager(unit_base.FixturedTestCase):
def setUp(self):
super(TestAuthValidateManager, self).setUp()
self.auth_validate = auth._AuthValidateManager()
@ddt.data(vnflcm_router.VnflcmAPIRouter, vnfpkgm_router.VnfpkgmAPIRouter)
def test_auth_main_bearer(self, obj):
mock_response = mock.MagicMock()
mock_response.request = mock.MagicMock()
mock_response.request.headers = {
'Authorization': 'Bearer 123456abc'}
mock_response.application.return_value = obj
ret = self.auth_validate._get_auth_type(
mock_response.request, mock_response.application)
self.assertIsInstance(ret, auth._AuthValidateBearer)
def test_auth_main_basic(self):
mock_response = mock.MagicMock()
mock_response.request = mock.MagicMock()
mock_response.request.headers = {
'Authorization': 'Basic 123456abc'}
mock_response.application = mock.MagicMock()
mock_response.application.return_value = vnflcm_router.VnflcmAPIRouter
ret = self.auth_validate._get_auth_type(
mock_response.request, mock_response.application)
self.assertIsInstance(ret, auth._AuthValidateBasic)
def test_auth_main_none(self):
mock_response = mock.MagicMock()
mock_response.request = mock.MagicMock()
mock_response.request.headers = {
'Authorization': 'Test 123456abc'}
mock_response.application.return_value = vnflcm_router.VnflcmAPIRouter
ret = self.auth_validate._get_auth_type(
mock_response.request, mock_response.application)
self.assertIsInstance(ret, auth._AuthValidateIgnore)
class TestAuthValidatorExecution(test_base.BaseTestCase):
def setUp(self):
super(TestAuthValidatorExecution, self).setUp()
@webob.dec.wsgify
def fake_app(req):
# self.context = req.environ['tacker.context']
return webob.Response()
self.context = None
self.middleware = auth.AuthValidatorExecution(fake_app)
self.request = webob.Request.blank('/')
self.request.headers['X_AUTH_TOKEN'] = 'testauthtoken'
@mock.patch.object(auth.auth_validator_manager, "auth_main")
def test_called(self, mock_auth_main):
response = self.request.get_response(self.middleware)
self.assertEqual('200 OK', response.status)