Add an ability to cache auth token
Add option --os-cache, which defaults to OS_CACHE variable (by default - False). When this option is specified, manilaclient will use auth token saved in keyring. First time when command will be called, auth token and user password will be saved in keyring. Using --os-reset-cache option password and auth_token stored in the keyring can be deleted. Implements bp cache-auth-token Change-Id: Ia8959655528acce726baa616e66901c7a96fd35c
This commit is contained in:
parent
3332d01391
commit
02fb7a62d1
@ -55,7 +55,8 @@ class HTTPClient(object):
|
||||
share_service_name=None,
|
||||
retries=None,
|
||||
http_log_debug=False,
|
||||
cacert=None):
|
||||
cacert=None,
|
||||
os_cache=False):
|
||||
self.user = user
|
||||
self.password = password
|
||||
self.projectid = projectid
|
||||
@ -75,6 +76,8 @@ class HTTPClient(object):
|
||||
self.proxy_token = proxy_token
|
||||
self.proxy_tenant_id = proxy_tenant_id
|
||||
|
||||
self.os_cache = os_cache
|
||||
|
||||
if insecure:
|
||||
self.verify_cert = False
|
||||
else:
|
||||
@ -160,6 +163,7 @@ class HTTPClient(object):
|
||||
self.authenticate()
|
||||
kwargs.setdefault('headers', {})['X-Auth-Token'] = self.auth_token
|
||||
if self.projectid:
|
||||
|
||||
kwargs['headers']['X-Auth-Project-Id'] = self.projectid
|
||||
try:
|
||||
resp, body = self.request(self.management_url + url, method,
|
||||
@ -268,6 +272,14 @@ class HTTPClient(object):
|
||||
return self._extract_service_catalog(url, resp, body,
|
||||
extract_token=False)
|
||||
|
||||
def _save_keys(self):
|
||||
# Store the token/mgmt url in the keyring for later requests.
|
||||
if (self.os_cache):
|
||||
self.keyring_saver.save(self.auth_token,
|
||||
self.management_url,
|
||||
self.tenant_id)
|
||||
self.keyring_saver.save_password()
|
||||
|
||||
def authenticate(self):
|
||||
magic_tuple = urlparse.urlsplit(self.auth_url)
|
||||
scheme, netloc, path, query, frag = magic_tuple
|
||||
@ -315,6 +327,8 @@ class HTTPClient(object):
|
||||
auth_url = auth_url + '/v2.0'
|
||||
self._v2_auth(auth_url)
|
||||
|
||||
self._save_keys()
|
||||
|
||||
def _v1_auth(self, url):
|
||||
if self.proxy_token:
|
||||
raise exceptions.NoTokenLookupException()
|
||||
|
@ -21,6 +21,7 @@ Command-line interface to the OpenStack Manila API.
|
||||
from __future__ import print_function
|
||||
|
||||
import argparse
|
||||
import getpass
|
||||
import glob
|
||||
import imp
|
||||
import itertools
|
||||
@ -29,6 +30,7 @@ import os
|
||||
import pkgutil
|
||||
import sys
|
||||
|
||||
import keyring
|
||||
import six
|
||||
|
||||
from manilaclient import client
|
||||
@ -68,6 +70,188 @@ class ManilaClientArgumentParser(argparse.ArgumentParser):
|
||||
'subp': progparts[2]})
|
||||
|
||||
|
||||
class ManilaKeyring(keyring.backends.file.EncryptedKeyring):
|
||||
def delete_password(self, keyring_keys):
|
||||
"""Delete passwords from keyring.
|
||||
|
||||
Delete the passwords for given usernames of the services.
|
||||
|
||||
:param keyring_key: dictionary containing pairs {service:username}
|
||||
"""
|
||||
if self._check_file():
|
||||
self._unlock()
|
||||
for key, value in six.iteritems(keyring_keys):
|
||||
super(ManilaKeyring, self).delete_password(key, value)
|
||||
|
||||
|
||||
class SecretsHelper(object):
|
||||
"""Helper for working with keyring."""
|
||||
|
||||
def __init__(self, args, client):
|
||||
self.args = args
|
||||
self.client = client
|
||||
self.key = None
|
||||
self.keyring = ManilaKeyring()
|
||||
|
||||
def _validate_string(self, text):
|
||||
if text is None or len(text) == 0:
|
||||
return False
|
||||
return True
|
||||
|
||||
def _make_key(self):
|
||||
if self.key is not None:
|
||||
return self.key
|
||||
keys = [
|
||||
self.client.auth_url,
|
||||
self.client.user,
|
||||
self.client.projectid,
|
||||
self.client.region_name,
|
||||
self.client.endpoint_type,
|
||||
self.client.service_type,
|
||||
self.client.service_name,
|
||||
self.client.share_service_name,
|
||||
]
|
||||
for (index, key) in enumerate(keys):
|
||||
if key is None:
|
||||
keys[index] = '?'
|
||||
else:
|
||||
keys[index] = str(keys[index])
|
||||
self.key = "/".join(keys)
|
||||
return self.key
|
||||
|
||||
def _prompt_password(self, verify=True):
|
||||
"""Suggest user to enter password from keyboard."""
|
||||
pw = None
|
||||
if hasattr(sys.stdin, 'isatty') and sys.stdin.isatty():
|
||||
# Check for Ctl-D
|
||||
try:
|
||||
while True:
|
||||
pw1 = getpass.getpass('OS Password: ')
|
||||
if verify:
|
||||
pw2 = getpass.getpass('Please verify: ')
|
||||
else:
|
||||
pw2 = pw1
|
||||
if pw1 == pw2 and self._validate_string(pw1):
|
||||
pw = pw1
|
||||
break
|
||||
except EOFError:
|
||||
pass
|
||||
return pw
|
||||
|
||||
def save(self, auth_token, management_url, tenant_id):
|
||||
"""Save auth token, management url and tenant id in keyring.
|
||||
|
||||
If params are different from already cached ones, save new auth token,
|
||||
management url and tenant id in keyring.
|
||||
|
||||
Raise ValueError in case of empty auth_token, management_url or
|
||||
tenant_id.
|
||||
"""
|
||||
if (auth_token == self.auth_token and
|
||||
management_url == self.management_url):
|
||||
# Nothing changed....
|
||||
return
|
||||
if not all([management_url, auth_token, tenant_id]):
|
||||
raise ValueError("Unable to save empty management url/auth "
|
||||
"token")
|
||||
value = "|".join([str(auth_token),
|
||||
str(management_url),
|
||||
str(tenant_id)])
|
||||
self.keyring.set_password('manilaclient_auth', self._make_key(), value)
|
||||
|
||||
def reset(self):
|
||||
"""Delete cached password and auth token."""
|
||||
args = {'openstack': self.args.os_username,
|
||||
'manilaclient_auth': self._make_key()}
|
||||
self.keyring.delete_password(args)
|
||||
|
||||
def save_password(self):
|
||||
self.keyring.set_password('openstack',
|
||||
self.args.os_username, self.password)
|
||||
|
||||
def check_cached_password(self):
|
||||
"""Check if os_password is equal to cached password."""
|
||||
if self.args.os_cache:
|
||||
cached_password = self.keyring.get_password('openstack',
|
||||
self.args.os_username)
|
||||
cached_token = self.keyring.get_password('manilaclient_auth',
|
||||
self._make_key())
|
||||
if cached_password and self.password != cached_password:
|
||||
return False
|
||||
if cached_token and not cached_password:
|
||||
return False
|
||||
return True
|
||||
|
||||
@property
|
||||
def password(self):
|
||||
"""Return user password.
|
||||
|
||||
Return os_password value or suggest user to enter new password from
|
||||
keyboard.
|
||||
"""
|
||||
password = None
|
||||
if self._validate_string(self.args.os_password):
|
||||
password = self.args.os_password
|
||||
else:
|
||||
verify_pass = strutils.bool_from_string(
|
||||
utils.env("OS_VERIFY_PASSWORD", default=False))
|
||||
password = self._prompt_password(verify_pass)
|
||||
if not password:
|
||||
raise exc.CommandError(
|
||||
'Expecting a password provided via either '
|
||||
'--os-password, env[OS_PASSWORD], or '
|
||||
'prompted response')
|
||||
return password
|
||||
|
||||
@property
|
||||
def management_url(self):
|
||||
"""Return cached management url.
|
||||
|
||||
If os_cache enabled and management url already cached, return
|
||||
management url. Otherwise return None.
|
||||
"""
|
||||
if not self.args.os_cache:
|
||||
return None
|
||||
management_url = None
|
||||
block = self.keyring.get_password('manilaclient_auth',
|
||||
self._make_key())
|
||||
if block:
|
||||
_token, management_url, _tenant_id = block.split('|', 2)
|
||||
return management_url
|
||||
|
||||
@property
|
||||
def auth_token(self):
|
||||
"""Return cached auth token.
|
||||
|
||||
If os_cache enabled and auth token already cached, return auth token.
|
||||
Otherwise return None.
|
||||
"""
|
||||
if not self.args.os_cache:
|
||||
return None
|
||||
token = None
|
||||
block = self.keyring.get_password('manilaclient_auth',
|
||||
self._make_key())
|
||||
if block:
|
||||
token, _management_url, _tenant_id = block.split('|', 2)
|
||||
return token
|
||||
|
||||
@property
|
||||
def tenant_id(self):
|
||||
"""Return cached tenant id.
|
||||
|
||||
If os_cache enabled and tenant id already cached, return tenant id.
|
||||
Otherwise return None.
|
||||
"""
|
||||
if not self.args.os_cache:
|
||||
return None
|
||||
tenant_id = None
|
||||
block = self.keyring.get_password('manilaclient_auth',
|
||||
self._make_key())
|
||||
if block:
|
||||
_token, _management_url, tenant_id = block.split('|', 2)
|
||||
return tenant_id
|
||||
|
||||
|
||||
class OpenStackManilaShell(object):
|
||||
|
||||
def get_base_parser(self):
|
||||
@ -95,6 +279,17 @@ class OpenStackManilaShell(object):
|
||||
default=False),
|
||||
help="Print debugging output")
|
||||
|
||||
parser.add_argument('--os-cache',
|
||||
default=utils.env('OS_CACHE', default=False),
|
||||
action='store_true',
|
||||
help='Use the auth token cache. '
|
||||
'Defaults to env[OS_CACHE].')
|
||||
|
||||
parser.add_argument('--os-reset-cache',
|
||||
default=False,
|
||||
action='store_true',
|
||||
help='Delete cached password and auth token.')
|
||||
|
||||
parser.add_argument('--os-username',
|
||||
metavar='<auth-user-name>',
|
||||
default=utils.env('OS_USERNAME',
|
||||
@ -340,12 +535,12 @@ class OpenStackManilaShell(object):
|
||||
(os_username, os_password, os_tenant_name, os_auth_url,
|
||||
os_region_name, os_tenant_id, endpoint_type, insecure,
|
||||
service_type, service_name, share_service_name,
|
||||
cacert) = (
|
||||
cacert, os_cache, os_reset_cache) = (
|
||||
args.os_username, args.os_password, args.os_tenant_name,
|
||||
args.os_auth_url, args.os_region_name, args.os_tenant_id,
|
||||
args.endpoint_type, args.insecure, args.service_type,
|
||||
args.service_name, args.share_service_name,
|
||||
args.os_cacert)
|
||||
args.os_cacert, args.os_cache, args.os_reset_cache)
|
||||
|
||||
if not endpoint_type:
|
||||
endpoint_type = DEFAULT_MANILA_ENDPOINT_TYPE
|
||||
@ -363,11 +558,6 @@ class OpenStackManilaShell(object):
|
||||
"You must provide a username "
|
||||
"via either --os-username or env[OS_USERNAME]")
|
||||
|
||||
if not os_password:
|
||||
raise exc.CommandError("You must provide a password "
|
||||
"via either --os-password or via "
|
||||
"env[OS_PASSWORD]")
|
||||
|
||||
if not (os_tenant_name or os_tenant_id):
|
||||
raise exc.CommandError("You must provide a tenant_id "
|
||||
"via either --os-tenant-id or "
|
||||
@ -399,15 +589,28 @@ class OpenStackManilaShell(object):
|
||||
share_service_name=share_service_name,
|
||||
retries=options.retries,
|
||||
http_log_debug=args.debug,
|
||||
cacert=cacert)
|
||||
|
||||
try:
|
||||
if not utils.isunauthenticated(args.func):
|
||||
self.cs.authenticate()
|
||||
except exc.Unauthorized:
|
||||
raise exc.CommandError("Invalid OpenStack Manila credentials.")
|
||||
except exc.AuthorizationFailure:
|
||||
raise exc.CommandError("Unable to authorize user")
|
||||
cacert=cacert,
|
||||
os_cache=os_cache)
|
||||
if not utils.isunauthenticated(args.func):
|
||||
helper = SecretsHelper(args, self.cs.client)
|
||||
if os_reset_cache:
|
||||
helper.reset()
|
||||
self.cs.client.keyring_saver = helper
|
||||
if (helper.tenant_id and helper.auth_token and
|
||||
helper.management_url and
|
||||
helper.check_cached_password()):
|
||||
self.cs.client.tenant_id = helper.tenant_id
|
||||
self.cs.client.auth_token = helper.auth_token
|
||||
self.cs.client.management_url = helper.management_url
|
||||
else:
|
||||
self.cs.client.password = helper.password
|
||||
try:
|
||||
self.cs.authenticate()
|
||||
except exc.Unauthorized:
|
||||
raise exc.CommandError("Invalid OpenStack Manila "
|
||||
"credentials.")
|
||||
except exc.AuthorizationFailure:
|
||||
raise exc.CommandError("Unable to authorize user")
|
||||
|
||||
args.func(self.cs, args)
|
||||
|
||||
|
@ -31,7 +31,7 @@ class Client(object):
|
||||
service_type='share', service_name=None,
|
||||
share_service_name=None, retries=None,
|
||||
http_log_debug=False,
|
||||
cacert=None):
|
||||
cacert=None, os_cache=False):
|
||||
# FIXME(comstud): Rename the api_key argument above when we
|
||||
# know it's not being used as keyword argument
|
||||
password = api_key
|
||||
@ -73,7 +73,8 @@ class Client(object):
|
||||
share_service_name=share_service_name,
|
||||
retries=retries,
|
||||
http_log_debug=http_log_debug,
|
||||
cacert=cacert)
|
||||
cacert=cacert,
|
||||
os_cache=os_cache)
|
||||
|
||||
def authenticate(self):
|
||||
"""Authenticate against the server.
|
||||
|
@ -1,7 +1,9 @@
|
||||
pbr>=0.6,!=0.7,<1.0
|
||||
argparse
|
||||
iso8601>=0.1.9
|
||||
keyring>=2.1
|
||||
PrettyTable>=0.7,<0.8
|
||||
pycrypto>=2.6
|
||||
requests>=1.1
|
||||
simplejson>=2.0.9
|
||||
Babel>=1.3
|
||||
|
@ -15,10 +15,13 @@
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
import requests
|
||||
|
||||
from manilaclient import client
|
||||
from manilaclient import exceptions
|
||||
from manilaclient.openstack.common import jsonutils
|
||||
from manilaclient import shell
|
||||
from manilaclient.v1 import client as client_v1
|
||||
from manilaclient.v1 import shell as shell_v1
|
||||
from tests import utils
|
||||
from tests.v1 import fakes
|
||||
@ -53,7 +56,8 @@ class ShellTest(utils.TestCase):
|
||||
# testing a SystemExit to be thrown and object self.shell has
|
||||
# no time to get instantatiated which is OK in this case, so
|
||||
# we make sure the method is there before launching it.
|
||||
if hasattr(self.shell, 'cs'):
|
||||
if hasattr(self.shell, 'cs') and hasattr(self.shell.cs,
|
||||
'clear_callstack'):
|
||||
self.shell.cs.clear_callstack()
|
||||
|
||||
#HACK(bcwaldon): replace this when we start using stubs
|
||||
@ -279,3 +283,267 @@ class ShellTest(utils.TestCase):
|
||||
}
|
||||
}
|
||||
self.assert_called("POST", "/shares", body=expected)
|
||||
|
||||
@mock.patch.object(fakes.FakeClient, 'authenticate', mock.Mock())
|
||||
@mock.patch.object(shell.SecretsHelper, '_make_key', mock.Mock())
|
||||
@mock.patch.object(shell.SecretsHelper, 'password', mock.Mock())
|
||||
@mock.patch.object(shell.SecretsHelper, 'check_cached_password',
|
||||
mock.Mock())
|
||||
def test_os_cache_enabled_keys_saved_password_not_changed(self):
|
||||
shell.SecretsHelper.tenant_id = \
|
||||
shell.SecretsHelper.auth_token = \
|
||||
shell.SecretsHelper.management_url = 'fake'
|
||||
self.run_command('--os-cache list')
|
||||
self.assertFalse(shell.SecretsHelper.password.called)
|
||||
self.assertFalse(fakes.FakeClient.authenticate.called)
|
||||
|
||||
@mock.patch.object(shell.SecretsHelper, '_validate_string', mock.Mock())
|
||||
@mock.patch.object(shell.SecretsHelper, '_prompt_password', mock.Mock())
|
||||
@mock.patch.object(shell.ManilaKeyring, 'get_password', mock.Mock())
|
||||
def test_os_cache_enabled_keys_not_saved_with_password(self):
|
||||
shell.SecretsHelper._validate_string.return_value = True
|
||||
shell.ManilaKeyring.get_password.return_value = None
|
||||
shell.SecretsHelper.tenant_id = \
|
||||
shell.SecretsHelper.auth_token = \
|
||||
shell.SecretsHelper.management_url = None
|
||||
self.run_command('--os-cache list')
|
||||
self.assertFalse(shell.SecretsHelper._prompt_password.called)
|
||||
shell.SecretsHelper._validate_string.\
|
||||
assert_called_once_with('password')
|
||||
|
||||
@mock.patch.object(shell.SecretsHelper, '_prompt_password', mock.Mock())
|
||||
@mock.patch.object(shell.ManilaKeyring, 'get_password', mock.Mock())
|
||||
def test_os_cache_enabled_keys_not_saved_no_password(self):
|
||||
shell.ManilaKeyring.get_password.return_value = None
|
||||
shell.SecretsHelper._prompt_password.return_value = 'password'
|
||||
self.useFixture(fixtures.EnvironmentVariable('MANILA_PASSWORD', ''))
|
||||
shell.SecretsHelper.tenant_id = \
|
||||
shell.SecretsHelper.auth_token = \
|
||||
shell.SecretsHelper.management_url = None
|
||||
self.run_command('--os-cache list')
|
||||
self.assertTrue(shell.SecretsHelper._prompt_password.called)
|
||||
|
||||
@mock.patch.object(shell.SecretsHelper, '_validate_string', mock.Mock())
|
||||
@mock.patch.object(shell.ManilaKeyring, 'get_password', mock.Mock())
|
||||
@mock.patch.object(shell.SecretsHelper, 'reset', mock.Mock())
|
||||
def test_os_cache_enabled_keys_reset_cached_password(self):
|
||||
shell.ManilaKeyring.get_password.return_value = 'old_password'
|
||||
shell.SecretsHelper._validate_string.return_value = True
|
||||
shell.SecretsHelper.tenant_id = \
|
||||
shell.SecretsHelper.auth_token = \
|
||||
shell.SecretsHelper.management_url = None
|
||||
self.run_command('--os-cache --os-reset-cache list')
|
||||
shell.SecretsHelper._validate_string.\
|
||||
assert_called_once_with('password')
|
||||
shell.SecretsHelper.reset.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(shell.SecretsHelper, '_validate_string', mock.Mock())
|
||||
@mock.patch.object(shell.SecretsHelper, 'reset', mock.Mock())
|
||||
def test_os_cache_disabled_keys_reset_cached_password(self):
|
||||
shell.SecretsHelper._validate_string.return_value = True
|
||||
self.run_command('--os-reset-cache list')
|
||||
shell.SecretsHelper._validate_string.\
|
||||
assert_called_once_with('password')
|
||||
shell.SecretsHelper.reset.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(fakes.FakeClient, 'authenticate', mock.Mock())
|
||||
@mock.patch.object(shell.ManilaKeyring, 'get_password', mock.Mock())
|
||||
@mock.patch.object(shell.SecretsHelper, '_make_key', mock.Mock())
|
||||
def test_os_cache_enabled_os_password_differs_from_the_cached_one(self):
|
||||
def _fake_get_password(service, username):
|
||||
if service == 'openstack':
|
||||
return 'old_cached_password'
|
||||
else:
|
||||
return 'old_cached_token'
|
||||
shell.SecretsHelper.tenant_id = \
|
||||
shell.SecretsHelper.auth_token = \
|
||||
shell.SecretsHelper.management_url = 'fake'
|
||||
self.run_command('--os-cache list')
|
||||
fakes.FakeClient.authenticate.assert_called_once_with()
|
||||
|
||||
@mock.patch.object(requests, 'request', mock.Mock())
|
||||
@mock.patch.object(client.HTTPClient, '_save_keys', mock.Mock())
|
||||
def test_os_cache_token_expired(self):
|
||||
def _fake_request(method, url, **kwargs):
|
||||
headers = None
|
||||
if url == 'new_url/shares/detail':
|
||||
resp_text = {"shares": []}
|
||||
return utils.TestResponse({
|
||||
"status_code": 200,
|
||||
"text": jsonutils.dumps(resp_text),
|
||||
})
|
||||
elif url == 'fake/shares/detail':
|
||||
resp_text = {"unauthorized": {"message": "Unauthorized",
|
||||
"code": "401"}}
|
||||
return utils.TestResponse({
|
||||
"status_code": 401,
|
||||
"text": jsonutils.dumps(resp_text),
|
||||
})
|
||||
else:
|
||||
headers = {
|
||||
'x-server-management-url': 'new_url',
|
||||
'x-auth-token': 'new_token',
|
||||
}
|
||||
resp_text = 'some_text'
|
||||
return utils.TestResponse({
|
||||
"status_code": 200,
|
||||
"text": jsonutils.dumps(resp_text),
|
||||
"headers": headers
|
||||
})
|
||||
|
||||
client.get_client_class = lambda *_: client_v1.Client
|
||||
shell.SecretsHelper.tenant_id = \
|
||||
shell.SecretsHelper.auth_token = \
|
||||
shell.SecretsHelper.management_url = 'fake'
|
||||
requests.request.side_effect = _fake_request
|
||||
|
||||
self.run_command('--os-cache list')
|
||||
|
||||
client.HTTPClient._save_keys.assert_called_once_with()
|
||||
expected_headers = {
|
||||
'X-Auth-Project-Id': 'project_id',
|
||||
'User-Agent': 'python-manilaclient',
|
||||
'Accept': 'application/json',
|
||||
'X-Auth-Token': 'new_token'}
|
||||
requests.request.assert_called_with('GET', 'new_url/shares/detail',
|
||||
headers=expected_headers, verify=True)
|
||||
|
||||
|
||||
class SecretsHelperTestCase(utils.TestCase):
|
||||
def setUp(self):
|
||||
super(SecretsHelperTestCase, self).setUp()
|
||||
self.cs = client.Client(1, 'user', 'password',
|
||||
project_id='project',
|
||||
auth_url='http://111.11.11.11:5000',
|
||||
region_name='region',
|
||||
endpoint_type='publicURL',
|
||||
service_type='share',
|
||||
service_name='fake',
|
||||
share_service_name='fake')
|
||||
self.args = mock.Mock()
|
||||
self.args.os_cache = True
|
||||
self.args.reset_cached_password = False
|
||||
self.helper = shell.SecretsHelper(self.args, self.cs.client)
|
||||
mock.patch.object(shell.ManilaKeyring, 'set_password',
|
||||
mock.Mock()).start()
|
||||
mock.patch.object(shell.ManilaKeyring, 'get_password', mock.Mock(
|
||||
return_value='fake_token|fake_url|fake_tenant_id')).start()
|
||||
self.addCleanup(mock.patch.stopall)
|
||||
|
||||
def test_validate_string_void_string(self):
|
||||
self.assertFalse(self.helper._validate_string(''))
|
||||
|
||||
def test_validate_string_void_string(self):
|
||||
self.assertFalse(self.helper._validate_string(None))
|
||||
|
||||
def test_validate_string_good_string(self):
|
||||
self.assertTrue(self.helper._validate_string('this is a string'))
|
||||
|
||||
def test_make_key(self):
|
||||
expected_key = 'http://111.11.11.11:5000/user/project/region/' \
|
||||
'publicURL/share/fake/fake'
|
||||
self.assertEqual(self.helper._make_key(), expected_key)
|
||||
|
||||
def test_make_key_missing_attrs(self):
|
||||
self.cs.client.service_name = self.cs.client.region_name = None
|
||||
expected_key = 'http://111.11.11.11:5000/user/project/?/' \
|
||||
'publicURL/share/?/fake'
|
||||
self.assertEqual(self.helper._make_key(), expected_key)
|
||||
|
||||
def test_save(self):
|
||||
shell.ManilaKeyring.get_password.return_value = ''
|
||||
expected_key = 'http://111.11.11.11:5000/user/project/region/' \
|
||||
'publicURL/share/fake/fake'
|
||||
self.helper.save('fake_token', 'fake_url', 'fake_tenant_id')
|
||||
shell.ManilaKeyring.set_password. \
|
||||
assert_called_once_with('manilaclient_auth',
|
||||
expected_key, 'fake_token|fake_url|fake_tenant_id')
|
||||
|
||||
def test_save_params_already_cached(self):
|
||||
self.helper.save('fake_token', 'fake_url', 'fake_tenant_id')
|
||||
self.assertFalse(shell.ManilaKeyring.set_password.called)
|
||||
|
||||
def test_save_missing_params(self):
|
||||
self.assertRaises(ValueError, self.helper.save, None, 'fake_url',
|
||||
'fake_tenant_id')
|
||||
|
||||
def test_password_os_password(self):
|
||||
self.args.os_password = 'fake_password'
|
||||
self.args.os_username = 'fake_username'
|
||||
self.helper = shell.SecretsHelper(self.args, self.cs.client)
|
||||
self.assertEqual(self.helper.password, 'fake_password')
|
||||
|
||||
@mock.patch.object(shell.SecretsHelper, '_prompt_password', mock.Mock())
|
||||
def test_password_from_keyboard(self):
|
||||
self.args.os_password = ''
|
||||
self.helper = shell.SecretsHelper(self.args, self.cs.client)
|
||||
shell.SecretsHelper._prompt_password.return_value = None
|
||||
self.assertRaises(exceptions.CommandError, getattr, self.helper,
|
||||
'password')
|
||||
|
||||
def test_management_url_os_cache_false(self):
|
||||
self.args.os_cache = False
|
||||
self.helper = shell.SecretsHelper(self.args, self.cs.client)
|
||||
self.assertIsNone(self.helper.management_url)
|
||||
|
||||
def test_management_url_os_cache_true(self):
|
||||
self.assertEqual(self.helper.management_url, 'fake_url')
|
||||
expected_key = 'http://111.11.11.11:5000/user/project/region/' \
|
||||
'publicURL/share/fake/fake'
|
||||
shell.ManilaKeyring.get_password. \
|
||||
assert_called_once_with('manilaclient_auth', expected_key)
|
||||
|
||||
def test_auth_token_os_cache_false(self):
|
||||
self.args.os_cache = False
|
||||
self.helper = shell.SecretsHelper(self.args, self.cs.client)
|
||||
self.assertIsNone(self.helper.auth_token)
|
||||
|
||||
def test_management_url_os_cache_true(self):
|
||||
self.assertEqual(self.helper.auth_token, 'fake_token')
|
||||
expected_key = 'http://111.11.11.11:5000/user/project/region/' \
|
||||
'publicURL/share/fake/fake'
|
||||
shell.ManilaKeyring.get_password. \
|
||||
assert_called_once_with('manilaclient_auth', expected_key)
|
||||
|
||||
def test_tenant_id_os_cache_false(self):
|
||||
self.args.os_cache = False
|
||||
self.helper = shell.SecretsHelper(self.args, self.cs.client)
|
||||
self.assertIsNone(self.helper.tenant_id)
|
||||
|
||||
def test_tenant_id_os_cache_true(self):
|
||||
self.assertEqual(self.helper.tenant_id, 'fake_tenant_id')
|
||||
expected_key = 'http://111.11.11.11:5000/user/project/region/' \
|
||||
'publicURL/share/fake/fake'
|
||||
shell.ManilaKeyring.get_password. \
|
||||
assert_called_once_with('manilaclient_auth', expected_key)
|
||||
|
||||
def test_check_cached_password_os_cache_false(self):
|
||||
self.args.os_cache = False
|
||||
self.helper = shell.SecretsHelper(self.args, self.cs.client)
|
||||
self.assertTrue(self.helper.check_cached_password())
|
||||
|
||||
def test_check_cached_password_same_passwords(self):
|
||||
self.args.os_password = 'user_password'
|
||||
self.helper = shell.SecretsHelper(self.args, self.cs.client)
|
||||
shell.ManilaKeyring.get_password.return_value = 'user_password'
|
||||
self.assertTrue(self.helper.check_cached_password())
|
||||
|
||||
def test_check_cached_password_no_cache(self):
|
||||
shell.ManilaKeyring.get_password.return_value = None
|
||||
self.assertTrue(self.helper.check_cached_password())
|
||||
|
||||
def test_check_cached_password_different_passwords(self):
|
||||
self.args.os_password = 'new_user_password'
|
||||
self.helper = shell.SecretsHelper(self.args, self.cs.client)
|
||||
shell.ManilaKeyring.get_password.return_value = 'cached_password'
|
||||
self.assertFalse(self.helper.check_cached_password())
|
||||
|
||||
def test_check_cached_password_cached_password_deleted(self):
|
||||
def _fake_get_password(service, username):
|
||||
if service == 'openstack':
|
||||
return None
|
||||
else:
|
||||
return 'fake_token'
|
||||
|
||||
shell.ManilaKeyring.get_password.side_effect = _fake_get_password
|
||||
self.assertFalse(self.helper.check_cached_password())
|
||||
|
Loading…
Reference in New Issue
Block a user