Revert to KSC auth plugins

Way back at the Summit I said I'd revert back to the standard
KSC auth plugins.  Well here, they are.

Change-Id: I8acc2be3f2ac7d10b6914f892d9f0bfe734496fd
This commit is contained in:
Terry Howe 2015-02-15 06:00:07 -07:00
parent 3361e7e86c
commit d0fefa03e1
8 changed files with 375 additions and 450 deletions

View File

@ -39,7 +39,10 @@ from openstack import exceptions
class Auth(base.BaseIdentityPlugin):
#: Valid options for this plugin
valid_options = list(set(v2.Auth.valid_options + v3.Auth.valid_options))
valid_options = list(set(v2.Password.valid_options +
v2.Token.valid_options +
v3.Password.valid_options +
v3.Token.valid_options))
def __init__(self, auth_url=None, **auth_args):
"""Construct an Identity Authentication Plugin.
@ -59,9 +62,15 @@ class Auth(base.BaseIdentityPlugin):
raise exceptions.AuthorizationFailure(msg)
endpoint_version = auth_url.split('v')[-1][0]
if endpoint_version == '2':
plugin = v2.Auth
if 'token' in auth_args:
plugin = v2.Token
else:
plugin = v2.Password
else:
plugin = v3.Auth
if 'token' in auth_args:
plugin = v3.Token
else:
plugin = v3.Password
valid_list = plugin.valid_options
args = dict((n, auth_args[n]) for n in valid_list if n in auth_args)
self.auth_plugin = plugin(auth_url, **args)

View File

@ -28,8 +28,11 @@ would also require a password. For example::
accessInfo = auth.authorize(xport)
"""
import abc
import logging
import six
from openstack.auth import access
from openstack.auth.identity import base
from openstack import exceptions
@ -37,73 +40,41 @@ from openstack import exceptions
_logger = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Auth(base.BaseIdentityPlugin):
#: Valid options for this plugin
valid_options = [
'access_info',
'auth_url',
'user_name',
'user_id',
'password',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
]
def __init__(self, auth_url,
access_info=None,
user_name=None,
user_id=None,
password='',
token=None,
trust_id=None,
project_id=None,
project_name=None,
reauthenticate=True,
trust_id=None):
tenant_id=None,
tenant_name=None,
reauthenticate=True):
"""Construct an Identity V2 Authentication Plugin.
A user_name, user_id or token must be provided. More detailed
information on some of the methods can be found in the base class
:class:`~openstack.auth.identity.base.BaseIdentityPlugin`.
:param string auth_url: Identity service endpoint for authorization.
:param string access_info: Access info from previous authentication.
:param string user_name: Username for authentication.
:param string user_id: User ID for authentication.
:param string password: Password for authentication.
:param string project_id: Tenant ID for project scoping.
:param string project_name: Tenant name for project scoping.
:param bool reauthenticate: Get new token if token expires.
:param string token: Existing token for authentication.
:param string trust_id: Trust ID for trust scoping.
:raises :class:`~openstack.exceptions.AuthorizationFailure`: if a
user_id, user_name or token is not provided.
:param string project_id: Project ID for scoping.
:param string project_name: Project name for scoping.
:param string tenant_id: Tenant ID for project scoping.
:param string tenant_name: Tenant name for project scoping.
:param bool reauthenticate: Allow fetching a new token if the current
one is going to expire.
(optional) default True
"""
super(Auth, self).__init__(auth_url=auth_url,
reauthenticate=reauthenticate)
if not (user_id or user_name or token):
msg = 'You need to specify either a user_name, user_id or token'
raise exceptions.AuthorizationFailure(msg)
self.access_info = access_info or None
self.user_id = user_id
self.user_name = user_name
self.password = password
self.token = token or None
self.trust_id = trust_id
self.tenant_id = project_id
if not self.tenant_id:
self.tenant_id = tenant_id
self.tenant_name = project_name
if not self.tenant_name:
self.tenant_name = tenant_name
def authorize(self, transport, **kwargs):
"""Obtain access information from an OpenStack Identity Service."""
if self.token and self.access_info:
return access.AccessInfoV2(**self.access_info)
headers = {'Accept': 'application/json'}
url = self.auth_url.rstrip('/') + '/tokens'
params = {'auth': self.get_auth_data(headers)}
@ -125,22 +96,88 @@ class Auth(base.BaseIdentityPlugin):
return access.AccessInfoV2(**resp_data)
def get_auth_data(self, headers):
"""Identity v2 token authentication data."""
if self.token is None:
auth = {'password': self.password}
if self.user_id:
auth['userId'] = self.user_id
elif self.user_name:
auth['username'] = self.user_name
return {'passwordCredentials': auth}
headers['X-Auth-Token'] = self.token
return {'token': {'id': self.token}}
@abc.abstractmethod
def get_auth_data(self, headers=None):
"""Return the authentication section of an auth plugin.
def invalidate(self):
"""Invalidate the current authentication data."""
if super(Auth, self).invalidate():
self.access_info = None
self.token = None
return True
return False
:param dict headers: The headers that will be sent with the auth
request if a plugin needs to add to them.
:return dict: A dict of authentication data for the auth type.
"""
class Password(Auth):
#: Valid options for Password plugin
valid_options = [
'access_info',
'auth_url',
'user_name',
'user_id',
'password',
'project_id',
'project_name',
'reauthenticate',
'trust_id',
]
def __init__(self, auth_url, user_name=None, password=None, user_id=None,
**kwargs):
"""A plugin for authenticating with a user_name and password.
A user_name or user_id must be provided.
:param string auth_url: Identity service endpoint for authorization.
:param string user_name: Username for authentication.
:param string password: Password for authentication.
:param string user_id: User ID for authentication.
:raises TypeError: if a user_id or user_name is not provided.
"""
super(Password, self).__init__(auth_url, **kwargs)
if not (user_id or user_name):
msg = 'You need to specify either a user_name or user_id'
raise TypeError(msg)
self.user_id = user_id
self.user_name = user_name
self.password = password
def get_auth_data(self, headers=None):
auth = {'password': self.password}
if self.user_name:
auth['username'] = self.user_name
elif self.user_id:
auth['userId'] = self.user_id
return {'passwordCredentials': auth}
class Token(Auth):
#: Valid options for this plugin
valid_options = [
'access_info',
'auth_url',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
]
def __init__(self, auth_url, token, **kwargs):
"""A plugin for authenticating with an existing token.
:param string auth_url: Identity service endpoint for authorization.
:param string token: Existing token for authentication.
"""
super(Token, self).__init__(auth_url, **kwargs)
self.token = token
def get_auth_data(self, headers=None):
if headers is not None:
headers['X-Auth-Token'] = self.token
return {'token': {'id': self.token}}

