Auth plugin for X.509 tokenless authentication

An auth plugin that allows service clients to be authenticated
with the X.509 tokenless authentication. Please find typical configured
options in authentication-plugins.rst

implements bp keystone-tokenless-authz-with-x509-ssl-client-cert

Change-Id: Ie0298f0ef7f3891cfc81072ab9ef9e501773fe5f
This commit is contained in:
chioleong 2016-02-23 18:07:02 -08:00 committed by Sam Leong
parent bebe214534
commit e203d61092
8 changed files with 370 additions and 3 deletions

View File

@ -57,6 +57,8 @@ this V3 defines a number of different
a V3 identity service using an existing token.
- :py:class:`~keystoneauth1.identity.v3.TOTPMethod`: Authenticate against
a V3 identity service using Time-Based One-Time Password (TOTP).
- :py:class:`~keystoneauth1.identity.v3.TokenlessAuth`: Authenticate against
a V3 identity service using tokenless authentication.
- :py:class:`~keystoneauth1.extras.kerberos.KerberosMethod`: Authenticate
against a V3 identity service using Kerberos.
@ -165,6 +167,30 @@ access token's key and secret. For example::
>>> s = session.Session(auth=a)
Tokenless Auth
==============
A plugin for tokenless authentication also exists. It provides a means to
authorize client operations within the Identity server by using an X.509
TLS client certificate without having to issue a token. We provide a
tokenless authentication plugin at:
- :class:`~keystoneauth1.identity.v3.TokenlessAuth`
It is mostly used by service clients for token validation and here is
an example of how this plugin would be used in practice::
>>> keystoneauth1 import session
>>> keystoneauth1.identity import v3
>>> auth = v3.TokenlessAuth(auth_url='https://keystone:5000/v3',
... domain_name='my_service_domain')
>>> sess = session.Session(
... auth=auth,
... cert=('/opt/service_client.crt',
... '/opt/service_client.key'),
... verify='/opt/ca.crt')
Loading Plugins by Name
=======================
@ -181,7 +207,7 @@ authentication plugins that are available in `keystoneauth` are:
- v3token: :py:class:`keystoneauth1.identity.v3.Token`
- v3totp: :py:class:`keystoneauth1.identity.v3.TOTP`
- v3kerberos: :py:class:`keystoneauth1.extras.kerberos.Kerberos`
- v3tokenlessauth: :py:class:`keystoneauth1.identity.v3.TokenlessAuth`
Creating Authentication Plugins
===============================

View File

@ -49,6 +49,9 @@ V3OidcAccessToken = oidc.OidcAccessToken
V3TOTP = v3.TOTP
"""See :class:`keystoneauth1.identity.v3.TOTP`"""
V3TokenlessAuth = v3.TokenlessAuth
"""See :class:`keystoneauth1.identity.v3.TokenlessAuth`"""
__all__ = ('BaseIdentityPlugin',
'Password',
'Token',
@ -59,4 +62,5 @@ __all__ = ('BaseIdentityPlugin',
'V3OidcPassword',
'V3OidcAuthorizationCode',
'V3OidcAccessToken',
'V3TOTP')
'V3TOTP',
'V3TokenlessAuth')

View File

@ -17,6 +17,7 @@ from keystoneauth1.identity.v3.oidc import * # noqa
from keystoneauth1.identity.v3.password import * # noqa
from keystoneauth1.identity.v3.token import * # noqa
from keystoneauth1.identity.v3.totp import * # noqa
from keystoneauth1.identity.v3.tokenless_auth import * # noqa
__all__ = ('Auth',
@ -38,4 +39,6 @@ __all__ = ('Auth',
'OidcPassword',
'TOTPMethod',
'TOTP')
'TOTP',
'TokenlessAuth')

View File

