Merge "Allow passing user_id to v2Password plugin"

This commit is contained in:
Jenkins
2014-08-20 22:00:58 +00:00
committed by Gerrit Code Review
2 changed files with 54 additions and 3 deletions

View File

@@ -97,20 +97,39 @@ class Auth(base.BaseIdentityPlugin):
class Password(Auth):
def __init__(self, auth_url, username, password, **kwargs):
@utils.positional(4)
def __init__(self, auth_url, username=None, password=None, user_id=None,
**kwargs):
"""A plugin for authenticating with a username and password.
A username or user_id must be provided.
:param string auth_url: Identity service endpoint for authorization.
:param string username: Username for authentication.
:param string password: Password for authentication.
:param string user_id: User ID for authentication.
:raises TypeError: if a user_id or username is not provided.
"""
super(Password, self).__init__(auth_url, **kwargs)
if not (user_id or username):
msg = 'You need to specify either a username or user_id'
raise TypeError(msg)
self.user_id = user_id
self.username = username
self.password = password
def get_auth_data(self, headers=None):
return {'passwordCredentials': {'username': self.username,
'password': self.password}}
auth = {'password': self.password}
if self.username:
auth['username'] = self.username
elif self.user_id:
auth['userId'] = self.user_id
return {'passwordCredentials': auth}
@classmethod
def get_options(cls):
@@ -121,6 +140,7 @@ class Password(Auth):
dest='username',
deprecated_name='username',
help='Username to login with'),
cfg.StrOpt('user-id', help='User ID to longin with'),
cfg.StrOpt('password', secret=True, help='Password to use'),
])

View File

@@ -110,6 +110,20 @@ class V2IdentityPlugin(utils.TestCase):
self.assertRequestHeaderEqual('Accept', 'application/json')
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_user_id_password(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
password=self.TEST_PASS)
s = session.Session(a)
s.get_token()
req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
'password': self.TEST_PASS}}}
self.assertRequestBodyIs(json=req)
self.assertRequestHeaderEqual('Content-Type', 'application/json')
self.assertRequestHeaderEqual('Accept', 'application/json')
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_username_password_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
@@ -123,6 +137,19 @@ class V2IdentityPlugin(utils.TestCase):
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_user_id_password_scoped(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
s = session.Session(a)
s.get_token()
req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
'password': self.TEST_PASS},
'tenantId': self.TEST_TENANT_ID}}
self.assertRequestBodyIs(json=req)
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
def test_authenticate_with_token(self):
self.stub_auth(json=self.TEST_RESPONSE_DICT)
a = v2.Token(self.TEST_URL, 'foo')
@@ -247,3 +274,7 @@ class V2IdentityPlugin(utils.TestCase):
s = session.Session(auth=a)
self.assertEqual(self.TEST_TOKEN, s.get_token())
self.assertNotIn(password, self.logger.output)
def test_password_with_no_user_id_or_name(self):
self.assertRaises(TypeError,
v2.Password, self.TEST_URL, password=self.TEST_PASS)