Merge "Privatize auth construction parameters"
This commit is contained in:
@@ -25,7 +25,7 @@ from keystoneclient import exceptions
|
||||
class Auth(base.BaseIdentityPlugin):
|
||||
|
||||
@staticmethod
|
||||
def factory(auth_url, **kwargs):
|
||||
def _factory(auth_url, **kwargs):
|
||||
"""Construct a plugin appropriate to your available arguments.
|
||||
|
||||
This function should only be used for loading authentication from a
|
||||
|
@@ -104,7 +104,7 @@ class Auth(base.BaseIdentityPlugin):
|
||||
**resp.json()['token'])
|
||||
|
||||
@staticmethod
|
||||
def factory(auth_url, **kwargs):
|
||||
def _factory(auth_url, **kwargs):
|
||||
"""Construct a plugin appropriate to your available arguments.
|
||||
|
||||
This function is intended as a convenience and backwards compatibility.
|
||||
@@ -116,11 +116,11 @@ class Auth(base.BaseIdentityPlugin):
|
||||
|
||||
# NOTE(jamielennox): kwargs extraction is outside the if statement to
|
||||
# clear up additional args that might be passed but not valid for type.
|
||||
method_kwargs = PasswordMethod.extract_kwargs(kwargs)
|
||||
method_kwargs = PasswordMethod._extract_kwargs(kwargs)
|
||||
if method_kwargs.get('password'):
|
||||
methods.append(PasswordMethod(**method_kwargs))
|
||||
|
||||
method_kwargs = TokenMethod.extract_kwargs(kwargs)
|
||||
method_kwargs = TokenMethod._extract_kwargs(kwargs)
|
||||
if method_kwargs.get('token'):
|
||||
methods.append(TokenMethod(**method_kwargs))
|
||||
|
||||
@@ -144,10 +144,10 @@ class AuthMethod(object):
|
||||
the factory method and don't work as well with AuthConstructors.
|
||||
"""
|
||||
|
||||
method_parameters = []
|
||||
_method_parameters = []
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
for param in self.method_parameters:
|
||||
for param in self._method_parameters:
|
||||
setattr(self, param, kwargs.pop(param, None))
|
||||
|
||||
if kwargs:
|
||||
@@ -155,10 +155,10 @@ class AuthMethod(object):
|
||||
raise AttributeError(msg)
|
||||
|
||||
@classmethod
|
||||
def extract_kwargs(cls, kwargs):
|
||||
def _extract_kwargs(cls, kwargs):
|
||||
"""Remove parameters related to this method from other kwargs."""
|
||||
return dict([(p, kwargs.pop(p, None))
|
||||
for p in cls.method_parameters])
|
||||
for p in cls._method_parameters])
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_auth_data(self, headers=None):
|
||||
@@ -172,7 +172,7 @@ class AuthMethod(object):
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class AuthConstructor(Auth):
|
||||
class _AuthConstructor(Auth):
|
||||
"""AuthConstructor is a means of creating an Auth Plugin that contains
|
||||
only one authentication method. This is generally the required usage.
|
||||
|
||||
@@ -181,21 +181,21 @@ class AuthConstructor(Auth):
|
||||
creates the auth plugin with only that authentication method.
|
||||
"""
|
||||
|
||||
auth_method_class = None
|
||||
_auth_method_class = None
|
||||
|
||||
def __init__(self, auth_url, *args, **kwargs):
|
||||
method_kwargs = self.auth_method_class.extract_kwargs(kwargs)
|
||||
method = self.auth_method_class(*args, **method_kwargs)
|
||||
super(AuthConstructor, self).__init__(auth_url, [method], **kwargs)
|
||||
method_kwargs = self._auth_method_class._extract_kwargs(kwargs)
|
||||
method = self._auth_method_class(*args, **method_kwargs)
|
||||
super(_AuthConstructor, self).__init__(auth_url, [method], **kwargs)
|
||||
|
||||
|
||||
class PasswordMethod(AuthMethod):
|
||||
|
||||
method_parameters = ['user_id',
|
||||
'username',
|
||||
'user_domain_id',
|
||||
'user_domain_name',
|
||||
'password']
|
||||
_method_parameters = ['user_id',
|
||||
'username',
|
||||
'user_domain_id',
|
||||
'user_domain_name',
|
||||
'password']
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Construct a User/Password based authentication method.
|
||||
@@ -224,13 +224,13 @@ class PasswordMethod(AuthMethod):
|
||||
return 'password', {'user': user}
|
||||
|
||||
|
||||
class Password(AuthConstructor):
|
||||
auth_method_class = PasswordMethod
|
||||
class Password(_AuthConstructor):
|
||||
_auth_method_class = PasswordMethod
|
||||
|
||||
|
||||
class TokenMethod(AuthMethod):
|
||||
|
||||
method_parameters = ['token']
|
||||
_method_parameters = ['token']
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Construct a Auth plugin to fetch a token from a token.
|
||||
@@ -244,8 +244,8 @@ class TokenMethod(AuthMethod):
|
||||
return 'token', {'id': self.token}
|
||||
|
||||
|
||||
class Token(AuthConstructor):
|
||||
auth_method_class = TokenMethod
|
||||
class Token(_AuthConstructor):
|
||||
_auth_method_class = TokenMethod
|
||||
|
||||
def __init__(self, auth_url, token, **kwargs):
|
||||
super(Token, self).__init__(auth_url, token=token, **kwargs)
|
||||
|
@@ -49,20 +49,15 @@ class V2IdentityPlugin(utils.TestCase):
|
||||
},
|
||||
}
|
||||
|
||||
def _plugin(self, auth_url=TEST_URL, **kwargs):
|
||||
return v2.Auth.factory(auth_url, **kwargs)
|
||||
|
||||
def _session(self, **kwargs):
|
||||
return session.Session(auth=self._plugin(**kwargs))
|
||||
|
||||
def stub_auth(self, **kwargs):
|
||||
self.stub_url(httpretty.POST, ['tokens'], **kwargs)
|
||||
|
||||
@httpretty.activate
|
||||
def test_authenticate_with_username_password(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
s = self._session(username=self.TEST_USER, password=self.TEST_PASS)
|
||||
self.assertIsInstance(s.auth, v2.Password)
|
||||
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
|
||||
password=self.TEST_PASS)
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
|
||||
@@ -73,9 +68,9 @@ class V2IdentityPlugin(utils.TestCase):
|
||||
@httpretty.activate
|
||||
def test_authenticate_with_username_password_scoped(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
|
||||
tenant_id=self.TEST_TENANT_ID)
|
||||
self.assertIsInstance(s.auth, v2.Password)
|
||||
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
|
||||
password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
|
||||
@@ -87,8 +82,8 @@ class V2IdentityPlugin(utils.TestCase):
|
||||
@httpretty.activate
|
||||
def test_authenticate_with_token(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
s = self._session(token='foo')
|
||||
self.assertIsInstance(s.auth, v2.Token)
|
||||
a = v2.Token(self.TEST_URL, 'foo')
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'token': {'id': 'foo'}}}
|
||||
@@ -97,13 +92,15 @@ class V2IdentityPlugin(utils.TestCase):
|
||||
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
|
||||
def test_missing_auth_params(self):
|
||||
self.assertRaises(exceptions.NoMatchingPlugin, self._plugin)
|
||||
self.assertRaises(exceptions.NoMatchingPlugin, v2.Auth._factory,
|
||||
self.TEST_URL)
|
||||
|
||||
@httpretty.activate
|
||||
def test_with_trust_id(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
|
||||
trust_id='trust')
|
||||
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
|
||||
password=self.TEST_PASS, trust_id='trust')
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
|
||||
|
@@ -65,12 +65,6 @@ class V3IdentityPlugin(utils.TestCase):
|
||||
},
|
||||
}
|
||||
|
||||
def _plugin(self, auth_url=TEST_URL, **kwargs):
|
||||
return v3.Auth.factory(auth_url, **kwargs)
|
||||
|
||||
def _session(self, **kwargs):
|
||||
return session.Session(auth=self._plugin(**kwargs))
|
||||
|
||||
def stub_auth(self, subject_token=None, **kwargs):
|
||||
if not subject_token:
|
||||
subject_token = self.TEST_TOKEN
|
||||
@@ -99,8 +93,9 @@ class V3IdentityPlugin(utils.TestCase):
|
||||
@httpretty.activate
|
||||
def test_authenticate_with_username_password_domain_scoped(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
|
||||
domain_id=self.TEST_DOMAIN_ID)
|
||||
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
|
||||
password=self.TEST_PASS, domain_id=self.TEST_DOMAIN_ID)
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'identity':
|
||||
@@ -114,8 +109,10 @@ class V3IdentityPlugin(utils.TestCase):
|
||||
@httpretty.activate
|
||||
def test_authenticate_with_username_password_project_scoped(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
|
||||
project_id=self.TEST_DOMAIN_ID)
|
||||
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
|
||||
password=self.TEST_PASS,
|
||||
project_id=self.TEST_DOMAIN_ID)
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'identity':
|
||||
@@ -142,7 +139,8 @@ class V3IdentityPlugin(utils.TestCase):
|
||||
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
|
||||
def test_missing_auth_params(self):
|
||||
self.assertRaises(exceptions.AuthorizationFailure, self._plugin)
|
||||
self.assertRaises(exceptions.AuthorizationFailure, v3.Auth._factory,
|
||||
self.TEST_URL)
|
||||
|
||||
@httpretty.activate
|
||||
def test_with_expired(self):
|
||||
@@ -151,7 +149,8 @@ class V3IdentityPlugin(utils.TestCase):
|
||||
d = copy.deepcopy(self.TEST_RESPONSE_DICT)
|
||||
d['token']['expires_at'] = '2000-01-01T00:00:10.000123Z'
|
||||
|
||||
a = self._plugin(username='username', password='password')
|
||||
a = v3.Password(self.TEST_URL, username='username',
|
||||
password='password')
|
||||
a.auth_ref = access.AccessInfo.factory(body=d)
|
||||
s = session.Session(auth=a)
|
||||
|
||||
@@ -161,16 +160,18 @@ class V3IdentityPlugin(utils.TestCase):
|
||||
self.TEST_RESPONSE_DICT['token']['expires_at'])
|
||||
|
||||
def test_with_domain_and_project_scoping(self):
|
||||
a = self._plugin(username='username', password='password',
|
||||
project_id='project', domain_id='domain')
|
||||
a = v3.Password(self.TEST_URL, username='username',
|
||||
password='password', project_id='project',
|
||||
domain_id='domain')
|
||||
self.assertRaises(exceptions.AuthorizationFailure,
|
||||
a.get_token, None)
|
||||
|
||||
@httpretty.activate
|
||||
def test_with_trust_id(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
|
||||
trust_id='trust')
|
||||
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
|
||||
password=self.TEST_PASS, trust_id='trust')
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'identity':
|
||||
@@ -184,8 +185,10 @@ class V3IdentityPlugin(utils.TestCase):
|
||||
@httpretty.activate
|
||||
def test_with_multiple_mechanisms_factory(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
|
||||
trust_id='trust', token='foo')
|
||||
p = v3.PasswordMethod(username=self.TEST_USER, password=self.TEST_PASS)
|
||||
t = v3.TokenMethod(token='foo')
|
||||
a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust')
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'identity':
|
||||
|
@@ -159,13 +159,13 @@ class Client(httpclient.HTTPClient):
|
||||
if auth_url is None:
|
||||
raise ValueError("Cannot authenticate without an auth_url")
|
||||
|
||||
a = v2_auth.Auth.factory(auth_url,
|
||||
username=username,
|
||||
password=password,
|
||||
token=token,
|
||||
trust_id=trust_id,
|
||||
tenant_id=project_id or tenant_id,
|
||||
tenant_name=project_name or tenant_name)
|
||||
a = v2_auth.Auth._factory(auth_url,
|
||||
username=username,
|
||||
password=password,
|
||||
token=token,
|
||||
trust_id=trust_id,
|
||||
tenant_id=project_id or tenant_id,
|
||||
tenant_name=project_name or tenant_name)
|
||||
|
||||
return a.get_auth_ref(self.session)
|
||||
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
|
||||
|
@@ -149,20 +149,20 @@ class Client(httpclient.HTTPClient):
|
||||
if auth_url is None:
|
||||
raise ValueError("Cannot authenticate without an auth_url")
|
||||
|
||||
a = v3_auth.Auth.factory(auth_url,
|
||||
username=username,
|
||||
password=password,
|
||||
token=token,
|
||||
trust_id=trust_id,
|
||||
user_id=user_id,
|
||||
domain_id=domain_id,
|
||||
domain_name=domain_name,
|
||||
user_domain_id=user_domain_id,
|
||||
user_domain_name=user_domain_name,
|
||||
project_id=project_id,
|
||||
project_name=project_name,
|
||||
project_domain_id=project_domain_id,
|
||||
project_domain_name=project_domain_name)
|
||||
a = v3_auth.Auth._factory(auth_url,
|
||||
username=username,
|
||||
password=password,
|
||||
token=token,
|
||||
trust_id=trust_id,
|
||||
user_id=user_id,
|
||||
domain_id=domain_id,
|
||||
domain_name=domain_name,
|
||||
user_domain_id=user_domain_id,
|
||||
user_domain_name=user_domain_name,
|
||||
project_id=project_id,
|
||||
project_name=project_name,
|
||||
project_domain_id=project_domain_id,
|
||||
project_domain_name=project_domain_name)
|
||||
|
||||
return a.get_auth_ref(self.session)
|
||||
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):
|
||||
|
Reference in New Issue
Block a user