Create V3 Auth Plugins

Extract the authentication code from a v3 client and move it to a series
of auth plugins. As v3 authentication can contain multiple
authentication methods this concept is represented by an AuthMethod. An
auth plugin then is provided with multiple mechanisms to authenticate
with.

There is also some helper class for the standard case where you only
need to authenticate with one method.

When a v3 client wants to do authentication it will create a new v3 auth
plugin, do the authentication and then take that result for the client
to use.

Change-Id: I5fa6a6e1c2e114e1428e35b723700c63a3cbed44
blueprint: auth-plugins
This commit is contained in:
Jamie Lennox
2014-01-21 11:40:28 +10:00
parent 96b8e81b1d
commit 7f1881211d
3 changed files with 493 additions and 98 deletions

View File

@@ -0,0 +1,251 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from keystoneclient import access
from keystoneclient.auth.identity import base
from keystoneclient import exceptions
_logger = logging.getLogger(__name__)
class Auth(base.BaseIdentityPlugin):
def __init__(self, auth_url, auth_methods,
trust_id=None,
domain_id=None,
domain_name=None,
project_id=None,
project_name=None,
project_domain_id=None,
project_domain_name=None):
"""Construct an Identity V3 Authentication Plugin.
:param string auth_url: Identity service endpoint for authentication.
:param list auth_methods: A collection of methods to authenticate with.
:param string trust_id: Trust ID for trust scoping.
:param string domain_id: Domain ID for domain scoping.
:param string domain_name: Domain name for domain scoping.
:param string project_id: Project ID for project scoping.
:param string project_name: Project name for project scoping.
:param string project_domain_id: Project's domain ID for project.
:param string project_domain_name: Project's domain name for project.
"""
super(Auth, self).__init__(auth_url=auth_url)
self.auth_methods = auth_methods
self.trust_id = trust_id
self.domain_id = domain_id
self.domain_name = domain_name
self.project_id = project_id
self.project_name = project_name
self.project_domain_id = project_domain_id
self.project_domain_name = project_domain_name
def get_auth_ref(self, session, **kwargs):
headers = {}
url = self.auth_url + "/auth/tokens"
body = {'auth': {'identity': {}}}
ident = body['auth']['identity']
for method in self.auth_methods:
method, auth_data = method.get_auth_data(headers)
ident.setdefault('methods', []).append(method)
ident[method] = auth_data
if not ident:
raise exceptions.AuthorizationFailure('Authentication method '
'required (e.g. password)')
if ((self.domain_id or self.domain_name) and
(self.project_id or self.project_name)):
raise exceptions.AuthorizationFailure('Authentication cannot be '
'scoped to both domain '
'and project.')
if self.domain_id:
body['auth']['scope'] = {'domain': {'id': self.domain_id}}
elif self.domain_name:
body['auth']['scope'] = {'domain': {'name': self.domain_name}}
elif self.project_id:
body['auth']['scope'] = {'project': {'id': self.project_id}}
elif self.project_name:
scope = body['auth']['scope'] = {'project': {}}
scope['project']['name'] = self.project_name
if self.project_domain_id:
scope['project']['domain'] = {'id': self.project_domain_id}
elif self.project_domain_name:
scope['project']['domain'] = {'name': self.project_domain_name}
if self.trust_id:
scope = body['auth'].setdefault('scope', {})
scope['OS-TRUST:trust'] = {'id': self.trust_id}
resp = session.post(url, json=body, headers=headers,
authenticated=False)
return access.AccessInfoV3(resp.headers['X-Subject-Token'],
**resp.json()['token'])
@staticmethod
def factory(auth_url, **kwargs):
"""Construct a plugin appropriate to your available arguments.
This function is intended as a convenience and backwards compatibility.
If you know the style of authorization you require then you should
construct that plugin directly.
"""
methods = []
# NOTE(jamielennox): kwargs extraction is outside the if statement to
# clear up additional args that might be passed but not valid for type.
method_kwargs = PasswordMethod.extract_kwargs(kwargs)
if method_kwargs.get('password'):
methods.append(PasswordMethod(**method_kwargs))
method_kwargs = TokenMethod.extract_kwargs(kwargs)
if method_kwargs.get('token'):
methods.append(TokenMethod(**method_kwargs))
if not methods:
msg = 'A username and password or token is required.'
raise exceptions.AuthorizationFailure(msg)
return Auth(auth_url, methods, **kwargs)
@six.add_metaclass(abc.ABCMeta)
class AuthMethod(object):
"""One part of a V3 Authentication strategy.
V3 Tokens allow multiple methods to be presented when authentication
against the server. Each one of these methods is implemented by an
AuthMethod.
Note: When implementing an AuthMethod use the method_parameters
and do not use positional arguments. Otherwise they can't be picked up by
the factory method and don't work as well with AuthConstructors.
"""
method_parameters = []
def __init__(self, **kwargs):
for param in self.method_parameters:
setattr(self, param, kwargs.pop(param, None))
if kwargs:
msg = "Unexpected Attributes: %s" % ", ".join(kwargs.keys())
raise AttributeError(msg)
@classmethod
def extract_kwargs(cls, kwargs):
"""Remove parameters related to this method from other kwargs."""
return dict([(p, kwargs.pop(p, None))
for p in cls.method_parameters])
@abc.abstractmethod
def get_auth_data(self, headers=None):
"""Return the authentication section of an auth plugin.
:param dict headers: The headers that will be sent with the auth
request if a plugin needs to add to them.
:return tuple(string, dict): The identifier of this plugin and a dict
of authentication data for the auth type.
"""
@six.add_metaclass(abc.ABCMeta)
class AuthConstructor(Auth):
"""AuthConstructor is a means of creating an Auth Plugin that contains
only one authentication method. This is generally the required usage.
An AuthConstructor creates an AuthMethod based on the method's
arguments and the auth_method_class defined by the plugin. It then
creates the auth plugin with only that authentication method.
"""
auth_method_class = None
def __init__(self, auth_url, *args, **kwargs):
method_kwargs = self.auth_method_class.extract_kwargs(kwargs)
method = self.auth_method_class(*args, **method_kwargs)
super(AuthConstructor, self).__init__(auth_url, [method], **kwargs)
class PasswordMethod(AuthMethod):
method_parameters = ['user_id',
'username',
'user_domain_id',
'user_domain_name',
'password']
def __init__(self, **kwargs):
"""Construct a User/Password based authentication method.
:param string password: Password for authentication.
:param string username: Username for authentication.
:param string user_id: User ID for authentication.
:param string user_domain_id: User's domain ID for authentication.
:param string user_domain_name: User's domain name for authentication.
"""
super(PasswordMethod, self).__init__(**kwargs)
def get_auth_data(self, headers=None):
user = {'password': self.password}
if self.user_id:
user['id'] = self.user_id
elif self.username:
user['name'] = self.username
if self.user_domain_id:
user['domain'] = {'id': self.user_domain_id}
elif self.user_domain_name:
user['domain'] = {'name': self.user_domain_name}
return 'password', {'user': user}
class Password(AuthConstructor):
auth_method_class = PasswordMethod
class TokenMethod(AuthMethod):
method_parameters = ['token']
def __init__(self, **kwargs):
"""Construct a Auth plugin to fetch a token from a token.
:param string token: Token for authentication.
"""
super(TokenMethod, self).__init__(**kwargs)
def get_auth_data(self, headers=None):
headers['X-Auth-Token'] = self.token
return 'token', {'id': self.token}
class Token(AuthConstructor):
auth_method_class = TokenMethod
def __init__(self, auth_url, token, **kwargs):
super(Token, self).__init__(auth_url, token=token, **kwargs)

