New auth plugin v3oidcdeviceauthz

OAuth 2.0 Device Authorization Grant

https://www.rfc-editor.org/rfc/rfc8628

Signed-off-by: Arvid Requate <requate@univention.de>
Change-Id: I8344ee5c9730c1533d58d7ccb04ddc3d2d517ade
This commit is contained in:
Arvid Requate 2022-11-29 21:56:14 +01:00
parent aa9c5d230f
commit 44e5b2deef
8 changed files with 229 additions and 5 deletions

View File

@ -426,6 +426,7 @@ authentication plugins that are available in `keystoneauth` are:
- v3oauth1: :py:class:`keystoneauth1.extras.oauth1.v3.OAuth1`
- v3oidcaccesstoken: :py:class:`keystoneauth1.identity.v3:OpenIDConnectAccessToken`
- v3oidcauthcode: :py:class:`keystoneauth1.identity.v3:OpenIDConnectAuthorizationCode`
- v3oidcdeviceauthz: :py:class:`keystoneauth1.loading._plugins.identity.v3:OpenIDConnectDeviceAuthorization`
- v3oidcclientcredentials: :py:class:`keystoneauth1.identity.v3:OpenIDConnectClientCredentials`
- v3oidcpassword: :py:class:`keystoneauth1.identity.v3:OpenIDConnectPassword`
- v3samlpassword: :py:class:`keystoneauth1.extras._saml2.v3.Password`

View File

@ -36,6 +36,15 @@ class OidcAuthorizationEndpointNotFound(auth_plugins.AuthPluginException):
message = "OpenID Connect authorization endpoint not provided."
class OidcDeviceAuthorizationEndpointNotFound(
auth_plugins.AuthPluginException):
message = "OpenID Connect device authorization endpoint not provided."
class OidcDeviceAuthorizationTimeOut(auth_plugins.AuthPluginException):
message = "Timeout for OpenID Connect device authorization."
class OidcGrantTypeMissmatch(auth_plugins.AuthPluginException):
message = "Missmatch between OpenID Connect plugin and grant_type argument"

View File