@ -0,0 +1,115 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from keystoneauth1 import _utils as utils
from keystoneauth1 import plugin
LOG = utils.get_logger(__name__)
@six.add_metaclass(abc.ABCMeta)
class TokenlessAuth(plugin.BaseAuthPlugin):
"""A plugin for authenticating with Tokenless Auth.
This is for Tokenless Authentication. Scoped information
like domain name and project ID will be passed in the headers and
token validation request will be authenticated based on
the provided HTTPS certificate along with the scope information.
"""
def __init__(self, auth_url,
domain_id=None,
domain_name=None,
project_id=None,
project_name=None,
project_domain_id=None,
project_domain_name=None):
"""A init method for TokenlessAuth.
:param string auth_url: Identity service endpoint for authentication.
The URL must include a version or any request
will result in a 404 NotFound error.
:param string domain_id: Domain ID for domain scoping.
:param string domain_name: Domain name for domain scoping.
:param string project_id: Project ID for project scoping.
:param string project_name: Project name for project scoping.
:param string project_domain_id: Project's domain ID for project.
:param string project_domain_name: Project's domain name for project.
"""
self.auth_url = auth_url
self.domain_id = domain_id
self.domain_name = domain_name
self.project_id = project_id
self.project_name = project_name
self.project_domain_id = project_domain_id
self.project_domain_name = project_domain_name
def get_headers(self, session, **kwargs):
"""Fetch authentication headers for message.
This is to override the default get_headers method to provide
tokenless auth scope headers if token is not provided in the
session.
:param session: The session object that the auth_plugin belongs to.
:type session: keystoneauth1.session.Session
:returns: Headers that are set to authenticate a message or None for
failure. Note that when checking this value that the empty
dict is a valid, non-failure response.
:rtype: dict
"""
scope_headers = {}
if self.project_id:
scope_headers['X-Project-Id'] = self.project_id
elif self.project_name:
scope_headers['X-Project-Name'] = self.project_name
if self.project_domain_id:
scope_headers['X-Project-Domain-Id'] = (
self.project_domain_id)
elif self.project_domain_name:
scope_headers['X-Project-Domain-Name'] = (
self.project_domain_name)
else:
LOG.warning(
'Neither Project Domain ID nor Project Domain Name was '
'provided.')
return None
elif self.domain_id:
scope_headers['X-Domain-Id'] = self.domain_id
elif self.domain_name:
scope_headers['X-Domain-Name'] = self.domain_name
else:
LOG.warning(
'Neither Project nor Domain scope was provided.')
return None
return scope_headers
def get_endpoint(self, session, service_type=None, **kwargs):
"""Return a valid endpoint for a service.
:param session: A session object that can be used for communication.
:type session: keystoneauth1.session.Session
:param string service_type: The type of service to lookup the endpoint
for. This plugin will return None (failure)
if service_type is not provided.
:return: A valid endpoint URL or None if not available.
:rtype: string or None
"""
if (service_type is plugin.AUTH_INTERFACE
or service_type.lower() == 'identity'):
return self.auth_url
return None

View File

@ -172,3 +172,46 @@ class TOTP(loading.BaseV3Loader):
_assert_identity_options(kwargs)
return super(TOTP, self).load_from_options(**kwargs)
class TokenlessAuth(loading.BaseLoader):
@property
def plugin_class(self):
return identity.V3TokenlessAuth
def get_options(self):
options = super(TokenlessAuth, self).get_options()
options.extend([
loading.Opt('auth-url', required=True,
help='Authentication URL'),
loading.Opt('domain-id', help='Domain ID to scope to'),
loading.Opt('domain-name', help='Domain name to scope to'),
loading.Opt('project-id', help='Project ID to scope to'),
loading.Opt('project-name', help='Project name to scope to'),
loading.Opt('project-domain-id',
help='Domain ID containing project'),
loading.Opt('project-domain-name',
help='Domain name containing project'),
])
return options
def load_from_options(self, **kwargs):
if (not kwargs.get('domain_id') and
not kwargs.get('domain_name') and
not kwargs.get('project_id') and
not kwargs.get('project_name') or
(kwargs.get('project_name') and
not (kwargs.get('project_domain_name') or
kwargs.get('project_domain_id')))):
m = ('You need to provide either a domain_name, domain_id, '
'project_id or project_name. '
'If you have provided a project_name, in the V3 identity '
'API a project_name is only unique within a domain so '
'you must also provide either a project_domain_id or '
'project_domain_name.')
raise exceptions.OptionError(m)
return super(TokenlessAuth, self).load_from_options(**kwargs)

View File