View File

@ -42,99 +42,42 @@ _logger = logging.getLogger(__name__)
class Auth(base.BaseIdentityPlugin):
#: Valid options for this plugin
valid_options = [
'access_info',
'auth_url',
'domain_id',
'domain_name',
'password',
'project_domain_id',
'project_domain_name',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
'user_domain_id',
'user_domain_name',
'user_id',
'user_name',
]
def __init__(self, auth_url,
access_info=None,
def __init__(self, auth_url, auth_methods,
trust_id=None,
domain_id=None,
domain_name=None,
password='',
project_domain_id=None,
project_domain_name=None,
project_id=None,
project_name=None,
reauthenticate=True,
token=None,
trust_id=None,
user_domain_id=None,
user_domain_name=None,
user_id=None,
user_name=None):
project_domain_id=None,
project_domain_name=None,
reauthenticate=True):
"""Construct an Identity V3 Authentication Plugin.
This authorization plugin should be constructed with a password
and user_id or user_name. It may also be constructed with a token.
More detailed information on some of the methods can be found in the
base class :class:`~openstack.auth.identity.base.BaseIdentityPlugin`.
:param string auth_url: Identity service endpoint for authentication.
:param string access_info: Access info including service catalog.
:param list auth_methods: A collection of methods to authenticate with.
:param string trust_id: Trust ID for trust scoping.
:param string domain_id: Domain ID for domain scoping.
:param string domain_name: Domain name for domain scoping.
:param string password: User password for authentication.
:param string project_domain_id: Project's domain ID for project.
:param string project_domain_name: Project's domain name for project.
:param string project_id: Project ID for project scoping.
:param string project_name: Project name for project scoping.
:param bool reauthenticate: Get new token if token expires.
:param string token: Token to use for authentication.
:param string trust_id: Trust ID for trust scoping.
:param string user_domain_id: User's domain ID for authentication.
:param string user_domain_name: User's domain name for authentication.
:param string user_name: User name for authentication.
:param string user_id: User ID for authentication.
:raises :class:`~openstack.exceptions.AuthorizationFailure`: if a
user_id, user_name or token is not provided.
:param string project_domain_id: Project's domain ID for project.
:param string project_domain_name: Project's domain name for project.
:param bool reauthenticate: Allow fetching a new token if the current
one is going to expire.
(optional) default True
"""
super(Auth, self).__init__(auth_url=auth_url,
reauthenticate=reauthenticate)
if not (user_id or user_name or token):
msg = 'You need to specify either a user_name, user_id or token'
raise exceptions.AuthorizationFailure(msg)
self.access_info = access_info
self.auth_methods = auth_methods
self.trust_id = trust_id
self.domain_id = domain_id
self.domain_name = domain_name
self.project_domain_id = project_domain_id
self.project_domain_name = project_domain_name
self.project_id = project_id
self.project_name = project_name
self.reauthenticate = reauthenticate
self.trust_id = trust_id
self.password_method = PasswordMethod(
password=password,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
user_name=user_name,
user_id=user_id,
)
if token:
self.token_method = TokenMethod(token=token)
self.auth_methods = [self.token_method]
else:
self.token_method = None
self.auth_methods = [self.password_method]
self.project_domain_id = project_domain_id
self.project_domain_name = project_domain_name
@property
def token_url(self):
@ -147,10 +90,6 @@ class Auth(base.BaseIdentityPlugin):
body = {'auth': {'identity': {}}}
ident = body['auth']['identity']
if self.token_method and self.access_info:
return access.AccessInfoV3(self.token_method.token,
**self.access_info)
for method in self.auth_methods:
name, auth_data = method.get_auth_data(transport, self, headers)
ident.setdefault('methods', []).append(name)
@ -198,14 +137,6 @@ class Auth(base.BaseIdentityPlugin):
return access.AccessInfoV3(resp.headers['X-Subject-Token'],
**resp_data)
def invalidate(self):
"""Invalidate the current authentication data."""
if super(Auth, self).invalidate():
self.auth_methods = [self.password_method]
self.access_info = None
return True
return False
@six.add_metaclass(abc.ABCMeta)
class AuthMethod(object):
@ -214,10 +145,27 @@ class AuthMethod(object):
V3 Tokens allow multiple methods to be presented when authentication
against the server. Each one of these methods is implemented by an
AuthMethod.
Note: When implementing an AuthMethod use the method_parameters
and do not use positional arguments. Otherwise they can't be picked up by
the factory method and don't work as well with AuthConstructors.
"""
_method_parameters = []
def __init__(self, **kwargs):
for param in kwargs:
setattr(self, param, kwargs.get(param, None))
for param in self._method_parameters:
setattr(self, param, kwargs.pop(param, None))
if kwargs:
msg = "Unexpected Attributes: %s" % ", ".join(kwargs.keys())
raise AttributeError(msg)
@classmethod
def _extract_kwargs(cls, kwargs):
"""Remove parameters related to this method from other kwargs."""
return dict([(p, kwargs.pop(p, None))
for p in cls._method_parameters])
@abc.abstractmethod
def get_auth_data(self, transport, auth, headers, **kwargs):
@ -232,13 +180,46 @@ class AuthMethod(object):
"""
class PasswordMethod(AuthMethod):
"""Identity v3 password authentication method.
@six.add_metaclass(abc.ABCMeta)
class _AuthConstructor(Auth):
"""AuthConstructor creates an authentication plugin with one method.
The identity v3 authorization password method derived from
:class:`~openstack.auth.identity.v3.AuthMethod`.
AuthConstructor is a means of creating an authentication plugin that
contains only one authentication method. This is generally the required
usage.
An AuthConstructor creates an AuthMethod based on the method's
arguments and the auth_method_class defined by the plugin. It then
creates the auth plugin with only that authentication method.
"""
_auth_method_class = None
def __init__(self, auth_url, *args, **kwargs):
method_kwargs = self._auth_method_class._extract_kwargs(kwargs)
method = self._auth_method_class(*args, **method_kwargs)
super(_AuthConstructor, self).__init__(auth_url, [method], **kwargs)
class PasswordMethod(AuthMethod):
_method_parameters = ['user_id',
'user_name',
'user_domain_id',
'user_domain_name',
'password']
def __init__(self, **kwargs):
"""Construct a User/Password based authentication method.
:param string password: Password for authentication.
:param string user_name: Username for authentication.
:param string user_id: User ID for authentication.
:param string user_domain_id: User's domain ID for authentication.
:param string user_domain_name: User's domain name for authentication.
"""
super(PasswordMethod, self).__init__(**kwargs)
def get_auth_data(self, transport, auth, headers, **kwargs):
"""Identity v3 password authentication data."""
user = {'password': self.password}
@ -256,14 +237,66 @@ class PasswordMethod(AuthMethod):
return 'password', {'user': user}
class TokenMethod(AuthMethod):
"""Identity v3 token authentication method.
class Password(_AuthConstructor):
The identity v3 authorization token method derived from
:class:`~openstack.auth.identity.v3.AuthMethod`.
"""
#: Valid options for this plugin
valid_options = [
'access_info',
'auth_url',
'domain_id',
'domain_name',
'password',
'project_domain_id',
'project_domain_name',
'project_id',
'project_name',
'reauthenticate',
'trust_id',
'user_domain_id',
'user_domain_name',
'user_id',
'user_name',
]
_auth_method_class = PasswordMethod
class TokenMethod(AuthMethod):
_method_parameters = ['token']
def __init__(self, **kwargs):
"""Construct an Auth plugin to fetch a token from a token.
:param string token: Token for authentication.
"""
super(TokenMethod, self).__init__(**kwargs)
def get_auth_data(self, transport, auth, headers, **kwargs):
"""Identity v3 token authentication data."""
headers['X-Auth-Token'] = self.token
return 'token', {'id': self.token}
class Token(_AuthConstructor):
#: Valid options for this plugin
valid_options = [
'access_info',
'auth_url',
'domain_id',
'domain_name',
'project_domain_id',
'project_domain_name',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
'user_domain_id',
'user_domain_name',
]
_auth_method_class = TokenMethod
def __init__(self, auth_url, token, **kwargs):
super(Token, self).__init__(auth_url, token=token, **kwargs)

View File

@ -24,21 +24,13 @@ TEST_RESPONSE_DICT = common.TEST_RESPONSE_DICT_V2
class TestV2Auth(testtools.TestCase):
def test_missing_args(self):
with testtools.ExpectedException(exceptions.AuthorizationFailure):
v2.Auth(TEST_URL)
def test_password(self):
kargs = {
'password': common.TEST_PASS,
'project_id': common.TEST_TENANT_ID,
'project_name': common.TEST_TENANT_NAME,
'trust_id': common.TEST_TRUST_ID,
'user_name': common.TEST_USER,
}
kargs = {'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_TENANT_ID,
'project_name': common.TEST_TENANT_NAME}
sot = v2.Auth(TEST_URL, **kargs)
sot = v2.Password(TEST_URL, common.TEST_USER, common.TEST_PASS,
**kargs)
self.assertEqual(common.TEST_USER, sot.user_name)
self.assertEqual(common.TEST_PASS, sot.password)
@ -47,65 +39,21 @@ class TestV2Auth(testtools.TestCase):
self.assertEqual(common.TEST_TENANT_NAME, sot.tenant_name)
expected = {'passwordCredentials': {'password': common.TEST_PASS,
'username': common.TEST_USER}}
headers = {}
self.assertEqual(expected, sot.get_auth_data(headers))
self.assertEqual({}, headers)
def test_empty_token(self):
kargs = {
'password': common.TEST_PASS,
'token': '',
'user_name': common.TEST_USER,
}
sot = v2.Auth(TEST_URL, **kargs)
self.assertEqual(common.TEST_USER, sot.user_name)
self.assertEqual(common.TEST_PASS, sot.password)
self.assertEqual(None, sot.token)
self.assertEqual(expected, sot.get_auth_data())
def test_token(self):
kargs = {
'project_id': common.TEST_TENANT_ID,
'project_name': common.TEST_TENANT_NAME,
'token': common.TEST_TOKEN,
'trust_id': common.TEST_TRUST_ID,
}
kargs = {'trust_id': common.TEST_TRUST_ID,
'tenant_id': common.TEST_TENANT_ID,
'tenant_name': common.TEST_TENANT_NAME}
sot = v2.Auth(TEST_URL, **kargs)
sot = v2.Token(TEST_URL, common.TEST_TOKEN, **kargs)
self.assertEqual(common.TEST_TOKEN, sot.token)
self.assertEqual(common.TEST_TRUST_ID, sot.trust_id)
self.assertEqual(common.TEST_TENANT_ID, sot.tenant_id)
self.assertEqual(common.TEST_TENANT_NAME, sot.tenant_name)
expected = {'token': {'id': common.TEST_TOKEN}}
headers = {}
self.assertEqual(expected, sot.get_auth_data(headers))
self.assertEqual({'X-Auth-Token': common.TEST_TOKEN}, headers)
def test_user_id(self):
kargs = {
'password': common.TEST_PASS,
'project_id': common.TEST_TENANT_ID,
'project_name': common.TEST_TENANT_NAME,
'trust_id': common.TEST_TRUST_ID,
'user_name': common.TEST_USER,
'user_id': common.TEST_USER_ID,
}
sot = v2.Auth(TEST_URL, **kargs)
self.assertEqual(common.TEST_USER, sot.user_name)
self.assertEqual(common.TEST_USER_ID, sot.user_id)
self.assertEqual(common.TEST_PASS, sot.password)
self.assertEqual(common.TEST_TRUST_ID, sot.trust_id)
self.assertEqual(common.TEST_TENANT_ID, sot.tenant_id)
self.assertEqual(common.TEST_TENANT_NAME, sot.tenant_name)
expected = {'passwordCredentials': {'password': common.TEST_PASS,
'userId': common.TEST_USER_ID}}
headers = {}
self.assertEqual(expected, sot.get_auth_data(headers))
self.assertEqual({}, headers)
self.assertEqual(expected, sot.get_auth_data())
def create_mock_transport(self, xresp):
transport = mock.Mock()
@ -117,13 +65,10 @@ class TestV2Auth(testtools.TestCase):
return transport
def test_authorize_tenant_id(self):
kargs = {
'project_id': common.TEST_TENANT_ID,
'project_name': common.TEST_TENANT_NAME,
'token': common.TEST_TOKEN,
'trust_id': common.TEST_TRUST_ID,
}
sot = v2.Auth(TEST_URL, **kargs)
kargs = {'trust_id': common.TEST_TRUST_ID,
'tenant_id': common.TEST_TENANT_ID,
'tenant_name': common.TEST_TENANT_NAME}
sot = v2.Token(TEST_URL, common.TEST_TOKEN, **kargs)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
@ -140,11 +85,8 @@ class TestV2Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_tenant_name(self):
kargs = {
'project_name': common.TEST_TENANT_NAME,
'token': common.TEST_TOKEN,
}
sot = v2.Auth(TEST_URL, **kargs)
kargs = {'tenant_name': common.TEST_TENANT_NAME}
sot = v2.Token(TEST_URL, common.TEST_TOKEN, **kargs)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
@ -160,8 +102,7 @@ class TestV2Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_only(self):
kargs = {'token': common.TEST_TOKEN}
sot = v2.Auth(TEST_URL, **kargs)
sot = v2.Token(TEST_URL, common.TEST_TOKEN)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
@ -175,61 +116,8 @@ class TestV2Auth(testtools.TestCase):
ecatalog['version'] = 'v2.0'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_access_info(self):
ecatalog = TEST_RESPONSE_DICT['access'].copy()
ecatalog['version'] = 'v2.0'
kargs = {
'access_info': ecatalog,
'token': common.TEST_TOKEN,
}
sot = v2.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(TEST_RESPONSE_DICT)
resp = sot.authorize(xport)
self.assertEqual(ecatalog, resp._info)
def test_authorize_bad_response(self):
kargs = {'token': common.TEST_TOKEN}
sot = v2.Auth(TEST_URL, **kargs)
sot = v2.Token(TEST_URL, common.TEST_TOKEN)
xport = self.create_mock_transport({})
self.assertRaises(exceptions.InvalidResponse, sot.authorize, xport)
def test_invalidate(self):
kargs = {
'access_info': {'a': 'b'},
'password': common.TEST_PASS,
'token': common.TEST_TOKEN,
'user_name': common.TEST_USER,
}
sot = v2.Auth(TEST_URL, **kargs)
expected = {'token': {'id': common.TEST_TOKEN}}
headers = {}
self.assertEqual(expected, sot.get_auth_data(headers))
self.assertEqual({'X-Auth-Token': common.TEST_TOKEN}, headers)
self.assertEqual(True, sot.invalidate())
expected = {'passwordCredentials': {'password': common.TEST_PASS,
'username': common.TEST_USER}}
headers = {}
self.assertEqual(None, sot.token)
self.assertEqual(None, sot.access_info)
self.assertEqual(expected, sot.get_auth_data(headers))
self.assertEqual({}, headers)
def test_valid_options(self):
expected = [
'access_info',
'auth_url',
'user_name',
'user_id',
'password',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
]
self.assertEqual(expected, v2.Auth.valid_options)

View File

@ -21,23 +21,17 @@ TEST_URL = 'http://127.0.0.1:5000/v3.0'
class TestV3Auth(testtools.TestCase):
def test_missing_args(self):
with testtools.ExpectedException(exceptions.AuthorizationFailure):
v3.Auth(TEST_URL)
def test_password_user_domain(self):
kargs = {
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME,
'user_name': common.TEST_USER,
'user_id': common.TEST_USER_ID,
'user_domain_id': common.TEST_DOMAIN_ID,
'user_domain_name': common.TEST_DOMAIN_NAME,
'password': common.TEST_PASS,
}
sot = v3.Auth(TEST_URL, **kargs)
kargs = {'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME}
method = v3.PasswordMethod(user_name=common.TEST_USER,
user_id=common.TEST_USER_ID,
user_domain_id=common.TEST_DOMAIN_ID,
user_domain_name=common.TEST_DOMAIN_NAME,
password=common.TEST_PASS)
sot = v3.Auth(TEST_URL, [method], **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
@ -59,17 +53,16 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(TEST_URL + '/auth/tokens', sot.token_url)
def test_password_domain(self):
kargs = {
'domain_id': common.TEST_DOMAIN_ID,
'domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME,
'user_name': common.TEST_USER,
'user_id': common.TEST_USER_ID,
'password': common.TEST_PASS,
}
sot = v3.Auth(TEST_URL, **kargs)
kargs = {'domain_id': common.TEST_DOMAIN_ID,
'domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME}
methods = [v3.PasswordMethod(user_name=common.TEST_USER,
user_id=common.TEST_USER_ID,
password=common.TEST_PASS)]
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
@ -91,15 +84,14 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(TEST_URL + '/auth/tokens', sot.token_url)
def test_token_project_domain(self):
kargs = {
'project_domain_id': common.TEST_DOMAIN_ID,
'project_domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
kargs = {'project_domain_id': common.TEST_DOMAIN_ID,
'project_domain_name': common.TEST_DOMAIN_NAME,
'trust_id': common.TEST_TRUST_ID,
'project_id': common.TEST_PROJECT_ID,
'project_name': common.TEST_PROJECT_NAME}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
@ -126,8 +118,8 @@ class TestV3Auth(testtools.TestCase):
return transport
def test_authorize_token(self):
kargs = {'token': common.TEST_TOKEN}
sot = v3.Auth(TEST_URL, **kargs)
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -143,27 +135,10 @@ class TestV3Auth(testtools.TestCase):
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_access_info(self):
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
kargs = {
'access_info': ecatalog,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
ecatalog['auth_token'] = common.TEST_TOKEN
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_domain_id(self):
kargs = {
'domain_id': common.TEST_DOMAIN_ID,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
kargs = {'domain_id': common.TEST_DOMAIN_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -181,11 +156,9 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_domain_name(self):
kargs = {
'domain_name': common.TEST_DOMAIN_NAME,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
kargs = {'domain_name': common.TEST_DOMAIN_NAME}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -204,11 +177,9 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_project_id(self):
kargs = {
'project_id': common.TEST_PROJECT_ID,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
kargs = {'project_id': common.TEST_PROJECT_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -227,12 +198,10 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_project_name(self):
kargs = {
'project_name': common.TEST_PROJECT_NAME,
'project_domain_id': common.TEST_DOMAIN_ID,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
kargs = {'project_name': common.TEST_PROJECT_NAME,
'project_domain_id': common.TEST_DOMAIN_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -253,12 +222,10 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_project_name_domain_name(self):
kargs = {
'project_name': common.TEST_PROJECT_NAME,
'project_domain_name': common.TEST_DOMAIN_NAME,
'token': common.TEST_TOKEN,
}
sot = v3.Auth(TEST_URL, **kargs)
kargs = {'project_name': common.TEST_PROJECT_NAME,
'project_domain_name': common.TEST_DOMAIN_NAME}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -279,11 +246,9 @@ class TestV3Auth(testtools.TestCase):
self.assertEqual(ecatalog, resp._info)
def test_authorize_token_trust_id(self):
kargs = {
'token': common.TEST_TOKEN,
'trust_id': common.TEST_TRUST_ID,
}
sot = v3.Auth(TEST_URL, **kargs)
kargs = {'trust_id': common.TEST_TRUST_ID}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods, **kargs)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
@ -301,74 +266,55 @@ class TestV3Auth(testtools.TestCase):
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_multi_method(self):
methods = [v3.PasswordMethod(user_name=common.TEST_USER,
password=common.TEST_PASS),
v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods)
xport = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
resp = sot.authorize(xport)
eurl = TEST_URL + '/auth/tokens'
eheaders = {'Accept': 'application/json',
'X-Auth-Token': common.TEST_TOKEN}
up = {'password': common.TEST_PASS, 'name': common.TEST_USER}
ejson = {'auth': {'identity': {'token': {'id': common.TEST_TOKEN},
'password': {'user': up},
'methods': ['password', 'token']}}}
xport.post.assert_called_with(eurl, headers=eheaders, json=ejson)
ecatalog = common.TEST_RESPONSE_DICT_V3['token'].copy()
ecatalog['auth_token'] = common.TEST_SUBJECT
ecatalog['version'] = 'v3'
self.assertEqual(ecatalog, resp._info)
def test_authorize_mutually_exclusive(self):
x = self.create_mock_transport(common.TEST_RESPONSE_DICT_V3)
kargs = {'token': common.TEST_TOKEN}
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, **kargs)
sot.domain_id = 'a'
sot.project_id = 'b'
kargs = {'domain_id': 'a',
'project_id': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
sot = v3.Auth(TEST_URL, **kargs)
sot.domain_name = 'a'
sot.project_name = 'b'
kargs = {'domain_name': 'a',
'project_name': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
sot = v3.Auth(TEST_URL, **kargs)
sot.domain_name = 'a'
sot.trust_id = 'b'
kargs = {'domain_name': 'a',
'trust_id': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
sot = v3.Auth(TEST_URL, **kargs)
sot.project_id = 'a'
sot.trust_id = 'b'
kargs = {'project_id': 'a',
'trust_id': 'b'}
sot = v3.Auth(TEST_URL, methods, **kargs)
self.assertRaises(exceptions.AuthorizationFailure, sot.authorize, x)
def test_authorize_bad_response(self):
kargs = {'token': common.TEST_TOKEN}
sot = v3.Auth(TEST_URL, **kargs)
methods = [v3.TokenMethod(token=common.TEST_TOKEN)]
sot = v3.Auth(TEST_URL, methods)
xport = self.create_mock_transport({})
self.assertRaises(exceptions.InvalidResponse, sot.authorize, xport)
def test_invalidate(self):
kargs = {
'user_name': common.TEST_USER,
'password': common.TEST_PASS,
'token': common.TEST_TOKEN,
'access_info': {},
}
sot = v3.Auth(TEST_URL, **kargs)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
self.assertEqual(common.TEST_TOKEN, auther.token)
self.assertEqual(True, sot.invalidate())
self.assertEqual(None, sot.access_info)
self.assertEqual(1, len(sot.auth_methods))
auther = sot.auth_methods[0]
self.assertEqual(common.TEST_USER, auther.user_name)
self.assertEqual(common.TEST_PASS, auther.password)
def test_valid_options(self):
expected = [
'access_info',
'auth_url',
'domain_id',
'domain_name',
'password',
'project_domain_id',
'project_domain_name',
'project_id',
'project_name',
'reauthenticate',
'token',
'trust_id',
'user_domain_id',
'user_domain_name',
'user_id',
'user_name',
]
self.assertEqual(expected, v3.Auth.valid_options)

View File

@ -26,7 +26,7 @@ class TestConnection(base.TestCase):
def setUp(self):
super(TestConnection, self).setUp()
self.xport = transport.Transport()
self.auth = v2.Auth(auth_url='http://127.0.0.1/v2', token='b')
self.auth = v2.Token(auth_url='http://127.0.0.1/v2', token='b')
self.pref = user_preference.UserPreference()
self.conn = connection.Connection(authenticator=mock.MagicMock(),
transport=mock.MagicMock())
@ -44,7 +44,8 @@ class TestConnection(base.TestCase):
'user_name': '1',
'password': '2',
}
conn = connection.Connection(transport='0', auth_plugin='identity_v2',
conn = connection.Connection(transport='0',
auth_plugin='identity_v2_password',
**auth_args)
self.assertEqual('0', conn.authenticator.auth_url)
self.assertEqual('1', conn.authenticator.user_name)
@ -56,11 +57,12 @@ class TestConnection(base.TestCase):
'user_name': '1',
'password': '2',
}
conn = connection.Connection(transport='0', auth_plugin='identity_v3',
conn = connection.Connection(transport='0',
auth_plugin='identity_v3_password',
**auth_args)
self.assertEqual('0', conn.authenticator.auth_url)
self.assertEqual('1', conn.authenticator.password_method.user_name)
self.assertEqual('2', conn.authenticator.password_method.password)
self.assertEqual('1', conn.authenticator.auth_methods[0].user_name)
self.assertEqual('2', conn.authenticator.auth_methods[0].password)
def test_create_authenticator_discoverable(self):
auth_args = {
@ -73,10 +75,10 @@ class TestConnection(base.TestCase):
self.assertEqual('0', conn.authenticator.auth_url)
self.assertEqual(
'1',
conn.authenticator.auth_plugin.password_method.user_name)
conn.authenticator.auth_plugin.auth_methods[0].user_name)
self.assertEqual(
'2',
conn.authenticator.auth_plugin.password_method.password)
conn.authenticator.auth_plugin.auth_methods[0].password)
def test_create_authenticator_no_name(self):
auth_args = {

View File

@ -17,11 +17,17 @@ from openstack.tests import base
class TestModuleLoader(base.TestCase):
def test_load_identity_v2(self):
plugin = module_loader.ModuleLoader().get_auth_plugin('identity_v2')
loader = module_loader.ModuleLoader()
plugin = loader.get_auth_plugin('identity_v2_password')
self.assertEqual('openstack.auth.identity.v2', str(plugin.__module__))
plugin = loader.get_auth_plugin('identity_v2_token')
self.assertEqual('openstack.auth.identity.v2', str(plugin.__module__))
def test_load_identity_v3(self):
plugin = module_loader.ModuleLoader().get_auth_plugin('identity_v3')
loader = module_loader.ModuleLoader()
plugin = loader.get_auth_plugin('identity_v3_password')
self.assertEqual('openstack.auth.identity.v3', str(plugin.__module__))
plugin = loader.get_auth_plugin('identity_v3_token')
self.assertEqual('openstack.auth.identity.v3', str(plugin.__module__))
def test_load_identity_discoverable(self):
@ -45,4 +51,6 @@ class TestModuleLoader(base.TestCase):
def test_list_auth_plugins(self):
plugins = sorted(module_loader.ModuleLoader().list_auth_plugins())
self.assertEqual(['identity', 'identity_v2', 'identity_v3'], plugins)
expected = ['identity', 'identity_v2_password', 'identity_v2_token',
'identity_v3_password', 'identity_v3_token']
self.assertEqual(expected, plugins)

View File

@ -51,6 +51,8 @@ universal = 1
[entry_points]
openstack.auth.plugin =
identity_v2 = openstack.auth.identity.v2:Auth
identity_v3 = openstack.auth.identity.v3:Auth
identity_v2_password = openstack.auth.identity.v2:Password
identity_v2_token = openstack.auth.identity.v2:Token
identity_v3_password = openstack.auth.identity.v3:Password
identity_v3_token = openstack.auth.identity.v3:Token
identity = openstack.auth.identity.discoverable:Auth