@ -49,6 +49,9 @@ V3OidcAuthorizationCode = oidc.OidcAuthorizationCode
V3OidcAccessToken = oidc.OidcAccessToken
"""See :class:`keystoneauth1.identity.v3.oidc.OidcAccessToken`"""
V3OidcDeviceAuthorization = oidc.OidcDeviceAuthorization
"""See :class:`keystoneauth1.identity.v3.oidc.OidcDeviceAuthorization`"""
V3TOTP = v3.TOTP
"""See :class:`keystoneauth1.identity.v3.TOTP`"""
@ -74,6 +77,7 @@ __all__ = ('BaseIdentityPlugin',
'V3OidcPassword',
'V3OidcAuthorizationCode',
'V3OidcAccessToken',
'V3OidcDeviceAuthorization',
'V3TOTP',
'V3TokenlessAuth',
'V3ApplicationCredential',

View File

@ -11,6 +11,8 @@
# under the License.
import abc
import time
from urllib import parse as urlparse
import warnings
import six
@ -131,8 +133,8 @@ class _OidcBase(federation.FederationBaseAuth):
otherwise it will return an empty dict.
:rtype: dict
"""
if (self.discovery_endpoint is not None and
not self._discovery_document):
if (self.discovery_endpoint is not None
and not self._discovery_document):
try:
resp = session.get(self.discovery_endpoint,
authenticated=False)
@ -247,9 +249,8 @@ class _OidcBase(federation.FederationBaseAuth):
# First of all, check if the grant type is supported
discovery = self._get_discovery_document(session)
grant_types = discovery.get("grant_types_supported")
if (grant_types and
self.grant_type is not None and
self.grant_type not in grant_types):
if (grant_types and self.grant_type is not None
and self.grant_type not in grant_types):
raise exceptions.OidcPluginNotSupported()
# Get the payload
@ -473,3 +474,135 @@ class OidcAccessToken(_OidcBase):
"""
response = self._get_keystone_token(session, self.access_token)
return access.create(resp=response)
class OidcDeviceAuthorization(_OidcBase):
"""Implementation for OAuth 2.0 Device Authorization Grant."""
grant_type = "urn:ietf:params:oauth:grant-type:device_code"
HEADER_X_FORM = {"Content-Type": "application/x-www-form-urlencoded"}
def __init__(self, auth_url, identity_provider, protocol, # nosec
client_id, client_secret,
access_token_endpoint=None,
device_authorization_endpoint=None,
discovery_endpoint=None,
**kwargs):
"""The OAuth 2.0 Device Authorization plugin expects the following.
:param device_authorization_endpoint: OAuth 2.0 Device Authorization
Endpoint, for example:
https://localhost:8020/oidc/authorize/device
Note that if a discovery document is
provided this value will override
the discovered one.
:type device_authorization_endpoint: string
"""
# RFC 8628 only allows to retrieve an access_token
self.access_token_type = 'access_token'
self.device_authorization_endpoint = device_authorization_endpoint
super(OidcDeviceAuthorization, self).__init__(
auth_url=auth_url,
identity_provider=identity_provider,
protocol=protocol,
client_id=client_id,
client_secret=client_secret,
access_token_endpoint=access_token_endpoint,
discovery_endpoint=discovery_endpoint,
access_token_type=self.access_token_type,
**kwargs)
def _get_device_authorization_endpoint(self, session):
"""Get the endpoint for the OAuth 2.0 Device Authorization flow.
This method will return the correct device authorization endpoint to
be used.
If the user has explicitly passed an device_authorization_endpoint to
the constructor that will be returned. If there is no explicit endpoint
and a discovery url is provided, it will try to get it from the
discovery document. If nothing is found, an exception will be raised.
:param session: a session object to send out HTTP requests.
:type session: keystoneauth1.session.Session
:return: the endpoint to use
:rtype: string or None if no endpoint is found
"""
if self.device_authorization_endpoint is not None:
return self.device_authorization_endpoint
discovery = self._get_discovery_document(session)
endpoint = discovery.get("device_authorization_endpoint")
if endpoint is None:
raise exceptions.oidc.OidcDeviceAuthorizationEndpointNotFound()
return endpoint
def get_payload(self, session):
"""Get an authorization grant for the "device_code" grant type.
:param session: a session object to send out HTTP requests.
:type session: keystoneauth1.session.Session
:returns: a python dictionary containing the payload to be exchanged
:rtype: dict
"""
client_auth = (self.client_id, self.client_secret)
device_authz_endpoint = \
self._get_device_authorization_endpoint(session)
op_response = session.post(device_authz_endpoint,
requests_auth=client_auth,
data={},
authenticated=False)
self.expires_in = int(op_response.json()["expires_in"])
self.timeout = time.time() + self.expires_in
self.device_code = op_response.json()["device_code"]
self.interval = int(op_response.json()["interval"])
self.user_code = op_response.json()["user_code"]
self.verification_uri = op_response.json()["verification_uri"]
self.verification_uri_complete = \
op_response.json()["verification_uri_complete"]
payload = {'device_code': self.device_code}
return payload
def _get_access_token(self, session, payload):
"""Poll token endpoint for an access token.
:param session: a session object to send out HTTP requests.
:type session: keystoneauth1.session.Session
:param payload: a dict containing various OpenID Connect values,
for example::
{'grant_type': 'urn:ietf:params:oauth:grant-type:device_code',
'device_code': self.device_code}
:type payload: dict
"""
print(f"\nTo authenticate please go to: "
f"{self.verification_uri_complete}")
client_auth = (self.client_id, self.client_secret)
access_token_endpoint = self._get_access_token_endpoint(session)
encoded_payload = urlparse.urlencode(payload)
while time.time() < self.timeout:
try:
op_response = session.post(access_token_endpoint,
requests_auth=client_auth,
data=encoded_payload,
headers=self.HEADER_X_FORM,
authenticated=False)
except exceptions.http.BadRequest as exc:
error = exc.response.json().get("error")
if error != "authorization_pending":
raise
time.sleep(self.interval)
continue
break
else:
if error == "authorization_pending":
raise exceptions.oidc.OidcDeviceAuthorizationTimeOut()
access_token = op_response.json()[self.access_token_type]
return access_token

View File

@ -191,6 +191,29 @@ class OpenIDConnectAccessToken(loading.BaseFederationLoader):
return options
class OpenIDConnectDeviceAuthorization(_OpenIDConnectBase):
@property
def plugin_class(self):
return identity.V3OidcDeviceAuthorization
def get_options(self):
options = super(OpenIDConnectDeviceAuthorization, self).get_options()
# RFC 8628 doesn't support id_token
options = [opt for opt in options if opt.name != 'access-token-type']
options.extend([
loading.Opt('device-authorization-endpoint',
help='OAuth 2.0 Device Authorization Endpoint. Note '
'that if a discovery document is being passed this '
'option will override the endpoint provided by the '
'server in the discovery document.'),
])
return options
class TOTP(loading.BaseV3Loader):
@property

View File

@ -295,6 +295,56 @@ class OpenIDConnectAccessToken(utils.TestCase):
self.assertEqual(access_token, oidc.access_token)
class OpenIDConnectDeviceAuthorizationTests(utils.TestCase):
plugin_name = "v3oidcdeviceauthz"
def setUp(self):
super(OpenIDConnectDeviceAuthorizationTests, self).setUp()
self.auth_url = uuid.uuid4().hex
def create(self, **kwargs):
kwargs.setdefault('auth_url', self.auth_url)
loader = loading.get_plugin_loader(self.plugin_name)
return loader.load_from_options(**kwargs)
def test_options(self):
options = loading.get_plugin_loader(self.plugin_name).get_options()
self.assertTrue(
set(['client-id', 'client-secret', 'access-token-endpoint',
'openid-scope', 'discovery-endpoint',
'device-authorization-endpoint']).issubset(
set([o.name for o in options]))
)
def test_basic(self):
access_token_endpoint = uuid.uuid4().hex
device_authorization_endpoint = uuid.uuid4().hex
scope = uuid.uuid4().hex
identity_provider = uuid.uuid4().hex
protocol = uuid.uuid4().hex
client_id = uuid.uuid4().hex
client_secret = uuid.uuid4().hex
dev_authz_endpt = device_authorization_endpoint
oidc = self.create(identity_provider=identity_provider,
protocol=protocol,
access_token_endpoint=access_token_endpoint,
device_authorization_endpoint=dev_authz_endpt,
client_id=client_id,
client_secret=client_secret,
scope=scope)
self.assertEqual(dev_authz_endpt, oidc.device_authorization_endpoint)
self.assertEqual(identity_provider, oidc.identity_provider)
self.assertEqual(protocol, oidc.protocol)
self.assertEqual(access_token_endpoint, oidc.access_token_endpoint)
self.assertEqual(client_id, oidc.client_id)
self.assertEqual(client_secret, oidc.client_secret)
self.assertEqual(scope, oidc.scope)
class V3TokenlessAuthTests(utils.TestCase):
def setUp(self):

View File

@ -77,6 +77,7 @@ DISCOVERY_DOCUMENT = {
"grant_types_supported": [
"authorization_code",
"password",
"urn:ietf:params:oauth:grant-type:device_code",
],
"introspection_endpoint": "https://localhost:8020/oidc/introspect",
"issuer": "https://localhost:8020/oidc/",
@ -88,6 +89,8 @@ DISCOVERY_DOCUMENT = {
"service_documentation": "https://localhost:8020/oidc/about",
"token_endpoint": "https://localhost:8020/oidc/token",
"userinfo_endpoint": "https://localhost:8020/oidc/userinfo",
"device_authorization_endpoint":
"https://localhost:8020/oidc/authorize/device",
"token_endpoint_auth_methods_supported": [
"client_secret_post",
"client_secret_basic",

View File

@ -50,6 +50,7 @@ keystoneauth1.plugin =
v3token = keystoneauth1.loading._plugins.identity.v3:Token
v3oidcclientcredentials = keystoneauth1.loading._plugins.identity.v3:OpenIDConnectClientCredentials
v3oidcpassword = keystoneauth1.loading._plugins.identity.v3:OpenIDConnectPassword
v3oidcdeviceauthz = keystoneauth1.loading._plugins.identity.v3:OpenIDConnectDeviceAuthorization
v3oidcauthcode = keystoneauth1.loading._plugins.identity.v3:OpenIDConnectAuthorizationCode
v3oidcaccesstoken = keystoneauth1.loading._plugins.identity.v3:OpenIDConnectAccessToken
v3oauth1 = keystoneauth1.extras.oauth1._loading:V3OAuth1