Privatize auth construction parameters

Privatize some of the automatic auth plugin construction work. At some
point we are going to need to have this pluggable and i'm not sure the
current methods will suffice. It's better to keep this private until we
are sure rather than be stuck with a public API.

Change-Id: I2a10a9b28bef6c094b1330a0524f1c516f5103fd
Closes-Bug: #1287488
This commit is contained in:
Jamie Lennox
2014-03-03 18:48:35 +10:00
parent 19356011e6
commit 0267c98f56
6 changed files with 78 additions and 78 deletions

View File

@@ -25,7 +25,7 @@ from keystoneclient import exceptions
class Auth(base.BaseIdentityPlugin):
@staticmethod
def factory(auth_url, **kwargs):
def _factory(auth_url, **kwargs):
"""Construct a plugin appropriate to your available arguments.
This function should only be used for loading authentication from a

View File

@@ -104,7 +104,7 @@ class Auth(base.BaseIdentityPlugin):
**resp.json()['token'])
@staticmethod
def factory(auth_url, **kwargs):
def _factory(auth_url, **kwargs):
"""Construct a plugin appropriate to your available arguments.
This function is intended as a convenience and backwards compatibility.
@@ -116,11 +116,11 @@ class Auth(base.BaseIdentityPlugin):
# NOTE(jamielennox): kwargs extraction is outside the if statement to
# clear up additional args that might be passed but not valid for type.
method_kwargs = PasswordMethod.extract_kwargs(kwargs)
method_kwargs = PasswordMethod._extract_kwargs(kwargs)
if method_kwargs.get('password'):
methods.append(PasswordMethod(**method_kwargs))
method_kwargs = TokenMethod.extract_kwargs(kwargs)
method_kwargs = TokenMethod._extract_kwargs(kwargs)
if method_kwargs.get('token'):
methods.append(TokenMethod(**method_kwargs))
@@ -144,10 +144,10 @@ class AuthMethod(object):
the factory method and don't work as well with AuthConstructors.
"""
method_parameters = []
_method_parameters = []
def __init__(self, **kwargs):
for param in self.method_parameters:
for param in self._method_parameters:
setattr(self, param, kwargs.pop(param, None))
if kwargs:
@@ -155,10 +155,10 @@ class AuthMethod(object):
raise AttributeError(msg)
@classmethod
def extract_kwargs(cls, kwargs):
def _extract_kwargs(cls, kwargs):
"""Remove parameters related to this method from other kwargs."""
return dict([(p, kwargs.pop(p, None))
for p in cls.method_parameters])
for p in cls._method_parameters])
@abc.abstractmethod
def get_auth_data(self, headers=None):
@@ -172,7 +172,7 @@ class AuthMethod(object):
@six.add_metaclass(abc.ABCMeta)
class AuthConstructor(Auth):
class _AuthConstructor(Auth):
"""AuthConstructor is a means of creating an Auth Plugin that contains
only one authentication method. This is generally the required usage.
@@ -181,21 +181,21 @@ class AuthConstructor(Auth):
creates the auth plugin with only that authentication method.
"""
auth_method_class = None
_auth_method_class = None
def __init__(self, auth_url, *args, **kwargs):
method_kwargs = self.auth_method_class.extract_kwargs(kwargs)
method = self.auth_method_class(*args, **method_kwargs)
super(AuthConstructor, self).__init__(auth_url, [method], **kwargs)
method_kwargs = self._auth_method_class._extract_kwargs(kwargs)
method = self._auth_method_class(*args, **method_kwargs)
super(_AuthConstructor, self).__init__(auth_url, [method], **kwargs)
class PasswordMethod(AuthMethod):
method_parameters = ['user_id',
'username',
'user_domain_id',
'user_domain_name',
'password']
_method_parameters = ['user_id',
'username',
'user_domain_id',
'user_domain_name',
'password']
def __init__(self, **kwargs):
"""Construct a User/Password based authentication method.
@@ -224,13 +224,13 @@ class PasswordMethod(AuthMethod):
return 'password', {'user': user}
class Password(AuthConstructor):
auth_method_class = PasswordMethod
class Password(_AuthConstructor):
_auth_method_class = PasswordMethod
class TokenMethod(AuthMethod):
method_parameters = ['token']
_method_parameters = ['token']
def __init__(self, **kwargs):
"""Construct a Auth plugin to fetch a token from a token.
@@ -244,8 +244,8 @@ class TokenMethod(AuthMethod):
return 'token', {'id': self.token}
class Token(AuthConstructor):
auth_method_class = TokenMethod
class Token(_AuthConstructor):
_auth_method_class = TokenMethod
def __init__(self, auth_url, token, **kwargs):
super(Token, self).__init__(auth_url, token=token, **kwargs)

View File

@@ -49,20 +49,15 @@ class V2IdentityPlugin(utils.TestCase):
},
}
def _plugin(self, auth_url=TEST_URL, **kwargs):
return v2.Auth.factory(auth_url, **kwargs)
def _session(self, **kwargs):
return session.Session(auth=self._plugin(**kwargs))
def stub_auth(self, **kwargs):
self.stub_url(httpretty.POST, ['tokens'], **kwargs)
@httpretty.activate
def test_authenticate_with_username_password(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS)
self.assertIsInstance(s.auth, v2.Password)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(a)
s.get_token()
req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
@@ -73,9 +68,9 @@ class V2IdentityPlugin(utils.TestCase):
@httpretty.activate
def test_authenticate_with_username_password_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
tenant_id=self.TEST_TENANT_ID)
self.assertIsInstance(s.auth, v2.Password)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
s = session.Session(a)
s.get_token()
req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,
@@ -87,8 +82,8 @@ class V2IdentityPlugin(utils.TestCase):
@httpretty.activate
def test_authenticate_with_token(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(token='foo')
self.assertIsInstance(s.auth, v2.Token)
a = v2.Token(self.TEST_URL, 'foo')
s = session.Session(a)
s.get_token()
req = {'auth': {'token': {'id': 'foo'}}}
@@ -97,13 +92,15 @@ class V2IdentityPlugin(utils.TestCase):
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_missing_auth_params(self):
self.assertRaises(exceptions.NoMatchingPlugin, self._plugin)
self.assertRaises(exceptions.NoMatchingPlugin, v2.Auth._factory,
self.TEST_URL)
@httpretty.activate
def test_with_trust_id(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
trust_id='trust')
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS, trust_id='trust')
s = session.Session(a)
s.get_token()
req = {'auth': {'passwordCredentials': {'username': self.TEST_USER,

View File

@@ -65,12 +65,6 @@ class V3IdentityPlugin(utils.TestCase):
},
}
def _plugin(self, auth_url=TEST_URL, **kwargs):
return v3.Auth.factory(auth_url, **kwargs)
def _session(self, **kwargs):
return session.Session(auth=self._plugin(**kwargs))
def stub_auth(self, subject_token=None, **kwargs):
if not subject_token:
subject_token = self.TEST_TOKEN
@@ -99,8 +93,9 @@ class V3IdentityPlugin(utils.TestCase):
@httpretty.activate
def test_authenticate_with_username_password_domain_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
domain_id=self.TEST_DOMAIN_ID)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS, domain_id=self.TEST_DOMAIN_ID)
s = session.Session(a)
s.get_token()
req = {'auth': {'identity':
@@ -114,8 +109,10 @@ class V3IdentityPlugin(utils.TestCase):
@httpretty.activate
def test_authenticate_with_username_password_project_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
project_id=self.TEST_DOMAIN_ID)
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS,
project_id=self.TEST_DOMAIN_ID)
s = session.Session(a)
s.get_token()
req = {'auth': {'identity':
@@ -142,7 +139,8 @@ class V3IdentityPlugin(utils.TestCase):
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_missing_auth_params(self):
self.assertRaises(exceptions.AuthorizationFailure, self._plugin)
self.assertRaises(exceptions.AuthorizationFailure, v3.Auth._factory,
self.TEST_URL)
@httpretty.activate
def test_with_expired(self):
@@ -151,7 +149,8 @@ class V3IdentityPlugin(utils.TestCase):
d = copy.deepcopy(self.TEST_RESPONSE_DICT)
d['token']['expires_at'] = '2000-01-01T00:00:10.000123Z'
a = self._plugin(username='username', password='password')
a = v3.Password(self.TEST_URL, username='username',
password='password')
a.auth_ref = access.AccessInfo.factory(body=d)
s = session.Session(auth=a)
@@ -161,16 +160,18 @@ class V3IdentityPlugin(utils.TestCase):
self.TEST_RESPONSE_DICT['token']['expires_at'])
def test_with_domain_and_project_scoping(self):
a = self._plugin(username='username', password='password',
project_id='project', domain_id='domain')
a = v3.Password(self.TEST_URL, username='username',
password='password', project_id='project',
domain_id='domain')
self.assertRaises(exceptions.AuthorizationFailure,
a.get_token, None)
@httpretty.activate
def test_with_trust_id(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
trust_id='trust')
a = v3.Password(self.TEST_URL, username=self.TEST_USER,
password=self.TEST_PASS, trust_id='trust')
s = session.Session(a)
s.get_token()
req = {'auth': {'identity':
@@ -184,8 +185,10 @@ class V3IdentityPlugin(utils.TestCase):
@httpretty.activate
def test_with_multiple_mechanisms_factory(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
s = self._session(username=self.TEST_USER, password=self.TEST_PASS,
trust_id='trust', token='foo')
p = v3.PasswordMethod(username=self.TEST_USER, password=self.TEST_PASS)
t = v3.TokenMethod(token='foo')
a = v3.Auth(self.TEST_URL, [p, t], trust_id='trust')
s = session.Session(a)
s.get_token()
req = {'auth': {'identity':

View File

@@ -159,13 +159,13 @@ class Client(httpclient.HTTPClient):
if auth_url is None:
raise ValueError("Cannot authenticate without an auth_url")
a = v2_auth.Auth.factory(auth_url,
username=username,
password=password,
token=token,
trust_id=trust_id,
tenant_id=project_id or tenant_id,
tenant_name=project_name or tenant_name)
a = v2_auth.Auth._factory(auth_url,
username=username,
password=password,
token=token,
trust_id=trust_id,
tenant_id=project_id or tenant_id,
tenant_name=project_name or tenant_name)
return a.get_auth_ref(self.session)
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):

View File

@@ -149,20 +149,20 @@ class Client(httpclient.HTTPClient):
if auth_url is None:
raise ValueError("Cannot authenticate without an auth_url")
a = v3_auth.Auth.factory(auth_url,
username=username,
password=password,
token=token,
trust_id=trust_id,
user_id=user_id,
domain_id=domain_id,
domain_name=domain_name,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name)
a = v3_auth.Auth._factory(auth_url,
username=username,
password=password,
token=token,
trust_id=trust_id,
user_id=user_id,
domain_id=domain_id,
domain_name=domain_name,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name)
return a.get_auth_ref(self.session)
except (exceptions.AuthorizationFailure, exceptions.Unauthorized):