diff --git a/keystoneclient/auth/identity/v3.py b/keystoneclient/auth/identity/v3.py new file mode 100644 index 000000000..5f3e7e10f --- /dev/null +++ b/keystoneclient/auth/identity/v3.py @@ -0,0 +1,251 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import logging + +import six + +from keystoneclient import access +from keystoneclient.auth.identity import base +from keystoneclient import exceptions + +_logger = logging.getLogger(__name__) + + +class Auth(base.BaseIdentityPlugin): + + def __init__(self, auth_url, auth_methods, + trust_id=None, + domain_id=None, + domain_name=None, + project_id=None, + project_name=None, + project_domain_id=None, + project_domain_name=None): + """Construct an Identity V3 Authentication Plugin. + + :param string auth_url: Identity service endpoint for authentication. + :param list auth_methods: A collection of methods to authenticate with. + :param string trust_id: Trust ID for trust scoping. + :param string domain_id: Domain ID for domain scoping. + :param string domain_name: Domain name for domain scoping. + :param string project_id: Project ID for project scoping. + :param string project_name: Project name for project scoping. + :param string project_domain_id: Project's domain ID for project. + :param string project_domain_name: Project's domain name for project. + """ + + super(Auth, self).__init__(auth_url=auth_url) + + self.auth_methods = auth_methods + self.trust_id = trust_id + self.domain_id = domain_id + self.domain_name = domain_name + self.project_id = project_id + self.project_name = project_name + self.project_domain_id = project_domain_id + self.project_domain_name = project_domain_name + + def get_auth_ref(self, session, **kwargs): + headers = {} + url = self.auth_url + "/auth/tokens" + body = {'auth': {'identity': {}}} + ident = body['auth']['identity'] + + for method in self.auth_methods: + method, auth_data = method.get_auth_data(headers) + ident.setdefault('methods', []).append(method) + ident[method] = auth_data + + if not ident: + raise exceptions.AuthorizationFailure('Authentication method ' + 'required (e.g. password)') + + if ((self.domain_id or self.domain_name) and + (self.project_id or self.project_name)): + raise exceptions.AuthorizationFailure('Authentication cannot be ' + 'scoped to both domain ' + 'and project.') + + if self.domain_id: + body['auth']['scope'] = {'domain': {'id': self.domain_id}} + elif self.domain_name: + body['auth']['scope'] = {'domain': {'name': self.domain_name}} + elif self.project_id: + body['auth']['scope'] = {'project': {'id': self.project_id}} + elif self.project_name: + scope = body['auth']['scope'] = {'project': {}} + scope['project']['name'] = self.project_name + + if self.project_domain_id: + scope['project']['domain'] = {'id': self.project_domain_id} + elif self.project_domain_name: + scope['project']['domain'] = {'name': self.project_domain_name} + + if self.trust_id: + scope = body['auth'].setdefault('scope', {}) + scope['OS-TRUST:trust'] = {'id': self.trust_id} + + resp = session.post(url, json=body, headers=headers, + authenticated=False) + return access.AccessInfoV3(resp.headers['X-Subject-Token'], + **resp.json()['token']) + + @staticmethod + def factory(auth_url, **kwargs): + """Construct a plugin appropriate to your available arguments. + + This function is intended as a convenience and backwards compatibility. + If you know the style of authorization you require then you should + construct that plugin directly. + """ + + methods = [] + + # NOTE(jamielennox): kwargs extraction is outside the if statement to + # clear up additional args that might be passed but not valid for type. + method_kwargs = PasswordMethod.extract_kwargs(kwargs) + if method_kwargs.get('password'): + methods.append(PasswordMethod(**method_kwargs)) + + method_kwargs = TokenMethod.extract_kwargs(kwargs) + if method_kwargs.get('token'): + methods.append(TokenMethod(**method_kwargs)) + + if not methods: + msg = 'A username and password or token is required.' + raise exceptions.AuthorizationFailure(msg) + + return Auth(auth_url, methods, **kwargs) + + +@six.add_metaclass(abc.ABCMeta) +class AuthMethod(object): + """One part of a V3 Authentication strategy. + + V3 Tokens allow multiple methods to be presented when authentication + against the server. Each one of these methods is implemented by an + AuthMethod. + + Note: When implementing an AuthMethod use the method_parameters + and do not use positional arguments. Otherwise they can't be picked up by + the factory method and don't work as well with AuthConstructors. + """ + + method_parameters = [] + + def __init__(self, **kwargs): + for param in self.method_parameters: + setattr(self, param, kwargs.pop(param, None)) + + if kwargs: + msg = "Unexpected Attributes: %s" % ", ".join(kwargs.keys()) + raise AttributeError(msg) + + @classmethod + def extract_kwargs(cls, kwargs): + """Remove parameters related to this method from other kwargs.""" + return dict([(p, kwargs.pop(p, None)) + for p in cls.method_parameters]) + + @abc.abstractmethod + def get_auth_data(self, headers=None): + """Return the authentication section of an auth plugin. + + :param dict headers: The headers that will be sent with the auth + request if a plugin needs to add to them. + :return tuple(string, dict): The identifier of this plugin and a dict + of authentication data for the auth type. + """ + + +@six.add_metaclass(abc.ABCMeta) +class AuthConstructor(Auth): + """AuthConstructor is a means of creating an Auth Plugin that contains + only one authentication method. This is generally the required usage. + + An AuthConstructor creates an AuthMethod based on the method's + arguments and the auth_method_class defined by the plugin. It then + creates the auth plugin with only that authentication method. + """ + + auth_method_class = None + + def __init__(self, auth_url, *args, **kwargs): + method_kwargs = self.auth_method_class.extract_kwargs(kwargs) + method = self.auth_method_class(*args, **method_kwargs) + super(AuthConstructor, self).__init__(auth_url, [method], **kwargs) + + +class PasswordMethod(AuthMethod): + + method_parameters = ['user_id', + 'username', + 'user_domain_id', + 'user_domain_name', + 'password'] + + def __init__(self, **kwargs): + """Construct a User/Password based authentication method. + + :param string password: Password for authentication. + :param string username: Username for authentication. + :param string user_id: User ID for authentication. + :param string user_domain_id: User's domain ID for authentication. + :param string user_domain_name: User's domain name for authentication. + """ + super(PasswordMethod, self).__init__(**kwargs) + + def get_auth_data(self, headers=None): + user = {'password': self.password} + + if self.user_id: + user['id'] = self.user_id + elif self.username: + user['name'] = self.username + + if self.user_domain_id: + user['domain'] = {'id': self.user_domain_id} + elif self.user_domain_name: + user['domain'] = {'name': self.user_domain_name} + + return 'password', {'user': user} + + +class Password(AuthConstructor): + auth_method_class = PasswordMethod + + +class TokenMethod(AuthMethod): + + method_parameters = ['token'] + + def __init__(self, **kwargs): + """Construct a Auth plugin to fetch a token from a token. + + :param string token: Token for authentication. + """ + super(TokenMethod, self).__init__(**kwargs) + + def get_auth_data(self, headers=None): + headers['X-Auth-Token'] = self.token + return 'token', {'id': self.token} + + +class Token(AuthConstructor): + auth_method_class = TokenMethod + + def __init__(self, auth_url, token, **kwargs): + super(Token, self).__init__(auth_url, token=token, **kwargs) diff --git a/keystoneclient/tests/auth/test_identity_v3.py b/keystoneclient/tests/auth/test_identity_v3.py new file mode 100644 index 000000000..974b228eb --- /dev/null +++ b/keystoneclient/tests/auth/test_identity_v3.py @@ -0,0 +1,218 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy + +import httpretty + +from keystoneclient import access +from keystoneclient.auth.identity import v3 +from keystoneclient import exceptions +from keystoneclient import session +from keystoneclient.tests import utils + + +class V3IdentityPlugin(utils.TestCase): + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + TEST_URL = '%s%s' % (TEST_ROOT_URL, 'v3') + TEST_ROOT_ADMIN_URL = 'http://127.0.0.1:35357/' + TEST_ADMIN_URL = '%s%s' % (TEST_ROOT_ADMIN_URL, 'v3') + + TEST_PASS = 'password' + + TEST_SERVICE_CATALOG = [] + + def setUp(self): + super(V3IdentityPlugin, self).setUp() + self.TEST_RESPONSE_DICT = { + "token": { + "methods": [ + "token", + "password" + ], + + "expires_at": "2020-01-01T00:00:10.000123Z", + "project": { + "domain": { + "id": self.TEST_DOMAIN_ID, + "name": self.TEST_DOMAIN_NAME + }, + "id": self.TEST_TENANT_ID, + "name": self.TEST_TENANT_NAME + }, + "user": { + "domain": { + "id": self.TEST_DOMAIN_ID, + "name": self.TEST_DOMAIN_NAME + }, + "id": self.TEST_USER, + "name": self.TEST_USER + }, + "issued_at": "2013-05-29T16:55:21.468960Z", + "catalog": self.TEST_SERVICE_CATALOG + }, + } + + def _plugin(self, auth_url=TEST_URL, **kwargs): + return v3.Auth.factory(auth_url, **kwargs) + + def _session(self, **kwargs): + return session.Session(auth=self._plugin(**kwargs)) + + def stub_auth(self, subject_token=None, **kwargs): + if not subject_token: + subject_token = self.TEST_TOKEN + + self.stub_url(httpretty.POST, ['auth', 'tokens'], + X_Subject_Token=subject_token, **kwargs) + + @httpretty.activate + def test_authenticate_with_username_password(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Password(self.TEST_URL, + username=self.TEST_USER, + password=self.TEST_PASS) + s = session.Session(auth=a) + + s.get_token() + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}}} + + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + @httpretty.activate + def test_authenticate_with_username_password_domain_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + s = self._session(username=self.TEST_USER, password=self.TEST_PASS, + domain_id=self.TEST_DOMAIN_ID) + s.get_token() + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}, + 'scope': {'domain': {'id': self.TEST_DOMAIN_ID}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + @httpretty.activate + def test_authenticate_with_username_password_project_scoped(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + s = self._session(username=self.TEST_USER, password=self.TEST_PASS, + project_id=self.TEST_DOMAIN_ID) + s.get_token() + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}, + 'scope': {'project': {'id': self.TEST_DOMAIN_ID}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + self.assertEqual(s.auth.auth_ref.project_id, self.TEST_DOMAIN_ID) + + @httpretty.activate + def test_authenticate_with_token(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + a = v3.Token(self.TEST_URL, self.TEST_TOKEN) + s = session.Session(auth=a) + s.get_token() + + req = {'auth': {'identity': + {'methods': ['token'], + 'token': {'id': self.TEST_TOKEN}}}} + + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + def test_missing_auth_params(self): + self.assertRaises(exceptions.AuthorizationFailure, self._plugin) + + @httpretty.activate + def test_with_expired(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + + d = copy.deepcopy(self.TEST_RESPONSE_DICT) + d['token']['expires_at'] = '2000-01-01T00:00:10.000123Z' + + a = self._plugin(username='username', password='password') + a.auth_ref = access.AccessInfo.factory(body=d) + s = session.Session(auth=a) + + s.get_token() + + self.assertEqual(a.auth_ref['expires_at'], + self.TEST_RESPONSE_DICT['token']['expires_at']) + + def test_with_domain_and_project_scoping(self): + a = self._plugin(username='username', password='password', + project_id='project', domain_id='domain') + self.assertRaises(exceptions.AuthorizationFailure, + a.get_token, None) + + @httpretty.activate + def test_with_trust_id(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + s = self._session(username=self.TEST_USER, password=self.TEST_PASS, + trust_id='trust') + s.get_token() + + req = {'auth': {'identity': + {'methods': ['password'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}}, + 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + @httpretty.activate + def test_with_multiple_mechanisms_factory(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + s = self._session(username=self.TEST_USER, password=self.TEST_PASS, + trust_id='trust', token='foo') + s.get_token() + + req = {'auth': {'identity': + {'methods': ['password', 'token'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}, + 'token': {'id': 'foo'}}, + 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) + + @httpretty.activate + def test_with_multiple_mechanisms(self): + self.stub_auth(json=self.TEST_RESPONSE_DICT) + p = v3.PasswordMethod(username=self.TEST_USER, + password=self.TEST_PASS) + t = v3.TokenMethod(token='foo') + a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust') + s = session.Session(auth=a) + + s.get_token() + + req = {'auth': {'identity': + {'methods': ['password', 'token'], + 'password': {'user': {'name': self.TEST_USER, + 'password': self.TEST_PASS}}, + 'token': {'id': 'foo'}}, + 'scope': {'OS-TRUST:trust': {'id': 'trust'}}}} + self.assertRequestBodyIs(json=req) + self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN) diff --git a/keystoneclient/v3/client.py b/keystoneclient/v3/client.py index 244f82317..58df82b6c 100644 --- a/keystoneclient/v3/client.py +++ b/keystoneclient/v3/client.py @@ -15,6 +15,7 @@ import logging +from keystoneclient.auth.identity import v3 as v3_auth from keystoneclient import exceptions from keystoneclient import httpclient from keystoneclient.openstack.common import jsonutils @@ -102,7 +103,9 @@ class Client(httpclient.HTTPClient): self.users = users.UserManager(self) self.trusts = trusts.TrustManager(self) - if self.management_url is None: + # DEPRECATED: if session is passed then we go to the new behaviour of + # authenticating on the first required call. + if 'session' not in kwargs and self.management_url is None: self.authenticate() def serialize(self, entity): @@ -136,112 +139,35 @@ class Client(httpclient.HTTPClient): **kwargs): """Authenticate against the v3 Identity API. - :returns: (``resp``, ``body``) if authentication was successful. + :returns: access.AccessInfo if authentication was successful. :raises: AuthorizationFailure if unable to authenticate or validate the existing authorization token :raises: Unauthorized if authentication fails due to invalid token """ try: - return self._do_auth( - auth_url, - user_id=user_id, - username=username, - user_domain_id=user_domain_id, - user_domain_name=user_domain_name, - password=password, - domain_id=domain_id, - domain_name=domain_name, - project_id=project_id, - project_name=project_name, - project_domain_id=project_domain_id, - project_domain_name=project_domain_name, - token=token, - trust_id=trust_id) + if auth_url is None: + raise ValueError("Cannot authenticate without an auth_url") + + a = v3_auth.Auth.factory(auth_url, + username=username, + password=password, + token=token, + trust_id=trust_id, + user_id=user_id, + domain_id=domain_id, + domain_name=domain_name, + user_domain_id=user_domain_id, + user_domain_name=user_domain_name, + project_id=project_id, + project_name=project_name, + project_domain_id=project_domain_id, + project_domain_name=project_domain_name) + + return a.get_auth_ref(self.session) except (exceptions.AuthorizationFailure, exceptions.Unauthorized): _logger.debug('Authorization failed.') raise except Exception as e: raise exceptions.AuthorizationFailure('Authorization failed: ' '%s' % e) - - def _do_auth(self, auth_url, user_id=None, username=None, - user_domain_id=None, user_domain_name=None, password=None, - domain_id=None, domain_name=None, - project_id=None, project_name=None, project_domain_id=None, - project_domain_name=None, token=None, trust_id=None): - headers = {} - if auth_url is None: - raise ValueError("Cannot authenticate without a valid auth_url") - url = auth_url + "/auth/tokens" - body = {'auth': {'identity': {}}} - ident = body['auth']['identity'] - - if token: - headers['X-Auth-Token'] = token - - ident['methods'] = ['token'] - ident['token'] = {} - ident['token']['id'] = token - - if password: - ident['methods'] = ['password'] - ident['password'] = {} - ident['password']['user'] = {} - user = ident['password']['user'] - user['password'] = password - - if user_id: - user['id'] = user_id - elif username: - user['name'] = username - - if user_domain_id or user_domain_name: - user['domain'] = {} - if user_domain_id: - user['domain']['id'] = user_domain_id - elif user_domain_name: - user['domain']['name'] = user_domain_name - - if (domain_id or domain_name) and (project_id or project_name): - raise ValueError('Authentication cannot be scoped to both domain' - ' and project.') - - if domain_id or domain_name: - body['auth']['scope'] = {} - scope = body['auth']['scope'] - scope['domain'] = {} - - if domain_id: - scope['domain']['id'] = domain_id - elif domain_name: - scope['domain']['name'] = domain_name - - if project_id or project_name: - body['auth']['scope'] = {} - scope = body['auth']['scope'] - scope['project'] = {} - - if project_id: - scope['project']['id'] = project_id - elif project_name: - scope['project']['name'] = project_name - - if project_domain_id or project_domain_name: - scope['project']['domain'] = {} - if project_domain_id: - scope['project']['domain']['id'] = project_domain_id - elif project_domain_name: - scope['project']['domain']['name'] = project_domain_name - - if trust_id: - body['auth']['scope'] = {} - scope = body['auth']['scope'] - scope['OS-TRUST:trust'] = {} - scope['OS-TRUST:trust']['id'] = trust_id - - if not (ident or token): - raise ValueError('Authentication method required (e.g. password)') - - resp, body = self.request(url, 'POST', body=body, headers=headers) - return resp, body