View File

@@ -0,0 +1,218 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import httpretty
from keystoneclient import access
from keystoneclient.auth.identity import v3
from keystoneclient import exceptions
from keystoneclient import session
from keystoneclient.tests import utils
class V3IdentityPlugin(utils.TestCase):
TEST_ROOT_URL = 'http://127.0.0.1:5000/'
TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3')
TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/'
TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3')
TEST_PASS = 'password'
TEST_SERVICE_CATALOG = []
def setUp(self):
super(V3IdentityPlugin, self).setUp()
self.TEST_RESPONSE_DICT = {
"token": {
"methods": [
"token",
"password"
],
"expires_at": "2020-01-01T00:00:10.000123Z",
"project": {
"domain": {
"id": self.TEST_DOMAIN_ID,
"name": self.TEST_DOMAIN_NAME
},
"id": self.TEST_TENANT_ID,
"name": self.TEST_TENANT_NAME
},
"user": {
"domain": {
"id": self.TEST_DOMAIN_ID,
"name": self.TEST_DOMAIN_NAME
},
"id": self.TEST_USER,
"name": self.TEST_USER
},
"issued_at": "2013-05-29T16:55:21.468960Z",
"catalog": self.TEST_SERVICE_CATALOG
},
}
def _plugin(self, auth_url=TEST_URL, **kwargs):
return v3.Auth.factory(auth_url, **kwargs)
def _session(self, **kwargs):
return session.Session(auth=self._plugin(**kwargs))
def stub_auth(self, subject_token=None, **kwargs):
if not subject_token:
subject_token = self.TEST_TOKEN
self.stub_url(httpretty.POST, ['auth', 'tokens'],
X_Subject_Token=subject_token, **kwargs)
@httpretty.activate
def test_authenticate_with_username_password(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v3.Password(self.TEST_URL,
username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(auth=a)
s.get_token()
req = {'auth': {'identity':
{'methods': ['password'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
@httpretty.activate
def test_authenticate_with_username_password_domain_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
domain_id=self.TEST_DOMAIN_ID)
s.get_token()
req = {'auth': {'identity':
{'methods': ['password'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}}},
'scope': {'domain': {'id': self.TEST_DOMAIN_ID}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
@httpretty.activate
def test_authenticate_with_username_password_project_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
project_id=self.TEST_DOMAIN_ID)
s.get_token()
req = {'auth': {'identity':
{'methods': ['password'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}}},
'scope': {'project': {'id': self.TEST_DOMAIN_ID}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
self.assertEqual(s.auth.auth_ref.project_id, self.TEST_DOMAIN_ID)
@httpretty.activate
def test_authenticate_with_token(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v3.Token(self.TEST_URL, self.TEST_TOKEN)
s = session.Session(auth=a)
s.get_token()
req = {'auth': {'identity':
{'methods': ['token'],
'token': {'id': self.TEST_TOKEN}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_missing_auth_params(self):
self.assertRaises(exceptions.AuthorizationFailure, self._plugin)
@httpretty.activate
def test_with_expired(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
d = copy.deepcopy(self.TEST_RESPONSE_DICT)
d['token']['expires_at'] = '2000-01-01T00:00:10.000123Z'
a = self._plugin(username='username', password='password')
a.auth_ref = access.AccessInfo.factory(body=d)
s = session.Session(auth=a)
s.get_token()
self.assertEqual(a.auth_ref['expires_at'],
self.TEST_RESPONSE_DICT['token']['expires_at'])
def test_with_domain_and_project_scoping(self):
a = self._plugin(username='username', password='password',
project_id='project', domain_id='domain')
self.assertRaises(exceptions.AuthorizationFailure,
a.get_token, None)
@httpretty.activate
def test_with_trust_id(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
trust_id='trust')
s.get_token()
req = {'auth': {'identity':
{'methods': ['password'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}}},
'scope': {'OS-TRUST:trust': {'id': 'trust'}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
@httpretty.activate
def test_with_multiple_mechanisms_factory(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
trust_id='trust', token='foo')
s.get_token()
req = {'auth': {'identity':
{'methods': ['password', 'token'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}},
'token': {'id': 'foo'}},
'scope': {'OS-TRUST:trust': {'id': 'trust'}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
@httpretty.activate
def test_with_multiple_mechanisms(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
p = v3.PasswordMethod(username=self.TEST_USER,
password=self.TEST_PASS)
t = v3.TokenMethod(token='foo')
a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust')
s = session.Session(auth=a)
s.get_token()
req = {'auth': {'identity':
{'methods': ['password', 'token'],
'password': {'user': {'name': self.TEST_USER,
'password': self.TEST_PASS}},
'token': {'id': 'foo'}},
'scope': {'OS-TRUST:trust': {'id': 'trust'}}}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)

View File

@@ -15,6 +15,7 @@
import logging
from keystoneclient.auth.identity import v3 as v3_auth
from keystoneclient import exceptions
from keystoneclient import httpclient
from keystoneclient.openstack.common import jsonutils
@@ -102,7 +103,9 @@ class Client(httpclient.HTTPClient):
self.users = users.UserManager(self)
self.trusts = trusts.TrustManager(self)
if self.management_url is None:
# DEPRECATED: if session is passed then we go to the new behaviour of
# authenticating on the first required call.
if 'session' not in kwargs and self.management_url is None:
self.authenticate()
def serialize(self, entity):
@@ -136,112 +139,35 @@ class Client(httpclient.HTTPClient):
**kwargs):
"""Authenticate against the v3 Identity API.
:returns: (``resp``, ``body``) if authentication was successful.
:returns: access.AccessInfo if authentication was successful.
:raises: AuthorizationFailure if unable to authenticate or validate
the existing authorization token
:raises: Unauthorized if authentication fails due to invalid token
"""
try:
return self._do_auth(
auth_url,
user_id=user_id,
username=username,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
password=password,
domain_id=domain_id,
domain_name=domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name,
token=token,
trust_id=trust_id)
if auth_url is None:
raise ValueError("Cannot authenticate without an auth_url")
a = v3_auth.Auth.factory(auth_url,
username=username,
password=password,
token=token,
trust_id=trust_id,
user_id=user_id,
domain_id=domain_id,
domain_name=domain_name,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name)
return a.get_auth_ref(self.session)
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
_logger.debug('Authorization failed.')
raise
except Exception as e:
raise exceptions.AuthorizationFailure('Authorization failed: '
'%s' % e)
def _do_auth(self, auth_url, user_id=None, username=None,
user_domain_id=None, user_domain_name=None, password=None,
domain_id=None, domain_name=None,
project_id=None, project_name=None, project_domain_id=None,
project_domain_name=None, token=None, trust_id=None):
headers = {}
if auth_url is None:
raise ValueError("Cannot authenticate without a valid auth_url")
url = auth_url + "/auth/tokens"
body = {'auth': {'identity': {}}}
ident = body['auth']['identity']
if token:
headers['X-Auth-Token'] = token
ident['methods'] = ['token']
ident['token'] = {}
ident['token']['id'] = token
if password:
ident['methods'] = ['password']
ident['password'] = {}
ident['password']['user'] = {}
user = ident['password']['user']
user['password'] = password
if user_id:
user['id'] = user_id
elif username:
user['name'] = username
if user_domain_id or user_domain_name:
user['domain'] = {}
if user_domain_id:
user['domain']['id'] = user_domain_id
elif user_domain_name:
user['domain']['name'] = user_domain_name
if (domain_id or domain_name) and (project_id or project_name):
raise ValueError('Authentication cannot be scoped to both domain'
' and project.')
if domain_id or domain_name:
body['auth']['scope'] = {}
scope = body['auth']['scope']
scope['domain'] = {}
if domain_id:
scope['domain']['id'] = domain_id
elif domain_name:
scope['domain']['name'] = domain_name
if project_id or project_name:
body['auth']['scope'] = {}
scope = body['auth']['scope']
scope['project'] = {}
if project_id:
scope['project']['id'] = project_id
elif project_name:
scope['project']['name'] = project_name
if project_domain_id or project_domain_name:
scope['project']['domain'] = {}
if project_domain_id:
scope['project']['domain']['id'] = project_domain_id
elif project_domain_name:
scope['project']['domain']['name'] = project_domain_name
if trust_id:
body['auth']['scope'] = {}
scope = body['auth']['scope']
scope['OS-TRUST:trust'] = {}
scope['OS-TRUST:trust']['id'] = trust_id
if not (ident or token):
raise ValueError('Authentication method required (e.g. password)')
resp, body = self.request(url, 'POST', body=body, headers=headers)
return resp, body