Merge "Modify keyring tests to test authentication"
This commit is contained in:
@@ -14,6 +14,8 @@
|
|||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
|
import mock
|
||||||
|
|
||||||
from keystoneclient import access
|
from keystoneclient import access
|
||||||
from keystoneclient import httpclient
|
from keystoneclient import httpclient
|
||||||
from keystoneclient.openstack.common import timeutils
|
from keystoneclient.openstack.common import timeutils
|
||||||
@@ -47,42 +49,60 @@ class KeyringTest(utils.TestCase):
|
|||||||
'optional package keyring or pickle is not installed')
|
'optional package keyring or pickle is not installed')
|
||||||
|
|
||||||
class MemoryKeyring(keyring.backend.KeyringBackend):
|
class MemoryKeyring(keyring.backend.KeyringBackend):
|
||||||
"""Simple memory keyring with support for multiple keys."""
|
"""A Simple testing keyring.
|
||||||
|
|
||||||
|
This class supports stubbing an initial password to be returned by
|
||||||
|
setting password, and allows easy password and key retrieval. Also
|
||||||
|
records if a password was retrieved.
|
||||||
|
"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.passwords = {}
|
self.key = None
|
||||||
|
self.password = None
|
||||||
|
self.fetched = False
|
||||||
|
self.get_password_called = False
|
||||||
|
self.set_password_called = False
|
||||||
|
|
||||||
def supported(self):
|
def supported(self):
|
||||||
return 1
|
return 1
|
||||||
|
|
||||||
def get_password(self, service, username):
|
def get_password(self, service, username):
|
||||||
|
self.get_password_called = True
|
||||||
key = username + '@' + service
|
key = username + '@' + service
|
||||||
if key not in self.passwords:
|
# make sure we don't get passwords crossed if one is enforced.
|
||||||
|
if self.key and self.key != key:
|
||||||
return None
|
return None
|
||||||
return self.passwords[key]
|
if self.password:
|
||||||
|
self.fetched = True
|
||||||
|
return self.password
|
||||||
|
|
||||||
def set_password(self, service, username, password):
|
def set_password(self, service, username, password):
|
||||||
key = username + '@' + service
|
self.set_password_called = True
|
||||||
self.passwords[key] = password
|
self.key = username + '@' + service
|
||||||
|
self.password = password
|
||||||
|
|
||||||
super(KeyringTest, self).setUp()
|
super(KeyringTest, self).setUp()
|
||||||
keyring.set_keyring(MemoryKeyring())
|
self.memory_keyring = MemoryKeyring()
|
||||||
|
keyring.set_keyring(self.memory_keyring)
|
||||||
|
|
||||||
def test_no_keyring_key(self):
|
def test_no_keyring_key(self):
|
||||||
"""Ensure that we get no value back if we don't have use_keyring
|
"""Ensure that if we don't have use_keyring set in the client that
|
||||||
set in the client.
|
the keyring is never accessed.
|
||||||
"""
|
"""
|
||||||
cl = httpclient.HTTPClient(username=USERNAME, password=PASSWORD,
|
cl = httpclient.HTTPClient(username=USERNAME, password=PASSWORD,
|
||||||
tenant_id=TENANT_ID, auth_url=AUTH_URL)
|
tenant_id=TENANT_ID, auth_url=AUTH_URL)
|
||||||
|
|
||||||
(keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
|
# stub and check that a new token is received
|
||||||
auth_url=AUTH_URL,
|
with mock.patch.object(cl, 'get_raw_token_from_identity_service') \
|
||||||
username=USERNAME,
|
as meth:
|
||||||
tenant_name=TENANT,
|
meth.return_value = (True, PROJECT_SCOPED_TOKEN)
|
||||||
tenant_id=TENANT_ID,
|
|
||||||
token=TOKEN)
|
|
||||||
|
|
||||||
self.assertIsNone(keyring_key)
|
self.assertTrue(cl.authenticate())
|
||||||
self.assertIsNone(auth_ref)
|
|
||||||
|
meth.assert_called_once()
|
||||||
|
|
||||||
|
# make sure that we never touched the keyring
|
||||||
|
self.assertFalse(self.memory_keyring.get_password_called)
|
||||||
|
self.assertFalse(self.memory_keyring.set_password_called)
|
||||||
|
|
||||||
def test_build_keyring_key(self):
|
def test_build_keyring_key(self):
|
||||||
cl = httpclient.HTTPClient(username=USERNAME, password=PASSWORD,
|
cl = httpclient.HTTPClient(username=USERNAME, password=PASSWORD,
|
||||||
@@ -102,43 +122,68 @@ class KeyringTest(utils.TestCase):
|
|||||||
cl = httpclient.HTTPClient(username=USERNAME, password=PASSWORD,
|
cl = httpclient.HTTPClient(username=USERNAME, password=PASSWORD,
|
||||||
tenant_id=TENANT_ID, auth_url=AUTH_URL,
|
tenant_id=TENANT_ID, auth_url=AUTH_URL,
|
||||||
use_keyring=True)
|
use_keyring=True)
|
||||||
keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
|
|
||||||
username=USERNAME,
|
|
||||||
tenant_name=TENANT,
|
|
||||||
tenant_id=TENANT_ID,
|
|
||||||
token=TOKEN)
|
|
||||||
|
|
||||||
cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
|
# set an expired token into the keyring
|
||||||
|
auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
|
||||||
expired = timeutils.utcnow() - datetime.timedelta(minutes=30)
|
expired = timeutils.utcnow() - datetime.timedelta(minutes=30)
|
||||||
cl.auth_ref['token']['expires'] = timeutils.isotime(expired)
|
auth_ref['token']['expires'] = timeutils.isotime(expired)
|
||||||
cl.store_auth_ref_into_keyring(keyring_key)
|
self.memory_keyring.password = pickle.dumps(auth_ref)
|
||||||
(keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
|
|
||||||
auth_url=AUTH_URL,
|
|
||||||
username=USERNAME,
|
|
||||||
tenant_name=TENANT,
|
|
||||||
tenant_id=TENANT_ID,
|
|
||||||
token=TOKEN)
|
|
||||||
self.assertIsNone(auth_ref)
|
|
||||||
|
|
||||||
def test_set_and_get_keyring(self):
|
# stub and check that a new token is received, so not using expired
|
||||||
|
with mock.patch.object(cl, 'get_raw_token_from_identity_service') \
|
||||||
|
as meth:
|
||||||
|
meth.return_value = (True, PROJECT_SCOPED_TOKEN)
|
||||||
|
|
||||||
|
self.assertTrue(cl.authenticate())
|
||||||
|
|
||||||
|
meth.assert_called_once()
|
||||||
|
|
||||||
|
# check that a value was returned from the keyring
|
||||||
|
self.assertTrue(self.memory_keyring.fetched)
|
||||||
|
|
||||||
|
# check that the new token has been loaded into the keyring
|
||||||
|
new_auth_ref = pickle.loads(self.memory_keyring.password)
|
||||||
|
self.assertEqual(new_auth_ref['token']['expires'],
|
||||||
|
PROJECT_SCOPED_TOKEN['access']['token']['expires'])
|
||||||
|
|
||||||
|
def test_get_keyring(self):
|
||||||
cl = httpclient.HTTPClient(username=USERNAME, password=PASSWORD,
|
cl = httpclient.HTTPClient(username=USERNAME, password=PASSWORD,
|
||||||
tenant_id=TENANT_ID, auth_url=AUTH_URL,
|
tenant_id=TENANT_ID, auth_url=AUTH_URL,
|
||||||
use_keyring=True)
|
use_keyring=True)
|
||||||
keyring_key = cl._build_keyring_key(auth_url=AUTH_URL,
|
|
||||||
username=USERNAME,
|
|
||||||
tenant_name=TENANT,
|
|
||||||
tenant_id=TENANT_ID,
|
|
||||||
token=TOKEN)
|
|
||||||
|
|
||||||
cl.auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
|
# set an token into the keyring
|
||||||
expires = timeutils.utcnow() + datetime.timedelta(minutes=30)
|
auth_ref = access.AccessInfo.factory(body=PROJECT_SCOPED_TOKEN)
|
||||||
cl.auth_ref['token']['expires'] = timeutils.isotime(expires)
|
future = timeutils.utcnow() + datetime.timedelta(minutes=30)
|
||||||
cl.store_auth_ref_into_keyring(keyring_key)
|
auth_ref['token']['expires'] = timeutils.isotime(future)
|
||||||
(keyring_key, auth_ref) = cl.get_auth_ref_from_keyring(
|
self.memory_keyring.password = pickle.dumps(auth_ref)
|
||||||
auth_url=AUTH_URL,
|
|
||||||
username=USERNAME,
|
# don't stub get_raw_token so will fail if authenticate happens
|
||||||
tenant_name=TENANT,
|
|
||||||
tenant_id=TENANT_ID,
|
self.assertTrue(cl.authenticate())
|
||||||
token=TOKEN)
|
self.assertTrue(self.memory_keyring.fetched)
|
||||||
self.assertEqual(auth_ref.auth_token, TOKEN)
|
|
||||||
self.assertEqual(auth_ref.username, USERNAME)
|
def test_set_keyring(self):
|
||||||
|
cl = httpclient.HTTPClient(username=USERNAME, password=PASSWORD,
|
||||||
|
tenant_id=TENANT_ID, auth_url=AUTH_URL,
|
||||||
|
use_keyring=True)
|
||||||
|
|
||||||
|
# stub and check that a new token is received
|
||||||
|
with mock.patch.object(cl, 'get_raw_token_from_identity_service') \
|
||||||
|
as meth:
|
||||||
|
meth.return_value = (True, PROJECT_SCOPED_TOKEN)
|
||||||
|
|
||||||
|
self.assertTrue(cl.authenticate())
|
||||||
|
|
||||||
|
meth.assert_called_once()
|
||||||
|
|
||||||
|
# we checked the keyring, but we didn't find anything
|
||||||
|
self.assertTrue(self.memory_keyring.get_password_called)
|
||||||
|
self.assertFalse(self.memory_keyring.fetched)
|
||||||
|
|
||||||
|
# check that the new token has been loaded into the keyring
|
||||||
|
self.assertTrue(self.memory_keyring.set_password_called)
|
||||||
|
new_auth_ref = pickle.loads(self.memory_keyring.password)
|
||||||
|
self.assertEqual(new_auth_ref.auth_token, TOKEN)
|
||||||
|
self.assertEqual(new_auth_ref['token'],
|
||||||
|
PROJECT_SCOPED_TOKEN['access']['token'])
|
||||||
|
self.assertEqual(new_auth_ref.username, USERNAME)
|
||||||
|
Reference in New Issue
Block a user