Allow passing user_id to v2Password plugin
Whilst this is undocumented it is supported by keystone and relied upon by other services. Change-Id: Idf8be75e2e0b275d9c9840082079100dd13a70ff
This commit is contained in:
@@ -97,20 +97,39 @@ class Auth(base.BaseIdentityPlugin):
|
||||
|
||||
class Password(Auth):
|
||||
|
||||
def __init__(self, auth_url, username, password, **kwargs):
|
||||
@utils.positional(4)
|
||||
def __init__(self, auth_url, username=None, password=None, user_id=None,
|
||||
**kwargs):
|
||||
"""A plugin for authenticating with a username and password.
|
||||
|
||||
A username or user_id must be provided.
|
||||
|
||||
:param string auth_url: Identity service endpoint for authorization.
|
||||
:param string username: Username for authentication.
|
||||
:param string password: Password for authentication.
|
||||
:param string user_id: User ID for authentication.
|
||||
|
||||
:raises TypeError: if a user_id or username is not provided.
|
||||
"""
|
||||
super(Password, self).__init__(auth_url, **kwargs)
|
||||
|
||||
if not (user_id or username):
|
||||
msg = 'You need to specify either a username or user_id'
|
||||
raise TypeError(msg)
|
||||
|
||||
self.user_id = user_id
|
||||
self.username = username
|
||||
self.password = password
|
||||
|
||||
def get_auth_data(self, headers=None):
|
||||
return {'passwordCredentials': {'username': self.username,
|
||||
'password': self.password}}
|
||||
auth = {'password': self.password}
|
||||
|
||||
if self.username:
|
||||
auth['username'] = self.username
|
||||
elif self.user_id:
|
||||
auth['userId'] = self.user_id
|
||||
|
||||
return {'passwordCredentials': auth}
|
||||
|
||||
@classmethod
|
||||
def get_options(cls):
|
||||
@@ -121,6 +140,7 @@ class Password(Auth):
|
||||
dest='username',
|
||||
deprecated_name='username',
|
||||
help='Username to login with'),
|
||||
cfg.StrOpt('user-id', help='User ID to longin with'),
|
||||
cfg.StrOpt('password', secret=True, help='Password to use'),
|
||||
])
|
||||
|
||||
|
@@ -110,6 +110,20 @@ class V2IdentityPlugin(utils.TestCase):
|
||||
self.assertRequestHeaderEqual('Accept', 'application/json')
|
||||
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
|
||||
def test_authenticate_with_user_id_password(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
|
||||
password=self.TEST_PASS)
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
|
||||
'password': self.TEST_PASS}}}
|
||||
self.assertRequestBodyIs(json=req)
|
||||
self.assertRequestHeaderEqual('Content-Type', 'application/json')
|
||||
self.assertRequestHeaderEqual('Accept', 'application/json')
|
||||
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
|
||||
def test_authenticate_with_username_password_scoped(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
a = v2.Password(self.TEST_URL, username=self.TEST_USER,
|
||||
@@ -123,6 +137,19 @@ class V2IdentityPlugin(utils.TestCase):
|
||||
self.assertRequestBodyIs(json=req)
|
||||
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
|
||||
def test_authenticate_with_user_id_password_scoped(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
a = v2.Password(self.TEST_URL, user_id=self.TEST_USER,
|
||||
password=self.TEST_PASS, tenant_id=self.TEST_TENANT_ID)
|
||||
s = session.Session(a)
|
||||
s.get_token()
|
||||
|
||||
req = {'auth': {'passwordCredentials': {'userId': self.TEST_USER,
|
||||
'password': self.TEST_PASS},
|
||||
'tenantId': self.TEST_TENANT_ID}}
|
||||
self.assertRequestBodyIs(json=req)
|
||||
self.assertEqual(s.auth.auth_ref.auth_token, self.TEST_TOKEN)
|
||||
|
||||
def test_authenticate_with_token(self):
|
||||
self.stub_auth(json=self.TEST_RESPONSE_DICT)
|
||||
a = v2.Token(self.TEST_URL, 'foo')
|
||||
@@ -247,3 +274,7 @@ class V2IdentityPlugin(utils.TestCase):
|
||||
s = session.Session(auth=a)
|
||||
self.assertEqual(self.TEST_TOKEN, s.get_token())
|
||||
self.assertNotIn(password, self.logger.output)
|
||||
|
||||
def test_password_with_no_user_id_or_name(self):
|
||||
self.assertRaises(TypeError,
|
||||
v2.Password, self.TEST_URL, password=self.TEST_PASS)
|
||||
|
Reference in New Issue
Block a user