Merge "OAuth2.0 Client Credentials Grant Flow Support"

This commit is contained in:
Zuul 2022-08-26 17:09:42 +00:00 committed by Gerrit Code Review
commit 051aca8e8a
12 changed files with 843 additions and 4 deletions

View File

@ -22,6 +22,7 @@ from keystone.api import os_ep_filter
from keystone.api import os_federation
from keystone.api import os_inherit
from keystone.api import os_oauth1
from keystone.api import os_oauth2
from keystone.api import os_revoke
from keystone.api import os_simple_cert
from keystone.api import policy
@ -50,6 +51,7 @@ __all__ = (
'os_federation',
'os_inherit',
'os_oauth1',
'os_oauth2',
'os_revoke',
'os_simple_cert',
'policy',
@ -79,6 +81,7 @@ __apis__ = (
os_federation,
os_inherit,
os_oauth1,
os_oauth2,
os_revoke,
os_simple_cert,
policy,

View File

@ -45,6 +45,14 @@ os_oauth1_parameter_rel_func = functools.partial(
json_home.build_v3_extension_parameter_relation,
extension_name='OS-OAUTH1', extension_version='1.0')
# OS-OAUTH2 "extension"
os_oauth2_resource_rel_func = functools.partial(
json_home.build_v3_extension_resource_relation,
extension_name='OS-OAUTH2', extension_version='1.0')
os_oauth2_parameter_rel_func = functools.partial(
json_home.build_v3_extension_parameter_relation,
extension_name='OS-OAUTH2', extension_version='1.0')
# OS-REVOKE "extension"
os_revoke_resource_rel_func = functools.partial(
json_home.build_v3_extension_resource_relation,

188
keystone/api/os_oauth2.py Normal file
View File

@ -0,0 +1,188 @@
# Copyright 2022 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import flask
from flask import make_response
import http.client
from oslo_log import log
from keystone.api._shared import authentication
from keystone.api._shared import json_home_relations
from keystone.conf import CONF
from keystone import exception
from keystone.i18n import _
from keystone.server import flask as ks_flask
LOG = log.getLogger(__name__)
_build_resource_relation = json_home_relations.os_oauth2_resource_rel_func
class AccessTokenResource(ks_flask.ResourceBase):
def _method_not_allowed(self):
"""Raise a method not allowed error"""
raise exception.OAuth2OtherError(
int(http.client.METHOD_NOT_ALLOWED),
http.client.responses[http.client.METHOD_NOT_ALLOWED],
_('The method is not allowed for the requested URL.'))
@ks_flask.unenforced_api
def get(self):
"""The method is not allowed"""
self._method_not_allowed()
@ks_flask.unenforced_api
def head(self):
"""The method is not allowed"""
self._method_not_allowed()
@ks_flask.unenforced_api
def put(self):
"""The method is not allowed"""
self._method_not_allowed()
@ks_flask.unenforced_api
def patch(self):
"""The method is not allowed"""
self._method_not_allowed()
@ks_flask.unenforced_api
def delete(self):
"""The method is not allowed"""
self._method_not_allowed()
@ks_flask.unenforced_api
def post(self):
"""Get an OAuth2.0 Access Token.
POST /v3/OS-OAUTH2/token
"""
client_auth = flask.request.authorization
if not client_auth:
error = exception.OAuth2InvalidClient(
int(http.client.UNAUTHORIZED),
http.client.responses[http.client.UNAUTHORIZED],
_('OAuth2.0 client authorization is required.'))
LOG.info('Get OAuth2.0 Access Token API: '
'field \'authorization\' is not found in HTTP Headers.')
raise error
if client_auth.type != 'basic':
error = exception.OAuth2InvalidClient(
int(http.client.UNAUTHORIZED),
http.client.responses[http.client.UNAUTHORIZED],
_('OAuth2.0 client authorization type %s is not supported.')
% client_auth.type)
LOG.info('Get OAuth2.0 Access Token API: '
f'{error.message_format}')
raise error
client_id = client_auth.username
client_secret = client_auth.password
if not client_id:
error = exception.OAuth2InvalidClient(
int(http.client.UNAUTHORIZED),
http.client.responses[http.client.UNAUTHORIZED],
_('OAuth2.0 client authorization is invalid.'))
LOG.info('Get OAuth2.0 Access Token API: '
'client_id is not found in authorization.')
raise error
if not client_secret:
error = exception.OAuth2InvalidClient(
int(http.client.UNAUTHORIZED),
http.client.responses[http.client.UNAUTHORIZED],
_('OAuth2.0 client authorization is invalid.'))
LOG.info('Get OAuth2.0 Access Token API: '
'client_secret is not found in authorization.')
raise error
grant_type = flask.request.form.get('grant_type')
if grant_type is None:
error = exception.OAuth2InvalidRequest(
int(http.client.BAD_REQUEST),
http.client.responses[http.client.BAD_REQUEST],
_('The parameter grant_type is required.'))
LOG.info('Get OAuth2.0 Access Token API: '
f'{error.message_format}')
raise error
if grant_type != 'client_credentials':
error = exception.OAuth2UnsupportedGrantType(
int(http.client.BAD_REQUEST),
http.client.responses[http.client.BAD_REQUEST],
_('The parameter grant_type %s is not supported.'
) % grant_type)
LOG.info('Get OAuth2.0 Access Token API: '
f'{error.message_format}')
raise error
auth_data = {
'identity': {
'methods': ['application_credential'],
'application_credential': {
'id': client_id,
'secret': client_secret
}
}
}
try:
token = authentication.authenticate_for_token(auth_data)
except exception.Error as error:
if error.code == 401:
error = exception.OAuth2InvalidClient(
error.code, error.title,
str(error))
elif error.code == 400:
error = exception.OAuth2InvalidRequest(
error.code, error.title,
str(error))
else:
error = exception.OAuth2OtherError(
error.code, error.title,
'An unknown error occurred and failed to get an OAuth2.0 '
'access token.')
LOG.exception(error)
raise error
except Exception as error:
error = exception.OAuth2OtherError(
int(http.client.INTERNAL_SERVER_ERROR),
http.client.responses[http.client.INTERNAL_SERVER_ERROR],
str(error))
LOG.exception(error)
raise error
resp = make_response({
'access_token': token.id,
'token_type': 'Bearer',
'expires_in': CONF.token.expiration
})
resp.status = '200 OK'
return resp
class OSAuth2API(ks_flask.APIBase):
_name = 'OS-OAUTH2'
_import_name = __name__
_api_url_prefix = '/OS-OAUTH2'
resource_mapping = [
ks_flask.construct_resource_map(
resource=AccessTokenResource,
url='/token',
rel='token',
resource_kwargs={},
resource_relation_func=_build_resource_relation
)]
APIs = (OSAuth2API,)

View File

@ -722,3 +722,36 @@ class ResourceDeleteForbidden(ForbiddenNotSecurity):
message_format = _('Unable to delete immutable %(type)s resource: '
'`%(resource_id)s. Set resource option "immutable" '
'to false first.')
class OAuth2Error(Error):
def __init__(self, code, title, error_title, message):
self.code = code
self.title = title
self.error_title = error_title
self.message_format = message
class OAuth2InvalidClient(OAuth2Error):
def __init__(self, code, title, message):
error_title = 'invalid_client'
super().__init__(code, title, error_title, message)
class OAuth2InvalidRequest(OAuth2Error):
def __init__(self, code, title, message):
error_title = 'invalid_request'
super().__init__(code, title, error_title, message)
class OAuth2UnsupportedGrantType(OAuth2Error):
def __init__(self, code, title, message):
error_title = 'unsupported_grant_type'
super().__init__(code, title, error_title, message)
class OAuth2OtherError(OAuth2Error):
def __init__(self, code, title, message):
error_title = 'other_error'
super().__init__(code, title, error_title, message)

View File

View File

@ -0,0 +1,30 @@
# Copyright 2022 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import flask
from keystone.server import flask as ks_flask
def build_response(error):
response = flask.make_response((
{
'error': error.error_title,
'error_description': error.message_format
},
f"{error.code} {error.title}"))
if error.code == 401:
response.headers['WWW-Authenticate'] = \
'Keystone uri="%s"' % ks_flask.base_url()
return response

View File

@ -27,12 +27,12 @@ except ImportError:
import keystone.api
from keystone import exception
from keystone.oauth2 import handlers as oauth2_handlers
from keystone.receipt import handlers as receipt_handlers
from keystone.server.flask import common as ks_flask
from keystone.server.flask.request_processing import json_body
from keystone.server.flask.request_processing import req_logging
from keystone.receipt import handlers as receipt_handlers
LOG = log.getLogger(__name__)
@ -75,6 +75,8 @@ def _handle_keystone_exception(error):
# TODO(adriant): register this with its own specific handler:
if isinstance(error, exception.InsufficientAuthMethods):
return receipt_handlers.build_receipt(error)
elif isinstance(error, exception.OAuth2Error):
return oauth2_handlers.build_response(error)
# Handle logging
if isinstance(error, exception.Unauthorized):

View File

@ -29,6 +29,13 @@ def json_body_before_request():
# exit if there is nothing to be done, (no body)
if not flask.request.get_data():
return None
elif flask.request.path and flask.request.path.startswith(
'/v3/OS-OAUTH2/'):
# When the user makes a request to the OAuth2.0 token endpoint,
# the user should use the "application/x-www-form-urlencoded" format
# with a character encoding of UTF-8 in the HTTP request entity-body.
# At the scenario there is nothing to be done and exit.
return None
try:
# flask does loading for us for json, use the flask default loader

View File

@ -13,12 +13,11 @@
# under the License.
import datetime
import uuid
import http.client
import oslo_context.context
from oslo_serialization import jsonutils
from testtools import matchers
import uuid
import webtest
from keystone.common import authorization
@ -1238,6 +1237,13 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
return environment
class OAuth2RestfulTestCase(RestfulTestCase):
def assertValidErrorResponse(self, response):
resp = response.result
self.assertIsNotNone(resp.get('error'))
self.assertIsNotNone(resp.get('error_description'))
class VersionTestCase(RestfulTestCase):
def test_get_version(self):
pass

View File

@ -0,0 +1,550 @@
# Copyright 2022 openStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from base64 import b64encode
from http import client
from oslo_log import log
from oslo_serialization import jsonutils
from unittest import mock
from urllib import parse
from keystone import conf
from keystone import exception
from keystone.tests.unit import test_v3
LOG = log.getLogger(__name__)
CONF = conf.CONF
class FakeUserAppCredListCreateResource(mock.Mock):
pass
class OAuth2Tests(test_v3.OAuth2RestfulTestCase):
APP_CRED_CREATE_URL = '/users/%(user_id)s/application_credentials'
APP_CRED_LIST_URL = '/users/%(user_id)s/application_credentials'
APP_CRED_DELETE_URL = '/users/%(user_id)s/application_credentials/' \
'%(app_cred_id)s'
APP_CRED_SHOW_URL = '/users/%(user_id)s/application_credentials/' \
'%(app_cred_id)s'
ACCESS_TOKEN_URL = '/OS-OAUTH2/token'
def setUp(self):
super(OAuth2Tests, self).setUp()
log.set_defaults(
logging_context_format_string='%(asctime)s.%(msecs)03d %('
'color)s%(levelname)s %(name)s [^[['
'01;36m%(request_id)s ^[[00;36m%('
'project_name)s %(user_name)s%('
'color)s] ^[[01;35m%(instance)s%('
'color)s%(message)s^[[00m',
default_log_levels=log.DEBUG)
CONF.log_opt_values(LOG, log.DEBUG)
LOG.debug(f'is_debug_enabled: {log.is_debug_enabled(CONF)}')
LOG.debug(f'get_default_log_levels: {log.get_default_log_levels()}')
def _assert_error_resp(self, error_resp, error_msg, error_description):
resp_keys = (
'error', 'error_description'
)
for key in resp_keys:
self.assertIsNotNone(error_resp.get(key, None))
self.assertEqual(error_msg, error_resp.get('error'))
self.assertEqual(error_description,
error_resp.get('error_description'))
def _create_app_cred(self, user_id, app_cred_name):
resp = self.post(
self.APP_CRED_CREATE_URL % {'user_id': user_id},
body={'application_credential': {'name': app_cred_name}}
)
LOG.debug(f'resp: {resp}')
app_ref = resp.result['application_credential']
return app_ref
def _delete_app_cred(self, user_id, app_cred_id):
resp = self.delete(
self.APP_CRED_CREATE_URL % {'user_id': user_id,
'app_cred_id': app_cred_id})
LOG.debug(f'resp: {resp}')
def _get_access_token(self, app_cred, b64str, headers, data,
expected_status):
if b64str is None:
client_id = app_cred.get('id')
client_secret = app_cred.get('secret')
b64str = b64encode(
f'{client_id}:{client_secret}'.encode()).decode().strip()
if headers is None:
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {b64str}'
}
if data is None:
data = {
'grant_type': 'client_credentials'
}
data = parse.urlencode(data).encode()
resp = self.post(
self.ACCESS_TOKEN_URL,
headers=headers,
convert=False,
body=data,
expected_status=expected_status)
return resp
class AccessTokenTests(OAuth2Tests):
def setUp(self):
super(AccessTokenTests, self).setUp()
def _create_access_token(self, client):
pass
def _get_access_token_method_not_allowed(self, app_cred,
http_func):
client_id = app_cred.get('id')
client_secret = app_cred.get('secret')
b64str = b64encode(
f'{client_id}:{client_secret}'.encode()).decode().strip()
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {b64str}'
}
data = {
'grant_type': 'client_credentials'
}
data = parse.urlencode(data).encode()
resp = http_func(
self.ACCESS_TOKEN_URL,
headers=headers,
convert=False,
body=data,
expected_status=client.METHOD_NOT_ALLOWED)
LOG.debug(f'response: {resp}')
json_resp = jsonutils.loads(resp.body)
return json_resp
def test_get_access_token(self):
"""Test case when an access token can be successfully obtain."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
resp = self._get_access_token(
app_cred,
b64str=None,
headers=None,
data=None,
expected_status=client.OK)
json_resp = jsonutils.loads(resp.body)
self.assertIn('access_token', json_resp)
self.assertEqual('Bearer', json_resp['token_type'])
self.assertEqual(3600, json_resp['expires_in'])
def test_get_access_token_without_client_auth(self):
"""Test case when there is no client authorization."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
error = 'invalid_client'
error_description = 'OAuth2.0 client authorization is required.'
resp = self._get_access_token(app_cred,
b64str=None,
headers=headers,
data=None,
expected_status=client.UNAUTHORIZED)
self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
self.assertEqual('Keystone uri="http://localhost/v3"',
resp.headers.get("WWW-Authenticate"))
json_resp = jsonutils.loads(resp.body)
LOG.debug(f'error: {json_resp.get("error")}')
LOG.debug(f'error_description: {json_resp.get("error_description")}')
self.assertEqual(error,
json_resp.get('error'))
self.assertEqual(error_description,
json_resp.get('error_description'))
def test_get_access_token_auth_type_is_not_basic(self):
"""Test case when auth_type is not basic."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
client_id = app_cred.get('id')
base = 'username="%s", realm="%s", nonce="%s", uri="%s", ' \
'response="%s"' % (
client_id, 'realm', 'nonce', 'path', 'responding')
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Digest {base}'
}
error = 'invalid_client'
error_description = 'OAuth2.0 client authorization type ' \
'digest is not supported.'
resp = self._get_access_token(app_cred,
b64str=None,
headers=headers,
data=None,
expected_status=client.UNAUTHORIZED)
self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
self.assertEqual('Keystone uri="http://localhost/v3"',
resp.headers.get("WWW-Authenticate"))
json_resp = jsonutils.loads(resp.body)
LOG.debug(f'error: {json_resp.get("error")}')
LOG.debug(f'error_description: {json_resp.get("error_description")}')
self.assertEqual(error,
json_resp.get('error'))
self.assertEqual(error_description,
json_resp.get('error_description'))
def test_get_access_token_without_client_id(self):
"""Test case when there is no client_id."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
client_secret = app_cred.get('secret')
b64str = b64encode(
f':{client_secret}'.encode()).decode().strip()
error = 'invalid_client'
error_description = 'OAuth2.0 client authorization is invalid.'
resp = self._get_access_token(app_cred,
b64str=b64str,
headers=None,
data=None,
expected_status=client.UNAUTHORIZED)
self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
self.assertEqual('Keystone uri="http://localhost/v3"',
resp.headers.get("WWW-Authenticate"))
json_resp = jsonutils.loads(resp.body)
LOG.debug(f'error: {json_resp.get("error")}')
LOG.debug(f'error_description: {json_resp.get("error_description")}')
self.assertEqual(error,
json_resp.get('error'))
self.assertEqual(error_description,
json_resp.get('error_description'))
def test_get_access_token_without_client_secret(self):
"""Test case when there is no client_secret."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
client_id = app_cred.get('id')
b64str = b64encode(
f'{client_id}:'.encode()).decode().strip()
error = 'invalid_client'
error_description = 'OAuth2.0 client authorization is invalid.'
resp = self._get_access_token(app_cred,
b64str=b64str,
headers=None,
data=None,
expected_status=client.UNAUTHORIZED)
self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
self.assertEqual('Keystone uri="http://localhost/v3"',
resp.headers.get("WWW-Authenticate"))
json_resp = jsonutils.loads(resp.body)
LOG.debug(f'error: {json_resp.get("error")}')
LOG.debug(f'error_description: {json_resp.get("error_description")}')
self.assertEqual(error,
json_resp.get('error'))
self.assertEqual(error_description,
json_resp.get('error_description'))
def test_get_access_token_without_grant_type(self):
"""Test case when there is no grant_type."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
data = {}
error = 'invalid_request'
error_description = 'The parameter grant_type is required.'
resp = self._get_access_token(app_cred,
b64str=None,
headers=None,
data=data,
expected_status=client.BAD_REQUEST)
json_resp = jsonutils.loads(resp.body)
LOG.debug(f'error: {json_resp.get("error")}')
LOG.debug(f'error_description: {json_resp.get("error_description")}')
self.assertEqual(error,
json_resp.get('error'))
self.assertEqual(error_description,
json_resp.get('error_description'))
def test_get_access_token_blank_grant_type(self):
"""Test case when grant_type is blank."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
data = {
'grant_type': ''
}
error = 'unsupported_grant_type'
error_description = 'The parameter grant_type ' \
' is not supported.'
resp = self._get_access_token(app_cred,
b64str=None,
headers=None,
data=data,
expected_status=client.BAD_REQUEST)
json_resp = jsonutils.loads(resp.body)
LOG.debug(f'error: {json_resp.get("error")}')
LOG.debug(f'error_description: {json_resp.get("error_description")}')
self.assertEqual(error,
json_resp.get('error'))
self.assertEqual(error_description,
json_resp.get('error_description'))
def test_get_access_token_grant_type_is_not_client_credentials(self):
"""Test case when grant_type is not client_credentials."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
data = {
'grant_type': 'not_client_credentials'
}
error = 'unsupported_grant_type'
error_description = 'The parameter grant_type ' \
'not_client_credentials is not supported.'
resp = self._get_access_token(app_cred,
b64str=None,
headers=None,
data=data,
expected_status=client.BAD_REQUEST)
json_resp = jsonutils.loads(resp.body)
LOG.debug(f'error: {json_resp.get("error")}')
LOG.debug(f'error_description: {json_resp.get("error_description")}')
self.assertEqual(error,
json_resp.get('error'))
self.assertEqual(error_description,
json_resp.get('error_description'))
def test_get_access_token_failed_401(self):
"""Test case when client authentication failed."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
error = 'invalid_client'
client_id = app_cred.get('id')
client_secret = app_cred.get('secret')
b64str = b64encode(
f'{client_id}:{client_secret}'.encode()).decode().strip()
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {b64str}'
}
data = {
'grant_type': 'client_credentials'
}
data = parse.urlencode(data).encode()
with mock.patch(
'keystone.api._shared.authentication.'
'authenticate_for_token') as co_mock:
co_mock.side_effect = exception.Unauthorized(
'client is unauthorized')
resp = self.post(
self.ACCESS_TOKEN_URL,
headers=headers,
convert=False,
body=data,
noauth=True,
expected_status=client.UNAUTHORIZED)
self.assertNotEmpty(resp.headers.get("WWW-Authenticate"))
self.assertEqual('Keystone uri="http://localhost/v3"',
resp.headers.get("WWW-Authenticate"))
LOG.debug(f'response: {resp}')
json_resp = jsonutils.loads(resp.body)
self.assertEqual(error,
json_resp.get('error'))
LOG.debug(f'error: {json_resp.get("error")}')
def test_get_access_token_failed_400(self):
"""Test case when the called API is incorrect."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
error = 'invalid_request'
client_id = app_cred.get('id')
client_secret = app_cred.get('secret')
b64str = b64encode(
f'{client_id}:{client_secret}'.encode()).decode().strip()
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {b64str}'
}
data = {
'grant_type': 'client_credentials'
}
data = parse.urlencode(data).encode()
with mock.patch(
'keystone.api._shared.authentication.'
'authenticate_for_token') as co_mock:
co_mock.side_effect = exception.ValidationError(
'Auth method is invalid')
resp = self.post(
self.ACCESS_TOKEN_URL,
headers=headers,
convert=False,
body=data,
noauth=True,
expected_status=client.BAD_REQUEST)
LOG.debug(f'response: {resp}')
json_resp = jsonutils.loads(resp.body)
self.assertEqual(error,
json_resp.get('error'))
LOG.debug(f'error: {json_resp.get("error")}')
def test_get_access_token_failed_500_other(self):
"""Test case when unexpected error."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
error = 'other_error'
client_id = app_cred.get('id')
client_secret = app_cred.get('secret')
b64str = b64encode(
f'{client_id}:{client_secret}'.encode()).decode().strip()
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {b64str}'
}
data = {
'grant_type': 'client_credentials'
}
data = parse.urlencode(data).encode()
with mock.patch(
'keystone.api._shared.authentication.'
'authenticate_for_token') as co_mock:
co_mock.side_effect = exception.UnexpectedError(
'unexpected error.')
resp = self.post(
self.ACCESS_TOKEN_URL,
headers=headers,
convert=False,
body=data,
noauth=True,
expected_status=client.INTERNAL_SERVER_ERROR)
LOG.debug(f'response: {resp}')
json_resp = jsonutils.loads(resp.body)
self.assertEqual(error,
json_resp.get('error'))
def test_get_access_token_failed_500(self):
"""Test case when internal server error."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
error = 'other_error'
client_id = app_cred.get('id')
client_secret = app_cred.get('secret')
b64str = b64encode(
f'{client_id}:{client_secret}'.encode()).decode().strip()
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {b64str}'
}
data = {
'grant_type': 'client_credentials'
}
data = parse.urlencode(data).encode()
with mock.patch(
'keystone.api._shared.authentication.'
'authenticate_for_token') as co_mock:
co_mock.side_effect = Exception(
'Internal server is invalid')
resp = self.post(
self.ACCESS_TOKEN_URL,
headers=headers,
convert=False,
body=data,
noauth=True,
expected_status=client.INTERNAL_SERVER_ERROR)
LOG.debug(f'response: {resp}')
json_resp = jsonutils.loads(resp.body)
self.assertEqual(error,
json_resp.get('error'))
def test_get_access_token_method_get_not_allowed(self):
"""Test case when the request is get method that is not allowed."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
json_resp = self._get_access_token_method_not_allowed(
app_cred, self.get)
self.assertEqual('other_error',
json_resp.get('error'))
self.assertEqual('The method is not allowed for the requested URL.',
json_resp.get('error_description'))
def test_get_access_token_method_patch_not_allowed(self):
"""Test case when the request is patch method that is not allowed."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
json_resp = self._get_access_token_method_not_allowed(
app_cred, self.patch)
self.assertEqual('other_error',
json_resp.get('error'))
self.assertEqual('The method is not allowed for the requested URL.',
json_resp.get('error_description'))
def test_get_access_token_method_put_not_allowed(self):
"""Test case when the request is put method that is not allowed."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
json_resp = self._get_access_token_method_not_allowed(
app_cred, self.put)
self.assertEqual('other_error',
json_resp.get('error'))
self.assertEqual('The method is not allowed for the requested URL.',
json_resp.get('error_description'))
def test_get_access_token_method_delete_not_allowed(self):
"""Test case when the request is delete method that is not allowed."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
json_resp = self._get_access_token_method_not_allowed(
app_cred, self.delete)
self.assertEqual('other_error',
json_resp.get('error'))
self.assertEqual('The method is not allowed for the requested URL.',
json_resp.get('error_description'))
def test_get_access_token_method_head_not_allowed(self):
"""Test case when the request is head method that is not allowed."""
client_name = 'client_name_test'
app_cred = self._create_app_cred(self.user_id, client_name)
client_id = app_cred.get('id')
client_secret = app_cred.get('secret')
b64str = b64encode(
f'{client_id}:{client_secret}'.encode()).decode().strip()
headers = {
'Content-Type': 'application/x-www-form-urlencoded',
'Authorization': f'Basic {b64str}'
}
self.head(
self.ACCESS_TOKEN_URL,
headers=headers,
convert=False,
expected_status=client.METHOD_NOT_ALLOWED)

View File

@ -371,6 +371,9 @@ V3_JSON_HOME_RESOURCES = {
'href-template': '/users/{user_id}/projects',
'href-vars': {'user_id': json_home.Parameters.USER_ID, }},
json_home.build_v3_resource_relation('users'): {'href': '/users'},
json_home.build_v3_extension_resource_relation(
'OS-OAUTH2', '1.0', 'token'): {
'href': '/OS-OAUTH2/token'},
_build_federation_rel(resource_name='domains'): {
'href': '/auth/domains'},
_build_federation_rel(resource_name='websso'): {

View File

@ -0,0 +1,9 @@
---
features:
- |
[`blueprint oauth2-client-credentials-ext <https://blueprints.launchpad.net/keystone/+spec/oauth2-client-credentials-ext>`_]
Users can now use the OAuth2.0 Access Token API to get an access token
from the keystone identity server with application credentials. Then the
users can use the access token to access the OpenStack APIs that use the
keystone middleware to support OAuth2.0 client credentials authentication
through the keystone identity server.