openstacksdk/openstack/auth/identity/v3.py

224 lines
8.3 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import logging
import six
from openstack.auth import access
from openstack.auth.identity import base
from openstack import exceptions
_logger = logging.getLogger(__name__)
class Auth(base.BaseIdentityPlugin):
valid_options = [
'auth_url',
'domain_id',
'domain_name',
'password',
'project_domain_id',
'project_domain_name',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
'user_domain_id',
'user_domain_name',
'user_id',
'user_name',
]
def __init__(self, auth_url,
domain_id=None,
domain_name=None,
password='',
project_domain_id=None,
project_domain_name=None,
project_id=None,
project_name=None,
reauthenticate=True,
token=None,
trust_id=None,
user_domain_id=None,
user_domain_name=None,
user_id=None,
user_name=None):
"""Construct an Identity V3 Authentication Plugin.
This authorization plugin should be constructed with a password
and user_id or user_name. It may also be constructed with a token.
:param string auth_url: Identity service endpoint for authentication.
:param string domain_id: Domain ID for domain scoping.
:param string domain_name: Domain name for domain scoping.
:param string password: User password for authentication.
:param string project_domain_id: Project's domain ID for project.
:param string project_domain_name: Project's domain name for project.
:param string project_id: Project ID for project scoping.
:param string project_name: Project name for project scoping.
:param bool reauthenticate: Get new token if token expires.
:param string token: Token to use for authentication.
:param string trust_id: Trust ID for trust scoping.
:param string user_domain_id: User's domain ID for authentication.
:param string user_domain_name: User's domain name for authentication.
:param string user_name: User name for authentication.
:param string user_id: User ID for authentication.
:raises TypeError: if a user_id, user_name or token is not provided.
"""
super(Auth, self).__init__(auth_url=auth_url,
reauthenticate=reauthenticate)
if not (user_id or user_name or token):
msg = 'You need to specify either a user_name, user_id or token'
raise TypeError(msg)
self.domain_id = domain_id
self.domain_name = domain_name
self.project_domain_id = project_domain_id
self.project_domain_name = project_domain_name
self.project_id = project_id
self.project_name = project_name
self.reauthenticate = reauthenticate
self.trust_id = trust_id
self.password_method = PasswordMethod(
password=password,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
user_name=user_name,
user_id=user_id,
)
if token:
self.token_method = TokenMethod(token=token)
self.auth_methods = [self.token_method]
else:
self.auth_methods = [self.password_method]
@property
def token_url(self):
"""The full URL where we will send authentication data."""
return '%s/auth/tokens' % self.auth_url.rstrip('/')
def authorize(self, transport, **kwargs):
headers = {'Accept': 'application/json'}
body = {'auth': {'identity': {}}}
ident = body['auth']['identity']
for method in self.auth_methods:
name, auth_data = method.get_auth_data(transport, self, headers)
ident.setdefault('methods', []).append(name)
ident[name] = auth_data
if not ident:
raise exceptions.AuthorizationFailure('Authentication method '
'required (e.g. password)')
mutual_exclusion = [bool(self.domain_id or self.domain_name),
bool(self.project_id or self.project_name),
bool(self.trust_id)]
if sum(mutual_exclusion) > 1:
raise exceptions.AuthorizationFailure('Authentication cannot be '
'scoped to multiple '
'targets. Pick one of: '
'project, domain or trust')
if self.domain_id:
body['auth']['scope'] = {'domain': {'id': self.domain_id}}
elif self.domain_name:
body['auth']['scope'] = {'domain': {'name': self.domain_name}}
elif self.project_id:
body['auth']['scope'] = {'project': {'id': self.project_id}}
elif self.project_name:
scope = body['auth']['scope'] = {'project': {}}
scope['project']['name'] = self.project_name
if self.project_domain_id:
scope['project']['domain'] = {'id': self.project_domain_id}
elif self.project_domain_name:
scope['project']['domain'] = {'name': self.project_domain_name}
elif self.trust_id:
body['auth']['scope'] = {'OS-TRUST:trust': {'id': self.trust_id}}
_logger.debug('Making authentication request to %s', self.token_url)
resp = transport.post(self.token_url, json=body, headers=headers)
try:
resp_data = resp.json()['token']
except (KeyError, ValueError):
raise exceptions.InvalidResponse(response=resp)
return access.AccessInfoV3(resp.headers['X-Subject-Token'],
**resp_data)
def invalidate(self):
if super(Auth, self).invalidate():
self.auth_methods = [self.password_method]
return True
return False
@six.add_metaclass(abc.ABCMeta)
class AuthMethod(object):
"""One part of a V3 Authentication strategy.
V3 Tokens allow multiple methods to be presented when authentication
against the server. Each one of these methods is implemented by an
AuthMethod.
"""
def __init__(self, **kwargs):
for param in kwargs:
setattr(self, param, kwargs.get(param, None))
@abc.abstractmethod
def get_auth_data(self, transport, auth, headers, **kwargs):
"""Return the authentication section of an auth plugin.
:param Transport transport: The communication transport.
:param Auth auth: The auth plugin calling the method.
:param dict headers: The headers that will be sent with the auth
request if a plugin needs to add to them.
:return tuple(string, dict): The identifier of this plugin and a dict
of authentication data for the auth type.
"""
class PasswordMethod(AuthMethod):
def get_auth_data(self, transport, auth, headers, **kwargs):
user = {'password': self.password}
if self.user_id:
user['id'] = self.user_id
elif self.user_name:
user['name'] = self.user_name
if self.user_domain_id:
user['domain'] = {'id': self.user_domain_id}
elif self.user_domain_name:
user['domain'] = {'name': self.user_domain_name}
return 'password', {'user': user}
class TokenMethod(AuthMethod):
def get_auth_data(self, transport, auth, headers, **kwargs):
headers['X-Auth-Token'] = self.token
return 'token', {'id': self.token}