@ -0,0 +1,105 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from keystoneauth1 import exceptions
from keystoneauth1.identity.v3 import tokenless_auth
from keystoneauth1 import session
from keystoneauth1.tests.unit import utils
class TokenlessAuthTest(utils.TestCase):
TEST_URL = 'http://server/prefix'
def create(self, auth_url,
domain_id=None,
domain_name=None,
project_id=None,
project_name=None,
project_domain_id=None,
project_domain_name=None):
self.requests_mock.get(self.TEST_URL)
auth = tokenless_auth.TokenlessAuth(
auth_url=self.TEST_URL,
domain_id=domain_id,
domain_name=domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name)
return auth, session.Session(auth=auth)
def test_domain_id_scope_header_pass(self):
domain_id = uuid.uuid4().hex
auth, session = self.create(auth_url=self.TEST_URL,
domain_id=domain_id)
session.get(self.TEST_URL, authenticated=True)
self.assertRequestHeaderEqual('X-Domain-Id', domain_id)
def test_domain_name_scope_header_pass(self):
domain_name = uuid.uuid4().hex
auth, session = self.create(auth_url=self.TEST_URL,
domain_name=domain_name)
session.get(self.TEST_URL, authenticated=True)
self.assertRequestHeaderEqual('X-Domain-Name', domain_name)
def test_project_id_scope_header_pass(self):
project_id = uuid.uuid4().hex
auth, session = self.create(auth_url=self.TEST_URL,
project_id=project_id)
session.get(self.TEST_URL, authenticated=True)
self.assertRequestHeaderEqual('X-Project-Id', project_id)
def test_project_of_domain_id_scope_header_pass(self):
project_name = uuid.uuid4().hex
project_domain_id = uuid.uuid4().hex
auth, session = self.create(auth_url=self.TEST_URL,
project_name=project_name,
project_domain_id=project_domain_id)
session.get(self.TEST_URL, authenticated=True)
self.assertRequestHeaderEqual('X-Project-Name', project_name)
self.assertRequestHeaderEqual('X-Project-Domain-Id', project_domain_id)
def test_project_of_domain__name_scope_header_pass(self):
project_name = uuid.uuid4().hex
project_domain_name = uuid.uuid4().hex
auth, session = self.create(auth_url=self.TEST_URL,
project_name=project_name,
project_domain_name=project_domain_name)
session.get(self.TEST_URL, authenticated=True)
self.assertRequestHeaderEqual('X-Project-Name', project_name)
self.assertRequestHeaderEqual('X-Project-Domain-Name',
project_domain_name)
def test_no_scope_header_fail(self):
auth, session = self.create(auth_url=self.TEST_URL)
self.assertIsNone(auth.get_headers(session))
msg = 'No valid authentication is available'
self.assertRaisesRegex(exceptions.AuthorizationFailure,
msg,
session.get,
self.TEST_URL,
authenticated=True)
def test_project_name_scope_only_header_fail(self):
project_name = uuid.uuid4().hex
auth, session = self.create(auth_url=self.TEST_URL,
project_name=project_name)
self.assertIsNone(auth.get_headers(session))
msg = 'No valid authentication is available'
self.assertRaisesRegex(exceptions.AuthorizationFailure,
msg,
session.get,
self.TEST_URL,
authenticated=True)

View File

@ -251,3 +251,73 @@ class OpenIDConnectAccessToken(utils.TestCase):
self.assertEqual(identity_provider, oidc.identity_provider)
self.assertEqual(protocol, oidc.protocol)
self.assertEqual(access_token, oidc.access_token)
class V3TokenlessAuthTests(utils.TestCase):
def setUp(self):
super(V3TokenlessAuthTests, self).setUp()
self.auth_url = uuid.uuid4().hex
def create(self, **kwargs):
kwargs.setdefault('auth_url', self.auth_url)
loader = loading.get_plugin_loader('v3tokenlessauth')
return loader.load_from_options(**kwargs)
def test_basic(self):
domain_id = uuid.uuid4().hex
domain_name = uuid.uuid4().hex
project_id = uuid.uuid4().hex
project_name = uuid.uuid4().hex
project_domain_id = uuid.uuid4().hex
project_domain_name = uuid.uuid4().hex
tla = self.create(domain_id=domain_id,
domain_name=domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name)
self.assertEqual(domain_id, tla.domain_id)
self.assertEqual(domain_name, tla.domain_name)
self.assertEqual(project_id, tla.project_id)
self.assertEqual(project_name, tla.project_name)
self.assertEqual(project_domain_id, tla.project_domain_id)
self.assertEqual(project_domain_name, tla.project_domain_name)
def test_missing_parameters(self):
self.assertRaises(exceptions.OptionError,
self.create,
domain_id=None)
self.assertRaises(exceptions.OptionError,
self.create,
domain_name=None)
self.assertRaises(exceptions.OptionError,
self.create,
project_id=None)
self.assertRaises(exceptions.OptionError,
self.create,
project_name=None)
self.assertRaises(exceptions.OptionError,
self.create,
project_domain_id=None)
self.assertRaises(exceptions.OptionError,
self.create,
project_domain_name=None)
# only when a project_name is provided, project_domain_id will
# be use to uniquely identify the project. It's an invalid
# option when it's just by itself.
self.assertRaises(exceptions.OptionError,
self.create,
project_domain_id=uuid.uuid4().hex)
# only when a project_name is provided, project_domain_name will
# be use to uniquely identify the project. It's an invalid
# option when it's just by itself.
self.assertRaises(exceptions.OptionError,
self.create,
project_domain_name=uuid.uuid4().hex)
self.assertRaises(exceptions.OptionError,
self.create,
project_name=uuid.uuid4().hex)

View File

@ -51,6 +51,7 @@ keystoneauth1.plugin =
v3kerberos = keystoneauth1.extras.kerberos._loading:Kerberos
v3totp = keystoneauth1.loading._plugins.identity.v3:TOTP
v3fedkerb = keystoneauth1.extras.kerberos._loading:MappedKerberos
v3tokenlessauth = keystoneauth1.loading._plugins.identity.v3:TokenlessAuth
[build_sphinx]
source-dir = doc/source