OAuth2.0 Client Credentials Grant Flow Support
Added a new OAuth2ClientCredential plugin, accessible via the 'v3oauth2clientcredential' entry point, making possible to authenticate using an application credentials as an OAuth2.0 client credentials. Change-Id: I77d6faef4cbc75abb8e7d86f386fb6d16e40cabf
This commit is contained in:
parent
2445a5df78
commit
aa9c5d230f
@ -61,6 +61,9 @@ V3ApplicationCredential = v3.ApplicationCredential
|
||||
V3MultiFactor = v3.MultiFactor
|
||||
"""See :class:`keystoneauth1.identity.v3.MultiFactor`"""
|
||||
|
||||
V3OAuth2ClientCredential = v3.OAuth2ClientCredential
|
||||
"""See :class:`keystoneauth1.identity.v3.OAuth2ClientCredential`"""
|
||||
|
||||
__all__ = ('BaseIdentityPlugin',
|
||||
'Password',
|
||||
'Token',
|
||||
@ -74,4 +77,5 @@ __all__ = ('BaseIdentityPlugin',
|
||||
'V3TOTP',
|
||||
'V3TokenlessAuth',
|
||||
'V3ApplicationCredential',
|
||||
'V3MultiFactor')
|
||||
'V3MultiFactor',
|
||||
'V3OAuth2ClientCredential')
|
||||
|
@ -23,6 +23,7 @@ from keystoneauth1.identity.v3.receipt import * # noqa
|
||||
from keystoneauth1.identity.v3.token import * # noqa
|
||||
from keystoneauth1.identity.v3.totp import * # noqa
|
||||
from keystoneauth1.identity.v3.tokenless_auth import * # noqa
|
||||
from keystoneauth1.identity.v3.oauth2_client_credential import * # noqa
|
||||
|
||||
|
||||
__all__ = ('ApplicationCredential',
|
||||
@ -55,4 +56,7 @@ __all__ = ('ApplicationCredential',
|
||||
|
||||
'ReceiptMethod',
|
||||
|
||||
'MultiFactor', )
|
||||
'MultiFactor',
|
||||
|
||||
'OAuth2ClientCredential',
|
||||
'OAuth2ClientCredentialMethod',)
|
||||
|
130
keystoneauth1/identity/v3/oauth2_client_credential.py
Normal file
130
keystoneauth1/identity/v3/oauth2_client_credential.py
Normal file
@ -0,0 +1,130 @@
|
||||
# Copyright 2022 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import requests.auth
|
||||
|
||||
from keystoneauth1.exceptions import ClientException
|
||||
from keystoneauth1.identity.v3 import base
|
||||
|
||||
__all__ = ('OAuth2ClientCredentialMethod', 'OAuth2ClientCredential')
|
||||
|
||||
|
||||
class OAuth2ClientCredentialMethod(base.AuthMethod):
|
||||
"""An auth method to fetch a token via an OAuth2.0 client credential.
|
||||
|
||||
:param string oauth2_endpoint: OAuth2.0 endpoint.
|
||||
:param string oauth2_client_id: OAuth2.0 client credential id.
|
||||
:param string oauth2_client_secret: OAuth2.0 client credential secret.
|
||||
"""
|
||||
|
||||
_method_parameters = [
|
||||
'oauth2_endpoint',
|
||||
'oauth2_client_id',
|
||||
'oauth2_client_secret'
|
||||
]
|
||||
|
||||
def get_auth_data(self, session, auth, headers, **kwargs):
|
||||
"""Return the authentication section of an auth plugin.
|
||||
|
||||
:param session: The communication session.
|
||||
:type session: keystoneauth1.session.Session
|
||||
:param base.Auth auth: The auth plugin calling the method.
|
||||
:param dict headers: The headers that will be sent with the auth
|
||||
request if a plugin needs to add to them.
|
||||
:return: The identifier of this plugin and a dict of authentication
|
||||
data for the auth type.
|
||||
:rtype: tuple(string, dict)
|
||||
"""
|
||||
|
||||
auth_data = {
|
||||
'id': self.oauth2_client_id,
|
||||
'secret': self.oauth2_client_secret
|
||||
}
|
||||
return 'application_credential', auth_data
|
||||
|
||||
def get_cache_id_elements(self):
|
||||
"""Get the elements for this auth method that make it unique.
|
||||
|
||||
These elements will be used as part of the
|
||||
:py:meth:`keystoneauth1.plugin.BaseIdentityPlugin.get_cache_id` to
|
||||
allow caching of the auth plugin.
|
||||
|
||||
Plugins should override this if they want to allow caching of their
|
||||
state.
|
||||
|
||||
To avoid collision or overrides the keys of the returned dictionary
|
||||
should be prefixed with the plugin identifier. For example the password
|
||||
plugin returns its username value as 'password_username'.
|
||||
"""
|
||||
|
||||
return dict(('oauth2_client_credential_%s' % p, getattr(self, p))
|
||||
for p in self._method_parameters)
|
||||
|
||||
|
||||
class OAuth2ClientCredential(base.AuthConstructor):
|
||||
"""A plugin for authenticating via an OAuth2.0 client credential.
|
||||
|
||||
:param string auth_url: Identity service endpoint for authentication.
|
||||
:param string oauth2_endpoint: OAuth2.0 endpoint.
|
||||
:param string oauth2_client_id: OAuth2.0 client credential id.
|
||||
:param string oauth2_client_secret: OAuth2.0 client credential secret.
|
||||
"""
|
||||
|
||||
_auth_method_class = OAuth2ClientCredentialMethod
|
||||
|
||||
def __init__(self, auth_url, *args, **kwargs):
|
||||
super(OAuth2ClientCredential, self).__init__(auth_url, *args, **kwargs)
|
||||
self._oauth2_endpoint = kwargs['oauth2_endpoint']
|
||||
self._oauth2_client_id = kwargs['oauth2_client_id']
|
||||
self._oauth2_client_secret = kwargs['oauth2_client_secret']
|
||||
|
||||
def get_headers(self, session, **kwargs):
|
||||
"""Fetch authentication headers for message.
|
||||
|
||||
:param session: The session object that the auth_plugin belongs to.
|
||||
:type session: keystoneauth1.session.Session
|
||||
|
||||
:returns: Headers that are set to authenticate a message or None for
|
||||
failure. Note that when checking this value that the empty
|
||||
dict is a valid, non-failure response.
|
||||
:rtype: dict
|
||||
"""
|
||||
|
||||
# get headers for X-Auth-Token
|
||||
headers = super(OAuth2ClientCredential, self).get_headers(
|
||||
session, **kwargs)
|
||||
|
||||
# Get OAuth2.0 access token and add the field 'Authorization'
|
||||
data = {"grant_type": "client_credentials"}
|
||||
auth = requests.auth.HTTPBasicAuth(self._oauth2_client_id,
|
||||
self._oauth2_client_secret)
|
||||
resp = session.request(self._oauth2_endpoint,
|
||||
"POST",
|
||||
authenticated=False,
|
||||
raise_exc=False,
|
||||
data=data,
|
||||
requests_auth=auth)
|
||||
if resp.status_code == 200:
|
||||
oauth2 = resp.json()
|
||||
oauth2_token = oauth2["access_token"]
|
||||
if headers:
|
||||
headers['Authorization'] = f'Bearer {oauth2_token}'
|
||||
else:
|
||||
headers = {'Authorization': f'Bearer {oauth2_token}'}
|
||||
else:
|
||||
error = resp.json()
|
||||
msg = error.get("error_description")
|
||||
raise ClientException(msg)
|
||||
|
||||
return headers
|
@ -338,3 +338,44 @@ class MultiFactor(loading.BaseV3Loader):
|
||||
self._methods = kwargs['auth_methods']
|
||||
|
||||
return super(MultiFactor, self).load_from_options(**kwargs)
|
||||
|
||||
|
||||
class OAuth2ClientCredential(loading.BaseV3Loader):
|
||||
|
||||
@property
|
||||
def plugin_class(self):
|
||||
return identity.V3OAuth2ClientCredential
|
||||
|
||||
def get_options(self):
|
||||
options = super(OAuth2ClientCredential, self).get_options()
|
||||
options.extend([
|
||||
loading.Opt('oauth2_endpoint',
|
||||
required=True,
|
||||
help='Endpoint for OAuth2.0'),
|
||||
]),
|
||||
options.extend([
|
||||
loading.Opt('oauth2_client_id',
|
||||
required=True,
|
||||
help='Client id for OAuth2.0'),
|
||||
]),
|
||||
options.extend([
|
||||
loading.Opt('oauth2_client_secret',
|
||||
secret=True,
|
||||
required=True,
|
||||
help='Client secret for OAuth2.0'),
|
||||
])
|
||||
|
||||
return options
|
||||
|
||||
def load_from_options(self, **kwargs):
|
||||
if not kwargs.get('oauth2_endpoint'):
|
||||
m = 'You must provide an OAuth2.0 endpoint.'
|
||||
raise exceptions.OptionError(m)
|
||||
if not kwargs.get('oauth2_client_id'):
|
||||
m = 'You must provide an OAuth2.0 client credential ID.'
|
||||
raise exceptions.OptionError(m)
|
||||
if not kwargs.get('oauth2_client_secret'):
|
||||
m = 'You must provide an OAuth2.0 client credential auth secret.'
|
||||
raise exceptions.OptionError(m)
|
||||
|
||||
return super(OAuth2ClientCredential, self).load_from_options(**kwargs)
|
||||
|
@ -13,11 +13,13 @@
|
||||
import copy
|
||||
import json
|
||||
import time
|
||||
import unittest
|
||||
import uuid
|
||||
|
||||
from keystoneauth1 import _utils as ksa_utils
|
||||
from keystoneauth1 import access
|
||||
from keystoneauth1 import exceptions
|
||||
from keystoneauth1.exceptions import ClientException
|
||||
from keystoneauth1 import fixture
|
||||
from keystoneauth1.identity import v3
|
||||
from keystoneauth1.identity.v3 import base as v3_base
|
||||
@ -37,6 +39,9 @@ class V3IdentityPlugin(utils.TestCase):
|
||||
TEST_APP_CRED_ID = 'appcredid'
|
||||
TEST_APP_CRED_SECRET = 'secret'
|
||||
|
||||
TEST_CLIENT_CRED_ID = 'clientcredid'
|
||||
TEST_CLIENT_CRED_SECRET = 'secret'
|
||||
|
||||
TEST_SERVICE_CATALOG = [{
|
||||
"endpoints": [{
|
||||
"url": "http://cdn.admin-nets.local:8774/v1.0/",
|
||||
@ -822,3 +827,261 @@ class V3IdentityPlugin(utils.TestCase):
|
||||
self.assertRequestHeaderEqual('Content-Type', 'application/json')
|
||||
self.assertRequestHeaderEqual('Accept', 'application/json')
|
||||
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
|
||||
def test_oauth2_client_credential_method_http(self):
|
||||
base_http = self.TEST_URL
|
||||
oauth2_endpoint = f'{self.TEST_URL}/oauth_token'
|
||||
oauth2_token = 'HW9bB6oYWJywz6mAN_KyIBXlof15Pk'
|
||||
self.stub_auth(json=self.TEST_APP_CRED_TOKEN_RESPONSE)
|
||||
client_cre = v3.OAuth2ClientCredential(
|
||||
base_http,
|
||||
oauth2_endpoint=oauth2_endpoint,
|
||||
oauth2_client_id=self.TEST_CLIENT_CRED_ID,
|
||||
oauth2_client_secret=self.TEST_CLIENT_CRED_SECRET
|
||||
)
|
||||
oauth2_resp = {
|
||||
'status_code': 200,
|
||||
'json': {
|
||||
'access_token': oauth2_token,
|
||||
'expires_in': 3600,
|
||||
'token_type': 'Bearer'
|
||||
}
|
||||
}
|
||||
self.requests_mock.post(oauth2_endpoint,
|
||||
[oauth2_resp])
|
||||
|
||||
sess = session.Session(auth=client_cre)
|
||||
initial_cache_id = client_cre.get_cache_id()
|
||||
|
||||
auth_head = sess.get_auth_headers()
|
||||
self.assertEqual(self.TEST_TOKEN, auth_head['X-Auth-Token'])
|
||||
self.assertEqual(f'Bearer {oauth2_token}', auth_head['Authorization'])
|
||||
|
||||
self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
self.assertEqual(initial_cache_id, client_cre.get_cache_id())
|
||||
|
||||
resp_ok = {
|
||||
'status_code': 200
|
||||
}
|
||||
self.requests_mock.post(f'{base_http}/test_api',
|
||||
[resp_ok])
|
||||
resp = sess.post(f'{base_http}/test_api', authenticated=True)
|
||||
self.assertRequestHeaderEqual('Authorization',
|
||||
f'Bearer {oauth2_token}')
|
||||
self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
def test_oauth2_client_credential_method_https(self):
|
||||
self.TEST_URL = self.TEST_URL.replace('http:', 'https:')
|
||||
base_https = self.TEST_URL
|
||||
oauth2_endpoint = f'{base_https}/oauth_token'
|
||||
oauth2_token = 'HW9bB6oYWJywz6mAN_KyIBXlof15Pk'
|
||||
self.stub_auth(json=self.TEST_APP_CRED_TOKEN_RESPONSE)
|
||||
client_cre = v3.OAuth2ClientCredential(
|
||||
base_https,
|
||||
oauth2_endpoint=oauth2_endpoint,
|
||||
oauth2_client_id=self.TEST_CLIENT_CRED_ID,
|
||||
oauth2_client_secret=self.TEST_CLIENT_CRED_SECRET
|
||||
)
|
||||
oauth2_resp = {
|
||||
'status_code': 200,
|
||||
'json': {
|
||||
'access_token': oauth2_token,
|
||||
'expires_in': 3600,
|
||||
'token_type': 'Bearer'
|
||||
}
|
||||
}
|
||||
self.requests_mock.post(oauth2_endpoint,
|
||||
[oauth2_resp])
|
||||
|
||||
sess = session.Session(auth=client_cre)
|
||||
initial_cache_id = client_cre.get_cache_id()
|
||||
|
||||
auth_head = sess.get_auth_headers()
|
||||
self.assertEqual(self.TEST_TOKEN, auth_head['X-Auth-Token'])
|
||||
self.assertEqual(f'Bearer {oauth2_token}', auth_head['Authorization'])
|
||||
|
||||
self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
self.assertEqual(initial_cache_id, client_cre.get_cache_id())
|
||||
|
||||
resp_ok = {
|
||||
'status_code': 200
|
||||
}
|
||||
self.requests_mock.post(f'{base_https}/test_api',
|
||||
[resp_ok])
|
||||
resp = sess.post(f'{base_https}/test_api', authenticated=True)
|
||||
self.assertRequestHeaderEqual('Authorization',
|
||||
f'Bearer {oauth2_token}')
|
||||
self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
def test_oauth2_client_credential_method_base_header_none(self):
|
||||
base_https = self.TEST_URL.replace('http:', 'https:')
|
||||
oauth2_endpoint = f'{base_https}/oauth_token'
|
||||
oauth2_token = 'HW9bB6oYWJywz6mAN_KyIBXlof15Pk'
|
||||
with unittest.mock.patch(
|
||||
'keystoneauth1.plugin.BaseAuthPlugin.'
|
||||
'get_headers') as co_mock:
|
||||
co_mock.return_value = None
|
||||
client_cre = v3.OAuth2ClientCredential(
|
||||
base_https,
|
||||
oauth2_endpoint=oauth2_endpoint,
|
||||
oauth2_client_id=self.TEST_CLIENT_CRED_ID,
|
||||
oauth2_client_secret=self.TEST_CLIENT_CRED_SECRET
|
||||
)
|
||||
oauth2_resp = {
|
||||
'status_code': 200,
|
||||
'json': {
|
||||
'access_token': oauth2_token,
|
||||
'expires_in': 3600,
|
||||
'token_type': 'Bearer'
|
||||
}
|
||||
}
|
||||
self.requests_mock.post(oauth2_endpoint,
|
||||
[oauth2_resp])
|
||||
|
||||
sess = session.Session(auth=client_cre)
|
||||
auth_head = sess.get_auth_headers()
|
||||
self.assertNotIn('X-Auth-Token', auth_head)
|
||||
self.assertEqual(f'Bearer {oauth2_token}',
|
||||
auth_head['Authorization'])
|
||||
|
||||
def test_oauth2_client_credential_method_rm_auth(self):
|
||||
base_https = self.TEST_URL.replace('http:', 'https:')
|
||||
base_http = self.TEST_URL
|
||||
oauth2_endpoint = f'{base_https}/oauth_token'
|
||||
oauth2_token = 'HW9bB6oYWJywz6mAN_KyIBXlof15Pk'
|
||||
self.stub_auth(json=self.TEST_APP_CRED_TOKEN_RESPONSE)
|
||||
client_cre = v3.OAuth2ClientCredential(
|
||||
base_http,
|
||||
oauth2_endpoint=oauth2_endpoint,
|
||||
oauth2_client_id=self.TEST_CLIENT_CRED_ID,
|
||||
oauth2_client_secret=self.TEST_CLIENT_CRED_SECRET
|
||||
)
|
||||
oauth2_resp = {
|
||||
'status_code': 200,
|
||||
'json': {
|
||||
'access_token': oauth2_token,
|
||||
'expires_in': 3600,
|
||||
'token_type': 'Bearer'
|
||||
}
|
||||
}
|
||||
self.requests_mock.post(oauth2_endpoint,
|
||||
[oauth2_resp])
|
||||
|
||||
sess = session.Session(auth=client_cre)
|
||||
initial_cache_id = client_cre.get_cache_id()
|
||||
|
||||
auth_head = sess.get_auth_headers()
|
||||
self.assertEqual(self.TEST_TOKEN, auth_head['X-Auth-Token'])
|
||||
self.assertEqual(f'Bearer {oauth2_token}', auth_head['Authorization'])
|
||||
|
||||
self.assertEqual(sess.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
self.assertEqual(initial_cache_id, client_cre.get_cache_id())
|
||||
|
||||
resp_ok = {
|
||||
'status_code': 200
|
||||
}
|
||||
self.requests_mock.post(f'{base_http}/test_api',
|
||||
[resp_ok])
|
||||
resp = sess.post(f'{base_http}/test_api', authenticated=True)
|
||||
self.assertRequestHeaderEqual('Authorization',
|
||||
f'Bearer {oauth2_token}')
|
||||
self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
def test_oauth2_client_credential_method_other_not_rm_auth(self):
|
||||
base_https = self.TEST_URL.replace('http:', 'https:')
|
||||
other_auth_token = 'HW9bB6oYWJywz6mAN_KyIBXlof15Pk'
|
||||
self.stub_auth(json=self.TEST_APP_CRED_TOKEN_RESPONSE)
|
||||
with unittest.mock.patch(
|
||||
'keystoneauth1.identity.v3.Password.get_headers') as co_mock:
|
||||
co_mock.return_value = {
|
||||
'X-Auth-Token': self.TEST_TOKEN,
|
||||
'Authorization': other_auth_token
|
||||
}
|
||||
pass_auth = v3.Password(base_https,
|
||||
username=self.TEST_USER,
|
||||
password=self.TEST_PASS,
|
||||
include_catalog=False)
|
||||
sess = session.Session(auth=pass_auth)
|
||||
|
||||
resp_ok = {
|
||||
'status_code': 200
|
||||
}
|
||||
self.requests_mock.post(f'{base_https}/test_api',
|
||||
[resp_ok])
|
||||
resp = sess.post(f'{base_https}/test_api', authenticated=True)
|
||||
self.assertRequestHeaderEqual('Authorization', other_auth_token)
|
||||
self.assertRequestHeaderEqual('X-Auth-Token', self.TEST_TOKEN)
|
||||
self.assertEqual(200, resp.status_code)
|
||||
|
||||
def test_oauth2_client_credential_method_500(self):
|
||||
self.TEST_URL = self.TEST_URL.replace('http:', 'https:')
|
||||
base_https = self.TEST_URL
|
||||
oauth2_endpoint = f'{base_https}/oauth_token'
|
||||
self.stub_auth(json=self.TEST_APP_CRED_TOKEN_RESPONSE)
|
||||
client_cre = v3.OAuth2ClientCredential(
|
||||
base_https,
|
||||
oauth2_endpoint=oauth2_endpoint,
|
||||
oauth2_client_id=self.TEST_CLIENT_CRED_ID,
|
||||
oauth2_client_secret=self.TEST_CLIENT_CRED_SECRET
|
||||
)
|
||||
oauth2_resp = {
|
||||
'status_code': 500,
|
||||
'json': {
|
||||
'error': 'other_error',
|
||||
'error_description':
|
||||
'Unknown error is occur.'
|
||||
}
|
||||
}
|
||||
self.requests_mock.post(oauth2_endpoint,
|
||||
[oauth2_resp])
|
||||
|
||||
sess = session.Session(auth=client_cre)
|
||||
err = self.assertRaises(ClientException, sess.get_auth_headers)
|
||||
self.assertEqual('Unknown error is occur.',
|
||||
str(err))
|
||||
|
||||
def test_oauth2_client_credential_reauth_called_https(self):
|
||||
base_https = self.TEST_URL.replace('http:', 'https:')
|
||||
oauth2_endpoint = f'{base_https}/oauth_token'
|
||||
oauth2_token = 'HW9bB6oYWJywz6mAN_KyIBXlof15Pk'
|
||||
auth = v3.OAuth2ClientCredential(
|
||||
base_https,
|
||||
oauth2_endpoint=oauth2_endpoint,
|
||||
oauth2_client_id='clientcredid',
|
||||
oauth2_client_secret='secret'
|
||||
)
|
||||
oauth2_resp = {
|
||||
'status_code': 200,
|
||||
'json': {
|
||||
'access_token': oauth2_token,
|
||||
'expires_in': 3600,
|
||||
'token_type': 'Bearer'
|
||||
}
|
||||
}
|
||||
self.requests_mock.post(oauth2_endpoint,
|
||||
[oauth2_resp])
|
||||
|
||||
sess = session.Session(auth=auth)
|
||||
|
||||
resp_text = json.dumps(self.TEST_APP_CRED_TOKEN_RESPONSE)
|
||||
resp_ok = {
|
||||
'status_code': 200,
|
||||
'headers': {
|
||||
'Content-Type': 'application/json',
|
||||
'x-subject-token': self.TEST_TOKEN
|
||||
},
|
||||
'text': resp_text
|
||||
}
|
||||
self.requests_mock.post(f'{base_https}/auth/tokens',
|
||||
[resp_ok,
|
||||
{'text': 'Failed', 'status_code': 401},
|
||||
resp_ok])
|
||||
|
||||
resp = sess.post(f'{base_https}/auth/tokens', authenticated=True)
|
||||
self.assertRequestHeaderEqual('Authorization',
|
||||
f'Bearer {oauth2_token}')
|
||||
self.assertEqual(200, resp.status_code)
|
||||
self.assertEqual(resp_text, resp.text)
|
||||
|
@ -486,3 +486,56 @@ class MultiFactorTests(utils.TestCase):
|
||||
username=uuid.uuid4().hex,
|
||||
project_name=uuid.uuid4().hex,
|
||||
project_domain_id=uuid.uuid4().hex)
|
||||
|
||||
|
||||
class V3Oauth2ClientCredentialTests(utils.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(V3Oauth2ClientCredentialTests, self).setUp()
|
||||
|
||||
self.auth_url = uuid.uuid4().hex
|
||||
|
||||
def create(self, **kwargs):
|
||||
kwargs.setdefault('auth_url', self.auth_url)
|
||||
loader = loading.get_plugin_loader('v3oauth2clientcredential')
|
||||
return loader.load_from_options(**kwargs)
|
||||
|
||||
def test_basic(self):
|
||||
id = uuid.uuid4().hex
|
||||
secret = uuid.uuid4().hex
|
||||
oauth2_endpoint = "https://localhost/token"
|
||||
|
||||
client_cred = self.create(oauth2_endpoint=oauth2_endpoint,
|
||||
oauth2_client_id=id,
|
||||
oauth2_client_secret=secret)
|
||||
|
||||
client_method = client_cred.auth_methods[0]
|
||||
self.assertEqual(id, client_method.oauth2_client_id)
|
||||
self.assertEqual(secret, client_method.oauth2_client_secret)
|
||||
self.assertEqual(oauth2_endpoint, client_method.oauth2_endpoint)
|
||||
|
||||
self.assertEqual(id, client_cred._oauth2_client_id)
|
||||
self.assertEqual(secret, client_cred._oauth2_client_secret)
|
||||
self.assertEqual(oauth2_endpoint, client_cred._oauth2_endpoint)
|
||||
|
||||
def test_without_oauth2_endpoint(self):
|
||||
id = uuid.uuid4().hex
|
||||
secret = uuid.uuid4().hex
|
||||
self.assertRaises(exceptions.OptionError,
|
||||
self.create,
|
||||
oauth2_client_id=id,
|
||||
oauth2_client_secret=secret)
|
||||
|
||||
def test_without_client_id(self):
|
||||
oauth2_endpoint = "https://localhost/token"
|
||||
self.assertRaises(exceptions.OptionError,
|
||||
self.create,
|
||||
oauth2_endpoint=oauth2_endpoint,
|
||||
oauth2_client_secret=uuid.uuid4().hex)
|
||||
|
||||
def test_without_secret(self):
|
||||
oauth2_endpoint = "https://localhost/token"
|
||||
self.assertRaises(exceptions.OptionError,
|
||||
self.create,
|
||||
oauth2_endpoint=oauth2_endpoint,
|
||||
oauth2_client_id=uuid.uuid4().hex)
|
||||
|
@ -0,0 +1,10 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
[`blueprint oauth2-client-credentials-ext <https://blueprints.launchpad.net/keystone/+spec/oauth2-client-credentials-ext>`_]
|
||||
Added a new OAuth2ClientCredential plugin, accessible via the
|
||||
'v3oauth2clientcredential' entry point, making possible to authenticate
|
||||
using an application credentials as an OAuth2.0 client credentials.
|
||||
Keystoneauth can now be used to access the OpenStack APIs that use the
|
||||
keystone middleware to support OAuth2.0 client credentials authentication
|
||||
through the keystone identity server.
|
@ -61,3 +61,4 @@ keystoneauth1.plugin =
|
||||
v3samlpassword = keystoneauth1.extras._saml2._loading:Saml2Password
|
||||
v3applicationcredential = keystoneauth1.loading._plugins.identity.v3:ApplicationCredential
|
||||
v3multifactor = keystoneauth1.loading._plugins.identity.v3:MultiFactor
|
||||
v3oauth2clientcredential = keystoneauth1.loading._plugins.identity.v3:OAuth2ClientCredential
|
||||
|
Loading…
Reference in New Issue
Block a user