Make OSC_Config the default

Reworks ClientManager and OpenStackShell to use the OSC_Config
subclass to handle all of the argument correctness checking.
These checks will eventually make their way into os-client-config
once all of the duplication and timing is worked out.

test_clientmanager is radically rewritten since ClientManager no
longer handles loading auth plugins.

osc_lib.tests.utils.TestClientManager is meant to be used by tests
for ClientManager subclasses. _clientmanager_class() must be overridden
to provide the correct subclass for instantiation.

Change-Id: I7dae0d17ee7940152bda52cd0020792bbc1a7dac
This commit is contained in:
Dean Troyer 2016-06-24 09:17:32 -05:00
parent 7084903ec5
commit a8ce81c6c4
7 changed files with 524 additions and 544 deletions

View File

@ -67,74 +67,6 @@ def get_options_list():
return OPTIONS_LIST
def select_auth_plugin(options):
"""Pick an auth plugin based on --os-auth-type or other options"""
auth_plugin_name = None
# Do the token/url check first as this must override the default
# 'password' set by os-client-config
# Also, url and token are not copied into o-c-c's auth dict (yet?)
if options.auth.get('url') and options.auth.get('token'):
# service token authentication
auth_plugin_name = 'token_endpoint'
elif options.auth_type in PLUGIN_LIST:
# A direct plugin name was given, use it
auth_plugin_name = options.auth_type
elif options.auth.get('username'):
if options.identity_api_version == '3':
auth_plugin_name = 'v3password'
elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2password'
else:
# let keystoneauth figure it out itself
auth_plugin_name = 'password'
elif options.auth.get('token'):
if options.identity_api_version == '3':
auth_plugin_name = 'v3token'
elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2token'
else:
# let keystoneauth figure it out itself
auth_plugin_name = 'token'
else:
# The ultimate default is similar to the original behaviour,
# but this time with version discovery
auth_plugin_name = 'password'
LOG.debug("Auth plugin %s selected", auth_plugin_name)
return auth_plugin_name
def build_auth_params(auth_plugin_name, cmd_options):
if auth_plugin_name:
LOG.debug('auth_type: %s', auth_plugin_name)
auth_plugin_loader = base.get_plugin_loader(auth_plugin_name)
auth_params = {
opt.dest: opt.default
for opt in base.get_plugin_options(auth_plugin_name)
}
auth_params.update(dict(cmd_options.auth))
# grab tenant from project for v2.0 API compatibility
if auth_plugin_name.startswith("v2"):
if 'project_id' in auth_params:
auth_params['tenant_id'] = auth_params['project_id']
del auth_params['project_id']
if 'project_name' in auth_params:
auth_params['tenant_name'] = auth_params['project_name']
del auth_params['project_name']
else:
LOG.debug('no auth_type')
# delay the plugin choice, grab every option
auth_plugin_loader = None
auth_params = dict(cmd_options.auth)
plugin_options = set([o.replace('-', '_') for o in get_options_list()])
for option in plugin_options:
LOG.debug('fetching option %s', option)
auth_params[option] = getattr(cmd_options.auth, option, None)
return (auth_plugin_loader, auth_params)
def check_valid_authorization_options(options, auth_plugin_name):
"""Validate authorization options, and provide helpful error messages."""
if (options.auth.get('project_id') and not

View File

@ -87,7 +87,6 @@ class ClientManager(object):
self._cli_options = cli_options
self._api_version = api_version
self._pw_callback = pw_func
self._url = self._cli_options.auth.get('url')
self.region_name = self._cli_options.region_name
self.interface = self._cli_options.interface
@ -197,9 +196,8 @@ class ClientManager(object):
if self._auth_setup_completed:
return
# If no auth type is named by the user, select one based on
# the supplied options
self.auth_plugin_name = auth.select_auth_plugin(self._cli_options)
# Stash the selected auth type
self.auth_plugin_name = self._cli_options.config['auth_type']
# Basic option checking to avoid unhelpful error messages
auth.check_valid_authentication_options(
@ -213,23 +211,16 @@ class ClientManager(object):
not self._cli_options.auth.get('password')):
self._cli_options.auth['password'] = self._pw_callback()
(auth_plugin, self._auth_params) = auth.build_auth_params(
self.auth_plugin_name,
self._cli_options,
)
self._set_default_scope_options()
# For compatibility until all clients can be updated
if 'project_name' in self._auth_params:
self._project_name = self._auth_params['project_name']
elif 'tenant_name' in self._auth_params:
self._project_name = self._auth_params['tenant_name']
if 'project_name' in self._cli_options.auth:
self._project_name = self._cli_options.auth['project_name']
elif 'tenant_name' in self._cli_options.auth:
self._project_name = self._cli_options.auth['tenant_name']
LOG.info('Using auth plugin: %s', self.auth_plugin_name)
LOG.debug('Using parameters %s',
strutils.mask_password(self._auth_params))
self.auth = auth_plugin.load_from_options(**self._auth_params)
strutils.mask_password(self._cli_options.auth))
self.auth = self._cli_options.get_auth()
# needed by SAML authentication
request_session = requests.session()
self.session = osc_session.TimingSession(

View File

@ -29,6 +29,7 @@ from cliff import help
from oslo_utils import importutils
from oslo_utils import strutils
from osc_lib.cli import client_config as cloud_config
from osc_lib import clientmanager
from osc_lib.command import commandmanager
from osc_lib.command import timing
@ -37,8 +38,6 @@ from osc_lib.i18n import _
from osc_lib import logs
from osc_lib import utils
from os_client_config import config as cloud_config
osprofiler_profiler = importutils.try_import("osprofiler.profiler")
@ -102,7 +101,7 @@ class OpenStackShell(app.App):
if not command_manager:
cm = commandmanager.CommandManager('openstack.cli')
else:
command_manager
cm = command_manager
super(OpenStackShell, self).__init__(
description=__doc__.strip(),
@ -374,7 +373,7 @@ class OpenStackShell(app.App):
# Ignore the default value of interface. Only if it is set later
# will it be used.
try:
cc = cloud_config.OpenStackConfig(
cc = cloud_config.OSC_Config(
override_defaults={
'interface': None,
'auth_type': self._auth_type,

View File

@ -134,7 +134,6 @@ class FakeClientManager(object):
self.session = None
self.auth_ref = None
self.auth_plugin_name = None
self.network_endpoint_enabled = True
def get_configuration(self):
return {
@ -147,9 +146,6 @@ class FakeClientManager(object):
'identity_api_version': VERSION,
}
def is_network_endpoint_enabled(self):
return self.network_endpoint_enabled
class FakeModule(object):

View File

@ -13,12 +13,14 @@
# under the License.
#
import json as jsonutils
import copy
import mock
from keystoneauth1.access import service_catalog
from keystoneauth1.identity import v2 as auth_v2
from requests_mock.contrib import fixture
from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1.identity import generic as generic_plugin
from keystoneauth1 import loading
from os_client_config import cloud_config
from osc_lib.api import auth
from osc_lib import clientmanager
@ -26,8 +28,6 @@ from osc_lib import exceptions as exc
from osc_lib.tests import fakes
from osc_lib.tests import utils
API_VERSION = {"identity": "2.0"}
AUTH_REF = {'version': 'v2.0'}
AUTH_REF.update(fakes.TEST_RESPONSE_DICT['access'])
SERVICE_CATALOG = service_catalog.ServiceCatalogV2(AUTH_REF)
@ -45,27 +45,6 @@ class Container(object):
pass
class FakeOptions(object):
def __init__(self, **kwargs):
for option in auth.OPTIONS_LIST:
setattr(self, option.replace('-', '_'), None)
self.auth_type = None
self.verify = True
self.cacert = None
self.insecure = None
self.identity_api_version = '2.0'
self.timing = None
self.region_name = None
self.interface = None
self.url = None
self.auth = {}
self.cert = None
self.key = None
self.default_domain = 'default'
self.__dict__.update(kwargs)
class TestClientCache(utils.TestCase):
def test_singleton(self):
@ -82,61 +61,29 @@ class TestClientCache(utils.TestCase):
self.assertEqual("'Container' object has no attribute 'foo'", str(err))
class TestClientManager(utils.TestCase):
def setUp(self):
super(TestClientManager, self).setUp()
self.mock = mock.Mock()
self.requests = self.useFixture(fixture.Fixture())
# fake v2password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT)
# fake token and token_endpoint retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT,
url='/'.join([fakes.AUTH_URL, 'v2.0/tokens']))
# fake v3password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens']))
# fake password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'auth/tokens']))
# fake password version endpoint discovery
self.stub_auth(json=fakes.TEST_VERSIONS,
url=fakes.AUTH_URL,
verb='GET')
class TestClientManager(utils.TestClientManager):
def test_client_manager_password(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
),
api_version=API_VERSION,
)
client_manager.setup_auth()
client_manager.auth_ref
client_manager = self._make_clientmanager()
self.assertEqual(
fakes.AUTH_URL,
client_manager._auth_url,
client_manager._cli_options.config['auth']['auth_url'],
)
self.assertEqual(
fakes.USERNAME,
client_manager._username,
client_manager._cli_options.config['auth']['username'],
)
self.assertEqual(
fakes.PASSWORD,
client_manager._password,
client_manager._cli_options.config['auth']['password'],
)
self.assertIsInstance(
client_manager.auth,
auth_v2.Password,
generic_plugin.Password,
)
self.assertTrue(client_manager.verify)
self.assertIsNone(client_manager.cert)
# These need to stick around until the old-style clients are gone
self.assertEqual(
@ -153,84 +100,20 @@ class TestClientManager(utils.TestCase):
)
self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_endpoint_disabled(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
user_domain_name='default',
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
project_domain_name='default',
),
auth_type='v3password',
),
api_version={"identity": "3"},
)
client_manager.setup_auth()
client_manager.auth_ref
# v3 fake doesn't have network endpoint.
self.assertFalse(client_manager.is_service_available('network'))
def stub_auth(self, json=None, url=None, verb=None, **kwargs):
subject_token = fakes.AUTH_TOKEN
base_url = fakes.AUTH_URL
if json:
text = jsonutils.dumps(json)
headers = {'X-Subject-Token': subject_token,
'Content-Type': 'application/json'}
if not url:
url = '/'.join([base_url, 'tokens'])
url = url.replace("/?", "?")
if not verb:
verb = 'POST'
self.requests.register_uri(verb,
url,
headers=headers,
text=text)
def test_client_manager_password_verify(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
verify=True,
),
api_version=API_VERSION,
)
client_manager.setup_auth()
client_manager.auth_ref
client_manager = self._make_clientmanager()
self.assertTrue(client_manager.verify)
self.assertEqual(None, client_manager.cacert)
self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_password_verify_ca(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
cacert='cafile',
),
api_version=API_VERSION,
config_args = {
'cacert': 'cafile',
}
client_manager = self._make_clientmanager(
config_args=config_args,
)
client_manager.setup_auth()
client_manager.auth_ref
# Test that client_manager.verify is Requests-compatible,
# i.e. it contains the value of cafile here
@ -240,146 +123,151 @@ class TestClientManager(utils.TestCase):
self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_password_verify_insecure(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
insecure=True,
),
api_version=API_VERSION,
config_args = {
'insecure': True,
}
client_manager = self._make_clientmanager(
config_args=config_args,
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertFalse(client_manager.verify)
self.assertEqual(None, client_manager.cacert)
self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_password_verify_insecure_ca(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
insecure=True,
cacert='cafile',
),
api_version=API_VERSION,
config_args = {
'insecure': True,
'cacert': 'cafile',
}
client_manager = self._make_clientmanager(
config_args=config_args,
)
client_manager.setup_auth()
client_manager.auth_ref
# insecure overrides cacert
self.assertFalse(client_manager.verify)
self.assertEqual(None, client_manager.cacert)
self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_password_no_cert(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions())
self.assertIsNone(client_manager.cert)
def test_client_manager_password_client_cert(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(cert='cert'))
config_args = {
'cert': 'cert',
}
client_manager = self._make_clientmanager(
config_args=config_args,
)
self.assertEqual('cert', client_manager.cert)
def test_client_manager_password_client_cert_and_key(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(cert='cert', key='key'))
def test_client_manager_password_client_key(self):
config_args = {
'cert': 'cert',
'key': 'key',
}
client_manager = self._make_clientmanager(
config_args=config_args,
)
self.assertEqual(('cert', 'key'), client_manager.cert)
def _select_auth_plugin(self, auth_params, api_version, auth_plugin_name):
auth_params['auth_type'] = auth_plugin_name
auth_params['identity_api_version'] = api_version
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(**auth_params),
api_version={"identity": api_version},
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertEqual(
auth_plugin_name,
client_manager.auth_plugin_name,
)
def test_client_manager_select_auth_plugin(self):
# test token auth
params = dict(
auth=dict(
auth_url=fakes.AUTH_URL,
token=fakes.AUTH_TOKEN,
),
)
self._select_auth_plugin(params, '2.0', 'v2token')
self._select_auth_plugin(params, '3', 'v3token')
self._select_auth_plugin(params, 'XXX', 'token')
# test token/endpoint auth
params = dict(
auth_plugin='token_endpoint',
auth=dict(
url='test',
token=fakes.AUTH_TOKEN,
),
)
def test_client_manager_select_auth_plugin_password(self):
# test password auth
params = dict(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_args = {
'auth_url': fakes.AUTH_URL,
'username': fakes.USERNAME,
'password': fakes.PASSWORD,
'tenant_name': fakes.PROJECT_NAME,
}
self._make_clientmanager(
auth_args=auth_args,
identity_api_version='2.0',
auth_plugin_name='v2password',
)
self._select_auth_plugin(params, '2.0', 'v2password')
params = dict(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
user_domain_name='default',
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
project_domain_name='default',
),
auth_args = copy.deepcopy(self.default_password_auth)
auth_args.update({
'user_domain_name': 'default',
'project_domain_name': 'default',
})
self._make_clientmanager(
auth_args=auth_args,
identity_api_version='3',
auth_plugin_name='v3password',
)
# Use v2.0 auth args
auth_args = {
'auth_url': fakes.AUTH_URL,
'username': fakes.USERNAME,
'password': fakes.PASSWORD,
'tenant_name': fakes.PROJECT_NAME,
}
self._make_clientmanager(
auth_args=auth_args,
identity_api_version='2.0',
)
# Use v3 auth args
auth_args = copy.deepcopy(self.default_password_auth)
auth_args.update({
'user_domain_name': 'default',
'project_domain_name': 'default',
})
self._make_clientmanager(
auth_args=auth_args,
identity_api_version='3',
)
def test_client_manager_select_auth_plugin_token(self):
# test token auth
self._make_clientmanager(
# auth_args=auth_args,
identity_api_version='2.0',
auth_plugin_name='v2token',
)
self._make_clientmanager(
# auth_args=auth_args,
identity_api_version='3',
auth_plugin_name='v3token',
)
self._make_clientmanager(
# auth_args=auth_args,
identity_api_version='x',
auth_plugin_name='token',
)
self._select_auth_plugin(params, '3', 'v3password')
self._select_auth_plugin(params, 'XXX', 'password')
def test_client_manager_select_auth_plugin_failure(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(os_auth_plugin=''),
api_version=API_VERSION,
)
self.assertRaises(
exc.CommandError,
client_manager.setup_auth,
ksa_exceptions.NoMatchingPlugin,
self._make_clientmanager,
identity_api_version='3',
auth_plugin_name='bad_plugin',
)
@mock.patch('osc_lib.api.auth.check_valid_authentication_options')
def test_client_manager_auth_setup_once(self, check_authn_options_func):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
auth_dict = {
'auth_url': fakes.AUTH_URL,
'username': fakes.USERNAME,
'password': fakes.PASSWORD,
'project_name': fakes.PROJECT_NAME,
}
loader = loading.get_plugin_loader('password')
auth_plugin = loader.load_from_options(**auth_dict)
client_manager = self._clientmanager_class()(
cli_options=cloud_config.CloudConfig(
name='t1',
region='1',
config=dict(
auth_type='password',
auth=auth_dict,
interface=fakes.INTERFACE,
region_name=fakes.REGION_NAME,
),
auth_plugin=auth_plugin,
),
api_version=API_VERSION,
api_version={
'identity': '2.0',
},
)
self.assertFalse(client_manager._auth_setup_completed)
client_manager.setup_auth()
@ -391,3 +279,18 @@ class TestClientManager(utils.TestCase):
check_authn_options_func.reset_mock()
client_manager.auth_ref
check_authn_options_func.assert_not_called()
def test_client_manager_endpoint_disabled(self):
auth_args = copy.deepcopy(self.default_password_auth)
auth_args.update({
'user_domain_name': 'default',
'project_domain_name': 'default',
})
# v3 fake doesn't have network endpoint
client_manager = self._make_clientmanager(
auth_args=auth_args,
identity_api_version='3',
auth_plugin_name='v3password',
)
self.assertFalse(client_manager.is_service_available('network'))

View File

@ -14,12 +14,10 @@
#
import copy
import fixtures
import mock
import os
import testtools
from osc_lib import shell
from osc_lib.tests import utils
@ -117,68 +115,12 @@ global_options = {
}
def opt2attr(opt):
if opt.startswith('--os-'):
attr = opt[5:]
elif opt.startswith('--'):
attr = opt[2:]
else:
attr = opt
return attr.lower().replace('-', '_')
def opt2env(opt):
return opt[2:].upper().replace('-', '_')
def make_shell():
"""Create a new command shell and mock out some bits."""
_shell = shell.OpenStackShell()
_shell.command_manager = mock.Mock()
_shell.cloud = mock.Mock()
return _shell
def fake_execute(shell, cmd):
"""Pretend to execute shell commands."""
return shell.run(cmd.split())
class EnvFixture(fixtures.Fixture):
"""Environment Fixture.
This fixture replaces os.environ with provided env or an empty env.
"""
def __init__(self, env=None):
self.new_env = env or {}
def _setUp(self):
self.orig_env, os.environ = os.environ, self.new_env
self.addCleanup(self.revert)
def revert(self):
os.environ = self.orig_env
class TestShell(utils.TestCase):
def setUp(self):
super(TestShell, self).setUp()
patch = "osc_lib.shell.OpenStackShell.run_subcommand"
self.cmd_patch = mock.patch(patch)
self.cmd_save = self.cmd_patch.start()
self.addCleanup(self.cmd_patch.stop)
self.app = mock.Mock("Test Shell")
class TestShellHelp(TestShell):
class TestShellHelp(utils.TestShell):
"""Test the deferred help flag"""
def setUp(self):
super(TestShellHelp, self).setUp()
self.useFixture(EnvFixture())
self.useFixture(utils.EnvFixture())
@testtools.skip("skip until bug 1444983 is resolved")
def test_help_options(self):
@ -186,12 +128,9 @@ class TestShellHelp(TestShell):
kwargs = {
"deferred_help": True,
}
with mock.patch(
"osc_lib.shell.OpenStackShell.initialize_app",
self.app,
):
_shell, _cmd = make_shell(), flag
fake_execute(_shell, _cmd)
with mock.patch(self.app_patch + ".initialize_app", self.app):
_shell, _cmd = utils.make_shell(), flag
utils.fake_execute(_shell, _cmd)
self.assertEqual(
kwargs["deferred_help"],
@ -199,7 +138,7 @@ class TestShellHelp(TestShell):
)
class TestShellOptions(TestShell):
class TestShellOptions(utils.TestShell):
"""Test the option handling by argparse and os_client_config
This covers getting the CLI options through the initial processing
@ -208,119 +147,12 @@ class TestShellOptions(TestShell):
def setUp(self):
super(TestShellOptions, self).setUp()
self.useFixture(EnvFixture())
self.useFixture(utils.EnvFixture())
def _assert_initialize_app_arg(self, cmd_options, default_args):
"""Check the args passed to initialize_app()
The argv argument to initialize_app() is the remainder from parsing
global options declared in both cliff.app and
osc_lib.OpenStackShell build_option_parser(). Any global
options passed on the command line should not be in argv but in
_shell.options.
"""
with mock.patch(
"osc_lib.shell.OpenStackShell.initialize_app",
self.app,
):
_shell, _cmd = make_shell(), cmd_options + " module list"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["module", "list"])
for k in default_args.keys():
self.assertEqual(
default_args[k],
vars(_shell.options)[k],
"%s does not match" % k,
)
def _assert_cloud_config_arg(self, cmd_options, default_args):
"""Check the args passed to cloud_config.get_one_cloud()
The argparse argument to get_one_cloud() is an argparse.Namespace
object that contains all of the options processed to this point in
initialize_app().
"""
cloud = mock.Mock(name="cloudy")
cloud.config = {}
self.occ_get_one = mock.Mock(return_value=cloud)
with mock.patch(
"os_client_config.config.OpenStackConfig.get_one_cloud",
self.occ_get_one,
):
_shell, _cmd = make_shell(), cmd_options + " module list"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["module", "list"])
opts = self.occ_get_one.call_args[1]['argparse']
for k in default_args.keys():
self.assertEqual(
default_args[k],
vars(opts)[k],
"%s does not match" % k,
)
def _test_options_init_app(self, test_opts):
"""Test options on the command line"""
for opt in test_opts.keys():
if not test_opts[opt][1]:
continue
key = opt2attr(opt)
if isinstance(test_opts[opt][0], str):
cmd = opt + " " + test_opts[opt][0]
else:
cmd = opt
kwargs = {
key: test_opts[opt][0],
}
self._assert_initialize_app_arg(cmd, kwargs)
def _test_env_init_app(self, test_opts):
"""Test options in the environment"""
for opt in test_opts.keys():
if not test_opts[opt][2]:
continue
key = opt2attr(opt)
kwargs = {
key: test_opts[opt][0],
}
env = {
opt2env(opt): test_opts[opt][0],
}
os.environ = env.copy()
self._assert_initialize_app_arg("", kwargs)
def _test_options_get_one_cloud(self, test_opts):
"""Test options sent "to os_client_config"""
for opt in test_opts.keys():
if not test_opts[opt][1]:
continue
key = opt2attr(opt)
if isinstance(test_opts[opt][0], str):
cmd = opt + " " + test_opts[opt][0]
else:
cmd = opt
kwargs = {
key: test_opts[opt][0],
}
self._assert_cloud_config_arg(cmd, kwargs)
def _test_env_get_one_cloud(self, test_opts):
"""Test environment options sent "to os_client_config"""
for opt in test_opts.keys():
if not test_opts[opt][2]:
continue
key = opt2attr(opt)
kwargs = {
key: test_opts[opt][0],
}
env = {
opt2env(opt): test_opts[opt][0],
}
os.environ = env.copy()
self._assert_cloud_config_arg("", kwargs)
def test_empty_auth(self):
os.environ = {}
self._assert_initialize_app_arg("", {})
self._assert_cloud_config_arg("", {})
def test_no_options(self):
os.environ = {}
@ -336,7 +168,7 @@ class TestShellOptions(TestShell):
self._test_env_get_one_cloud(global_options)
class TestShellCli(TestShell):
class TestShellCli(utils.TestShell):
"""Test handling of specific global options
_shell.options is the parsed command line from argparse
@ -347,14 +179,23 @@ class TestShellCli(TestShell):
def setUp(self):
super(TestShellCli, self).setUp()
env = {}
self.useFixture(EnvFixture(env.copy()))
self.useFixture(utils.EnvFixture(env.copy()))
def test_shell_args_no_options(self):
_shell = utils.make_shell()
with mock.patch(
"osc_lib.shell.OpenStackShell.initialize_app",
self.app,
):
utils.fake_execute(_shell, "list user")
self.app.assert_called_with(["list", "user"])
def test_shell_args_tls_options(self):
"""Test the TLS verify and CA cert file options"""
_shell = make_shell()
_shell = utils.make_shell()
# Default
fake_execute(_shell, "module list")
utils.fake_execute(_shell, "module list")
self.assertIsNone(_shell.options.verify)
self.assertIsNone(_shell.options.insecure)
self.assertIsNone(_shell.options.cacert)
@ -362,7 +203,7 @@ class TestShellCli(TestShell):
self.assertIsNone(_shell.client_manager.cacert)
# --verify
fake_execute(_shell, "--verify module list")
utils.fake_execute(_shell, "--verify module list")
self.assertTrue(_shell.options.verify)
self.assertIsNone(_shell.options.insecure)
self.assertIsNone(_shell.options.cacert)
@ -370,7 +211,7 @@ class TestShellCli(TestShell):
self.assertIsNone(_shell.client_manager.cacert)
# --insecure
fake_execute(_shell, "--insecure module list")
utils.fake_execute(_shell, "--insecure module list")
self.assertIsNone(_shell.options.verify)
self.assertTrue(_shell.options.insecure)
self.assertIsNone(_shell.options.cacert)
@ -378,7 +219,7 @@ class TestShellCli(TestShell):
self.assertIsNone(_shell.client_manager.cacert)
# --os-cacert
fake_execute(_shell, "--os-cacert foo module list")
utils.fake_execute(_shell, "--os-cacert foo module list")
self.assertIsNone(_shell.options.verify)
self.assertIsNone(_shell.options.insecure)
self.assertEqual('foo', _shell.options.cacert)
@ -386,7 +227,7 @@ class TestShellCli(TestShell):
self.assertEqual('foo', _shell.client_manager.cacert)
# --os-cacert and --verify
fake_execute(_shell, "--os-cacert foo --verify module list")
utils.fake_execute(_shell, "--os-cacert foo --verify module list")
self.assertTrue(_shell.options.verify)
self.assertIsNone(_shell.options.insecure)
self.assertEqual('foo', _shell.options.cacert)
@ -398,7 +239,7 @@ class TestShellCli(TestShell):
# in this combination --insecure now overrides any
# --os-cacert setting, where before --insecure
# was ignored if --os-cacert was set.
fake_execute(_shell, "--os-cacert foo --insecure module list")
utils.fake_execute(_shell, "--os-cacert foo --insecure module list")
self.assertIsNone(_shell.options.verify)
self.assertTrue(_shell.options.insecure)
self.assertEqual('foo', _shell.options.cacert)
@ -407,28 +248,31 @@ class TestShellCli(TestShell):
def test_shell_args_cert_options(self):
"""Test client cert options"""
_shell = make_shell()
_shell = utils.make_shell()
# Default
fake_execute(_shell, "module list")
utils.fake_execute(_shell, "module list")
self.assertEqual('', _shell.options.cert)
self.assertEqual('', _shell.options.key)
self.assertIsNone(_shell.client_manager.cert)
# --os-cert
fake_execute(_shell, "--os-cert mycert module list")
utils.fake_execute(_shell, "--os-cert mycert module list")
self.assertEqual('mycert', _shell.options.cert)
self.assertEqual('', _shell.options.key)
self.assertEqual('mycert', _shell.client_manager.cert)
# --os-key
fake_execute(_shell, "--os-key mickey module list")
utils.fake_execute(_shell, "--os-key mickey module list")
self.assertEqual('', _shell.options.cert)
self.assertEqual('mickey', _shell.options.key)
self.assertIsNone(_shell.client_manager.cert)
# --os-cert and --os-key
fake_execute(_shell, "--os-cert mycert --os-key mickey module list")
utils.fake_execute(
_shell,
"--os-cert mycert --os-key mickey module list"
)
self.assertEqual('mycert', _shell.options.cert)
self.assertEqual('mickey', _shell.options.key)
self.assertEqual(('mycert', 'mickey'), _shell.client_manager.cert)
@ -437,9 +281,9 @@ class TestShellCli(TestShell):
def test_shell_args_cloud_no_vendor(self, config_mock):
"""Test cloud config options without the vendor file"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1))
_shell = make_shell()
_shell = utils.make_shell()
fake_execute(
utils.fake_execute(
_shell,
"--os-cloud scc module list",
)
@ -488,9 +332,9 @@ class TestShellCli(TestShell):
"""Test cloud config options with the vendor file"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
public_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell()
_shell = utils.make_shell()
fake_execute(
utils.fake_execute(
_shell,
"--os-cloud megacloud module list",
)
@ -536,10 +380,10 @@ class TestShellCli(TestShell):
def test_shell_args_precedence(self, config_mock, vendor_mock):
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell()
_shell = utils.make_shell()
# Test command option overriding config file value
fake_execute(
utils.fake_execute(
_shell,
"--os-cloud megacloud --os-region-name krikkit module list",
)
@ -577,7 +421,7 @@ class TestShellCli(TestShell):
)
class TestShellCliPrecedence(TestShell):
class TestShellCliPrecedence(utils.TestShell):
"""Test option precedencr order"""
def setUp(self):
@ -586,7 +430,7 @@ class TestShellCliPrecedence(TestShell):
'OS_CLOUD': 'megacloud',
'OS_REGION_NAME': 'occ-env',
}
self.useFixture(EnvFixture(env.copy()))
self.useFixture(utils.EnvFixture(env.copy()))
@mock.patch("os_client_config.config.OpenStackConfig._load_vendor_file")
@mock.patch("os_client_config.config.OpenStackConfig._load_config_file")
@ -594,10 +438,10 @@ class TestShellCliPrecedence(TestShell):
"""Test environment overriding occ"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell()
_shell = utils.make_shell()
# Test env var
fake_execute(
utils.fake_execute(
_shell,
"module list",
)
@ -642,10 +486,10 @@ class TestShellCliPrecedence(TestShell):
"""Test command line overriding environment and occ"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell()
_shell = utils.make_shell()
# Test command option overriding config file value
fake_execute(
utils.fake_execute(
_shell,
"--os-region-name krikkit list user",
)
@ -690,10 +534,10 @@ class TestShellCliPrecedence(TestShell):
"""Test command line overriding environment and occ"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1))
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell()
_shell = utils.make_shell()
# Test command option overriding config file value
fake_execute(
utils.fake_execute(
_shell,
"--os-cloud scc --os-region-name krikkit list user",
)

View File

@ -14,14 +14,69 @@
# under the License.
#
import copy
import json as jsonutils
import mock
import os
import fixtures
from keystoneauth1 import loading
from os_client_config import cloud_config
from requests_mock.contrib import fixture
import testtools
from osc_lib import clientmanager
from osc_lib import shell
from osc_lib.tests import fakes
def fake_execute(shell, cmd):
"""Pretend to execute shell commands."""
return shell.run(cmd.split())
def make_shell(shell_class=None):
"""Create a new command shell and mock out some bits."""
if shell_class is None:
shell_class = shell.OpenStackShell
_shell = shell_class()
_shell.command_manager = mock.Mock()
# _shell.cloud = mock.Mock()
return _shell
def opt2attr(opt):
if opt.startswith('--os-'):
attr = opt[5:]
elif opt.startswith('--'):
attr = opt[2:]
else:
attr = opt
return attr.lower().replace('-', '_')
def opt2env(opt):
return opt[2:].upper().replace('-', '_')
class EnvFixture(fixtures.Fixture):
"""Environment Fixture.
This fixture replaces os.environ with provided env or an empty env.
"""
def __init__(self, env=None):
self.new_env = env or {}
def _setUp(self):
self.orig_env, os.environ = os.environ, self.new_env
self.addCleanup(self.revert)
def revert(self):
os.environ = self.orig_env
class ParserException(Exception):
pass
@ -73,3 +128,263 @@ class TestCommand(TestCase):
self.assertIn(attr, parsed_args)
self.assertEqual(value, getattr(parsed_args, attr))
return parsed_args
class TestClientManager(TestCase):
"""ClientManager class test framework"""
default_password_auth = {
'auth_url': fakes.AUTH_URL,
'username': fakes.USERNAME,
'password': fakes.PASSWORD,
'project_name': fakes.PROJECT_NAME,
}
default_token_auth = {
'auth_url': fakes.AUTH_URL,
'token': fakes.AUTH_TOKEN,
}
def setUp(self):
super(TestClientManager, self).setUp()
self.mock = mock.Mock()
self.requests = self.useFixture(fixture.Fixture())
# fake v2password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT)
# fake token and token_endpoint retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT,
url='/'.join([fakes.AUTH_URL, 'v2.0/tokens']))
# fake v3password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens']))
# fake password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'auth/tokens']))
# fake password version endpoint discovery
self.stub_auth(json=fakes.TEST_VERSIONS,
url=fakes.AUTH_URL,
verb='GET')
# Mock the auth plugin
self.auth_mock = mock.Mock()
def stub_auth(self, json=None, url=None, verb=None, **kwargs):
subject_token = fakes.AUTH_TOKEN
base_url = fakes.AUTH_URL
if json:
text = jsonutils.dumps(json)
headers = {
'X-Subject-Token': subject_token,
'Content-Type': 'application/json',
}
if not url:
url = '/'.join([base_url, 'tokens'])
url = url.replace("/?", "?")
if not verb:
verb = 'POST'
self.requests.register_uri(
verb,
url,
headers=headers,
text=text,
)
def _clientmanager_class(self):
"""Allow subclasses to override the ClientManager class"""
return clientmanager.ClientManager
def _make_clientmanager(
self,
auth_args=None,
config_args=None,
identity_api_version=None,
auth_plugin_name=None,
):
if identity_api_version is None:
identity_api_version = '2.0'
if auth_plugin_name is None:
auth_plugin_name = 'password'
if auth_plugin_name.endswith('password'):
auth_dict = copy.deepcopy(self.default_password_auth)
elif auth_plugin_name.endswith('token'):
auth_dict = copy.deepcopy(self.default_token_auth)
else:
auth_dict = {}
if auth_args is not None:
auth_dict = auth_args
cli_options = {
'auth_type': auth_plugin_name,
'auth': auth_dict,
'interface': fakes.INTERFACE,
'region_name': fakes.REGION_NAME,
}
if config_args is not None:
cli_options.update(config_args)
loader = loading.get_plugin_loader(auth_plugin_name)
auth_plugin = loader.load_from_options(**auth_dict)
client_manager = self._clientmanager_class()(
cli_options=cloud_config.CloudConfig(
name='t1',
region='1',
config=cli_options,
auth_plugin=auth_plugin,
),
api_version={
'identity': identity_api_version,
},
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertEqual(
auth_plugin_name,
client_manager.auth_plugin_name,
)
return client_manager
class TestShell(TestCase):
# cliff.app.App subclass
app_patch = "osc_lib.shell.OpenStackShell"
def setUp(self):
super(TestShell, self).setUp()
self.cmd_patch = mock.patch(self.app_patch + ".run_subcommand")
self.cmd_save = self.cmd_patch.start()
self.addCleanup(self.cmd_patch.stop)
self.app = mock.Mock("Test Shell")
def _assert_initialize_app_arg(self, cmd_options, default_args):
"""Check the args passed to initialize_app()
The argv argument to initialize_app() is the remainder from parsing
global options declared in both cliff.app and
osc_lib.OpenStackShell build_option_parser(). Any global
options passed on the command line should not be in argv but in
_shell.options.
"""
with mock.patch(
self.app_patch + ".initialize_app",
self.app,
):
_shell, _cmd = make_shell(), cmd_options + " module list"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["module", "list"])
for k in default_args.keys():
self.assertEqual(
default_args[k],
vars(_shell.options)[k],
"%s does not match" % k,
)
def _assert_cloud_config_arg(self, cmd_options, default_args):
"""Check the args passed to cloud_config.get_one_cloud()
The argparse argument to get_one_cloud() is an argparse.Namespace
object that contains all of the options processed to this point in
initialize_app().
"""
cloud = mock.Mock(name="cloudy")
cloud.config = {}
self.occ_get_one = mock.Mock(return_value=cloud)
with mock.patch(
"os_client_config.config.OpenStackConfig.get_one_cloud",
self.occ_get_one,
):
_shell, _cmd = make_shell(), cmd_options + " module list"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["module", "list"])
opts = self.occ_get_one.call_args[1]['argparse']
for k in default_args.keys():
self.assertEqual(
default_args[k],
vars(opts)[k],
"%s does not match" % k,
)
def _assert_token_auth(self, cmd_options, default_args):
with mock.patch(self.app_patch + ".initialize_app",
self.app):
_shell, _cmd = make_shell(), cmd_options + " list role"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "role"])
self.assertEqual(
default_args.get("token", ''),
_shell.options.token,
"token"
)
self.assertEqual(
default_args.get("auth_url", ''),
_shell.options.auth_url,
"auth_url"
)
def _test_options_init_app(self, test_opts):
"""Test options on the command line"""
for opt in test_opts.keys():
if not test_opts[opt][1]:
continue
key = opt2attr(opt)
if isinstance(test_opts[opt][0], str):
cmd = opt + " " + test_opts[opt][0]
else:
cmd = opt
kwargs = {
key: test_opts[opt][0],
}
self._assert_initialize_app_arg(cmd, kwargs)
def _test_env_init_app(self, test_opts):
"""Test options in the environment"""
for opt in test_opts.keys():
if not test_opts[opt][2]:
continue
key = opt2attr(opt)
kwargs = {
key: test_opts[opt][0],
}
env = {
opt2env(opt): test_opts[opt][0],
}
os.environ = env.copy()
self._assert_initialize_app_arg("", kwargs)
def _test_options_get_one_cloud(self, test_opts):
"""Test options sent "to os_client_config"""
for opt in test_opts.keys():
if not test_opts[opt][1]:
continue
key = opt2attr(opt)
if isinstance(test_opts[opt][0], str):
cmd = opt + " " + test_opts[opt][0]
else:
cmd = opt
kwargs = {
key: test_opts[opt][0],
}
self._assert_cloud_config_arg(cmd, kwargs)
def _test_env_get_one_cloud(self, test_opts):
"""Test environment options sent "to os_client_config"""
for opt in test_opts.keys():
if not test_opts[opt][2]:
continue
key = opt2attr(opt)
kwargs = {
key: test_opts[opt][0],
}
env = {
opt2env(opt): test_opts[opt][0],
}
os.environ = env.copy()
self._assert_cloud_config_arg("", kwargs)