diff --git a/osc_lib/api/auth.py b/osc_lib/api/auth.py index 283e633..4649b78 100644 --- a/osc_lib/api/auth.py +++ b/osc_lib/api/auth.py @@ -67,74 +67,6 @@ def get_options_list(): return OPTIONS_LIST -def select_auth_plugin(options): - """Pick an auth plugin based on --os-auth-type or other options""" - - auth_plugin_name = None - - # Do the token/url check first as this must override the default - # 'password' set by os-client-config - # Also, url and token are not copied into o-c-c's auth dict (yet?) - if options.auth.get('url') and options.auth.get('token'): - # service token authentication - auth_plugin_name = 'token_endpoint' - elif options.auth_type in PLUGIN_LIST: - # A direct plugin name was given, use it - auth_plugin_name = options.auth_type - elif options.auth.get('username'): - if options.identity_api_version == '3': - auth_plugin_name = 'v3password' - elif options.identity_api_version.startswith('2'): - auth_plugin_name = 'v2password' - else: - # let keystoneauth figure it out itself - auth_plugin_name = 'password' - elif options.auth.get('token'): - if options.identity_api_version == '3': - auth_plugin_name = 'v3token' - elif options.identity_api_version.startswith('2'): - auth_plugin_name = 'v2token' - else: - # let keystoneauth figure it out itself - auth_plugin_name = 'token' - else: - # The ultimate default is similar to the original behaviour, - # but this time with version discovery - auth_plugin_name = 'password' - LOG.debug("Auth plugin %s selected", auth_plugin_name) - return auth_plugin_name - - -def build_auth_params(auth_plugin_name, cmd_options): - - if auth_plugin_name: - LOG.debug('auth_type: %s', auth_plugin_name) - auth_plugin_loader = base.get_plugin_loader(auth_plugin_name) - auth_params = { - opt.dest: opt.default - for opt in base.get_plugin_options(auth_plugin_name) - } - auth_params.update(dict(cmd_options.auth)) - # grab tenant from project for v2.0 API compatibility - if auth_plugin_name.startswith("v2"): - if 'project_id' in auth_params: - auth_params['tenant_id'] = auth_params['project_id'] - del auth_params['project_id'] - if 'project_name' in auth_params: - auth_params['tenant_name'] = auth_params['project_name'] - del auth_params['project_name'] - else: - LOG.debug('no auth_type') - # delay the plugin choice, grab every option - auth_plugin_loader = None - auth_params = dict(cmd_options.auth) - plugin_options = set([o.replace('-', '_') for o in get_options_list()]) - for option in plugin_options: - LOG.debug('fetching option %s', option) - auth_params[option] = getattr(cmd_options.auth, option, None) - return (auth_plugin_loader, auth_params) - - def check_valid_authorization_options(options, auth_plugin_name): """Validate authorization options, and provide helpful error messages.""" if (options.auth.get('project_id') and not diff --git a/osc_lib/clientmanager.py b/osc_lib/clientmanager.py index fa22237..d256fe9 100644 --- a/osc_lib/clientmanager.py +++ b/osc_lib/clientmanager.py @@ -87,7 +87,6 @@ class ClientManager(object): self._cli_options = cli_options self._api_version = api_version self._pw_callback = pw_func - self._url = self._cli_options.auth.get('url') self.region_name = self._cli_options.region_name self.interface = self._cli_options.interface @@ -197,9 +196,8 @@ class ClientManager(object): if self._auth_setup_completed: return - # If no auth type is named by the user, select one based on - # the supplied options - self.auth_plugin_name = auth.select_auth_plugin(self._cli_options) + # Stash the selected auth type + self.auth_plugin_name = self._cli_options.config['auth_type'] # Basic option checking to avoid unhelpful error messages auth.check_valid_authentication_options( @@ -213,23 +211,16 @@ class ClientManager(object): not self._cli_options.auth.get('password')): self._cli_options.auth['password'] = self._pw_callback() - (auth_plugin, self._auth_params) = auth.build_auth_params( - self.auth_plugin_name, - self._cli_options, - ) - - self._set_default_scope_options() - # For compatibility until all clients can be updated - if 'project_name' in self._auth_params: - self._project_name = self._auth_params['project_name'] - elif 'tenant_name' in self._auth_params: - self._project_name = self._auth_params['tenant_name'] + if 'project_name' in self._cli_options.auth: + self._project_name = self._cli_options.auth['project_name'] + elif 'tenant_name' in self._cli_options.auth: + self._project_name = self._cli_options.auth['tenant_name'] LOG.info('Using auth plugin: %s', self.auth_plugin_name) LOG.debug('Using parameters %s', - strutils.mask_password(self._auth_params)) - self.auth = auth_plugin.load_from_options(**self._auth_params) + strutils.mask_password(self._cli_options.auth)) + self.auth = self._cli_options.get_auth() # needed by SAML authentication request_session = requests.session() self.session = osc_session.TimingSession( diff --git a/osc_lib/shell.py b/osc_lib/shell.py index e2ac250..6d8639a 100644 --- a/osc_lib/shell.py +++ b/osc_lib/shell.py @@ -29,6 +29,7 @@ from cliff import help from oslo_utils import importutils from oslo_utils import strutils +from osc_lib.cli import client_config as cloud_config from osc_lib import clientmanager from osc_lib.command import commandmanager from osc_lib.command import timing @@ -37,8 +38,6 @@ from osc_lib.i18n import _ from osc_lib import logs from osc_lib import utils -from os_client_config import config as cloud_config - osprofiler_profiler = importutils.try_import("osprofiler.profiler") @@ -102,7 +101,7 @@ class OpenStackShell(app.App): if not command_manager: cm = commandmanager.CommandManager('openstack.cli') else: - command_manager + cm = command_manager super(OpenStackShell, self).__init__( description=__doc__.strip(), @@ -374,7 +373,7 @@ class OpenStackShell(app.App): # Ignore the default value of interface. Only if it is set later # will it be used. try: - cc = cloud_config.OpenStackConfig( + cc = cloud_config.OSC_Config( override_defaults={ 'interface': None, 'auth_type': self._auth_type, diff --git a/osc_lib/tests/fakes.py b/osc_lib/tests/fakes.py index 780611c..e500e1d 100644 --- a/osc_lib/tests/fakes.py +++ b/osc_lib/tests/fakes.py @@ -134,7 +134,6 @@ class FakeClientManager(object): self.session = None self.auth_ref = None self.auth_plugin_name = None - self.network_endpoint_enabled = True def get_configuration(self): return { @@ -147,9 +146,6 @@ class FakeClientManager(object): 'identity_api_version': VERSION, } - def is_network_endpoint_enabled(self): - return self.network_endpoint_enabled - class FakeModule(object): diff --git a/osc_lib/tests/test_clientmanager.py b/osc_lib/tests/test_clientmanager.py index 1568322..3ddc155 100644 --- a/osc_lib/tests/test_clientmanager.py +++ b/osc_lib/tests/test_clientmanager.py @@ -13,12 +13,14 @@ # under the License. # -import json as jsonutils +import copy import mock from keystoneauth1.access import service_catalog -from keystoneauth1.identity import v2 as auth_v2 -from requests_mock.contrib import fixture +from keystoneauth1 import exceptions as ksa_exceptions +from keystoneauth1.identity import generic as generic_plugin +from keystoneauth1 import loading +from os_client_config import cloud_config from osc_lib.api import auth from osc_lib import clientmanager @@ -26,8 +28,6 @@ from osc_lib import exceptions as exc from osc_lib.tests import fakes from osc_lib.tests import utils -API_VERSION = {"identity": "2.0"} - AUTH_REF = {'version': 'v2.0'} AUTH_REF.update(fakes.TEST_RESPONSE_DICT['access']) SERVICE_CATALOG = service_catalog.ServiceCatalogV2(AUTH_REF) @@ -45,27 +45,6 @@ class Container(object): pass -class FakeOptions(object): - - def __init__(self, **kwargs): - for option in auth.OPTIONS_LIST: - setattr(self, option.replace('-', '_'), None) - self.auth_type = None - self.verify = True - self.cacert = None - self.insecure = None - self.identity_api_version = '2.0' - self.timing = None - self.region_name = None - self.interface = None - self.url = None - self.auth = {} - self.cert = None - self.key = None - self.default_domain = 'default' - self.__dict__.update(kwargs) - - class TestClientCache(utils.TestCase): def test_singleton(self): @@ -82,61 +61,29 @@ class TestClientCache(utils.TestCase): self.assertEqual("'Container' object has no attribute 'foo'", str(err)) -class TestClientManager(utils.TestCase): - - def setUp(self): - super(TestClientManager, self).setUp() - self.mock = mock.Mock() - self.requests = self.useFixture(fixture.Fixture()) - # fake v2password token retrieval - self.stub_auth(json=fakes.TEST_RESPONSE_DICT) - # fake token and token_endpoint retrieval - self.stub_auth(json=fakes.TEST_RESPONSE_DICT, - url='/'.join([fakes.AUTH_URL, 'v2.0/tokens'])) - # fake v3password token retrieval - self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3, - url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens'])) - # fake password token retrieval - self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3, - url='/'.join([fakes.AUTH_URL, 'auth/tokens'])) - # fake password version endpoint discovery - self.stub_auth(json=fakes.TEST_VERSIONS, - url=fakes.AUTH_URL, - verb='GET') +class TestClientManager(utils.TestClientManager): def test_client_manager_password(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - ), - api_version=API_VERSION, - ) - client_manager.setup_auth() - client_manager.auth_ref + client_manager = self._make_clientmanager() self.assertEqual( fakes.AUTH_URL, - client_manager._auth_url, + client_manager._cli_options.config['auth']['auth_url'], ) self.assertEqual( fakes.USERNAME, - client_manager._username, + client_manager._cli_options.config['auth']['username'], ) self.assertEqual( fakes.PASSWORD, - client_manager._password, + client_manager._cli_options.config['auth']['password'], ) self.assertIsInstance( client_manager.auth, - auth_v2.Password, + generic_plugin.Password, ) self.assertTrue(client_manager.verify) + self.assertIsNone(client_manager.cert) # These need to stick around until the old-style clients are gone self.assertEqual( @@ -153,84 +100,20 @@ class TestClientManager(utils.TestCase): ) self.assertTrue(client_manager.is_service_available('network')) - def test_client_manager_endpoint_disabled(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - user_domain_name='default', - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - project_domain_name='default', - ), - auth_type='v3password', - ), - api_version={"identity": "3"}, - ) - client_manager.setup_auth() - client_manager.auth_ref - - # v3 fake doesn't have network endpoint. - self.assertFalse(client_manager.is_service_available('network')) - - def stub_auth(self, json=None, url=None, verb=None, **kwargs): - subject_token = fakes.AUTH_TOKEN - base_url = fakes.AUTH_URL - if json: - text = jsonutils.dumps(json) - headers = {'X-Subject-Token': subject_token, - 'Content-Type': 'application/json'} - if not url: - url = '/'.join([base_url, 'tokens']) - url = url.replace("/?", "?") - if not verb: - verb = 'POST' - self.requests.register_uri(verb, - url, - headers=headers, - text=text) - def test_client_manager_password_verify(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - auth_type='v2password', - verify=True, - ), - api_version=API_VERSION, - ) - client_manager.setup_auth() - client_manager.auth_ref + client_manager = self._make_clientmanager() self.assertTrue(client_manager.verify) self.assertEqual(None, client_manager.cacert) self.assertTrue(client_manager.is_service_available('network')) def test_client_manager_password_verify_ca(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - auth_type='v2password', - cacert='cafile', - ), - api_version=API_VERSION, + config_args = { + 'cacert': 'cafile', + } + client_manager = self._make_clientmanager( + config_args=config_args, ) - client_manager.setup_auth() - client_manager.auth_ref # Test that client_manager.verify is Requests-compatible, # i.e. it contains the value of cafile here @@ -240,146 +123,151 @@ class TestClientManager(utils.TestCase): self.assertTrue(client_manager.is_service_available('network')) def test_client_manager_password_verify_insecure(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - auth_type='v2password', - insecure=True, - ), - api_version=API_VERSION, + config_args = { + 'insecure': True, + } + client_manager = self._make_clientmanager( + config_args=config_args, ) - client_manager.setup_auth() - client_manager.auth_ref self.assertFalse(client_manager.verify) self.assertEqual(None, client_manager.cacert) self.assertTrue(client_manager.is_service_available('network')) def test_client_manager_password_verify_insecure_ca(self): - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), - auth_type='v2password', - insecure=True, - cacert='cafile', - ), - api_version=API_VERSION, + config_args = { + 'insecure': True, + 'cacert': 'cafile', + } + client_manager = self._make_clientmanager( + config_args=config_args, ) - client_manager.setup_auth() - client_manager.auth_ref # insecure overrides cacert self.assertFalse(client_manager.verify) self.assertEqual(None, client_manager.cacert) self.assertTrue(client_manager.is_service_available('network')) - def test_client_manager_password_no_cert(self): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions()) - self.assertIsNone(client_manager.cert) - def test_client_manager_password_client_cert(self): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions(cert='cert')) + config_args = { + 'cert': 'cert', + } + client_manager = self._make_clientmanager( + config_args=config_args, + ) + self.assertEqual('cert', client_manager.cert) - def test_client_manager_password_client_cert_and_key(self): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions(cert='cert', key='key')) + def test_client_manager_password_client_key(self): + config_args = { + 'cert': 'cert', + 'key': 'key', + } + client_manager = self._make_clientmanager( + config_args=config_args, + ) + self.assertEqual(('cert', 'key'), client_manager.cert) - def _select_auth_plugin(self, auth_params, api_version, auth_plugin_name): - auth_params['auth_type'] = auth_plugin_name - auth_params['identity_api_version'] = api_version - - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions(**auth_params), - api_version={"identity": api_version}, - ) - client_manager.setup_auth() - client_manager.auth_ref - - self.assertEqual( - auth_plugin_name, - client_manager.auth_plugin_name, - ) - - def test_client_manager_select_auth_plugin(self): - # test token auth - params = dict( - auth=dict( - auth_url=fakes.AUTH_URL, - token=fakes.AUTH_TOKEN, - ), - ) - self._select_auth_plugin(params, '2.0', 'v2token') - self._select_auth_plugin(params, '3', 'v3token') - self._select_auth_plugin(params, 'XXX', 'token') - # test token/endpoint auth - params = dict( - auth_plugin='token_endpoint', - auth=dict( - url='test', - token=fakes.AUTH_TOKEN, - ), - ) + def test_client_manager_select_auth_plugin_password(self): # test password auth - params = dict( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - ), + auth_args = { + 'auth_url': fakes.AUTH_URL, + 'username': fakes.USERNAME, + 'password': fakes.PASSWORD, + 'tenant_name': fakes.PROJECT_NAME, + } + self._make_clientmanager( + auth_args=auth_args, + identity_api_version='2.0', + auth_plugin_name='v2password', ) - self._select_auth_plugin(params, '2.0', 'v2password') - params = dict( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - user_domain_name='default', - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, - project_domain_name='default', - ), + + auth_args = copy.deepcopy(self.default_password_auth) + auth_args.update({ + 'user_domain_name': 'default', + 'project_domain_name': 'default', + }) + self._make_clientmanager( + auth_args=auth_args, + identity_api_version='3', + auth_plugin_name='v3password', + ) + + # Use v2.0 auth args + auth_args = { + 'auth_url': fakes.AUTH_URL, + 'username': fakes.USERNAME, + 'password': fakes.PASSWORD, + 'tenant_name': fakes.PROJECT_NAME, + } + self._make_clientmanager( + auth_args=auth_args, + identity_api_version='2.0', + ) + + # Use v3 auth args + auth_args = copy.deepcopy(self.default_password_auth) + auth_args.update({ + 'user_domain_name': 'default', + 'project_domain_name': 'default', + }) + self._make_clientmanager( + auth_args=auth_args, + identity_api_version='3', + ) + + def test_client_manager_select_auth_plugin_token(self): + # test token auth + self._make_clientmanager( + # auth_args=auth_args, + identity_api_version='2.0', + auth_plugin_name='v2token', + ) + self._make_clientmanager( + # auth_args=auth_args, + identity_api_version='3', + auth_plugin_name='v3token', + ) + self._make_clientmanager( + # auth_args=auth_args, + identity_api_version='x', + auth_plugin_name='token', ) - self._select_auth_plugin(params, '3', 'v3password') - self._select_auth_plugin(params, 'XXX', 'password') def test_client_manager_select_auth_plugin_failure(self): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions(os_auth_plugin=''), - api_version=API_VERSION, - ) self.assertRaises( - exc.CommandError, - client_manager.setup_auth, + ksa_exceptions.NoMatchingPlugin, + self._make_clientmanager, + identity_api_version='3', + auth_plugin_name='bad_plugin', ) @mock.patch('osc_lib.api.auth.check_valid_authentication_options') def test_client_manager_auth_setup_once(self, check_authn_options_func): - client_manager = clientmanager.ClientManager( - cli_options=FakeOptions( - auth=dict( - auth_url=fakes.AUTH_URL, - username=fakes.USERNAME, - password=fakes.PASSWORD, - project_name=fakes.PROJECT_NAME, + auth_dict = { + 'auth_url': fakes.AUTH_URL, + 'username': fakes.USERNAME, + 'password': fakes.PASSWORD, + 'project_name': fakes.PROJECT_NAME, + } + loader = loading.get_plugin_loader('password') + auth_plugin = loader.load_from_options(**auth_dict) + client_manager = self._clientmanager_class()( + cli_options=cloud_config.CloudConfig( + name='t1', + region='1', + config=dict( + auth_type='password', + auth=auth_dict, + interface=fakes.INTERFACE, + region_name=fakes.REGION_NAME, ), + auth_plugin=auth_plugin, ), - api_version=API_VERSION, + api_version={ + 'identity': '2.0', + }, ) self.assertFalse(client_manager._auth_setup_completed) client_manager.setup_auth() @@ -391,3 +279,18 @@ class TestClientManager(utils.TestCase): check_authn_options_func.reset_mock() client_manager.auth_ref check_authn_options_func.assert_not_called() + + def test_client_manager_endpoint_disabled(self): + auth_args = copy.deepcopy(self.default_password_auth) + auth_args.update({ + 'user_domain_name': 'default', + 'project_domain_name': 'default', + }) + # v3 fake doesn't have network endpoint + client_manager = self._make_clientmanager( + auth_args=auth_args, + identity_api_version='3', + auth_plugin_name='v3password', + ) + + self.assertFalse(client_manager.is_service_available('network')) diff --git a/osc_lib/tests/test_shell.py b/osc_lib/tests/test_shell.py index 34ae230..e4e1e00 100644 --- a/osc_lib/tests/test_shell.py +++ b/osc_lib/tests/test_shell.py @@ -14,12 +14,10 @@ # import copy -import fixtures import mock import os import testtools -from osc_lib import shell from osc_lib.tests import utils @@ -117,68 +115,12 @@ global_options = { } -def opt2attr(opt): - if opt.startswith('--os-'): - attr = opt[5:] - elif opt.startswith('--'): - attr = opt[2:] - else: - attr = opt - return attr.lower().replace('-', '_') - - -def opt2env(opt): - return opt[2:].upper().replace('-', '_') - - -def make_shell(): - """Create a new command shell and mock out some bits.""" - _shell = shell.OpenStackShell() - _shell.command_manager = mock.Mock() - _shell.cloud = mock.Mock() - - return _shell - - -def fake_execute(shell, cmd): - """Pretend to execute shell commands.""" - return shell.run(cmd.split()) - - -class EnvFixture(fixtures.Fixture): - """Environment Fixture. - - This fixture replaces os.environ with provided env or an empty env. - """ - - def __init__(self, env=None): - self.new_env = env or {} - - def _setUp(self): - self.orig_env, os.environ = os.environ, self.new_env - self.addCleanup(self.revert) - - def revert(self): - os.environ = self.orig_env - - -class TestShell(utils.TestCase): - - def setUp(self): - super(TestShell, self).setUp() - patch = "osc_lib.shell.OpenStackShell.run_subcommand" - self.cmd_patch = mock.patch(patch) - self.cmd_save = self.cmd_patch.start() - self.addCleanup(self.cmd_patch.stop) - self.app = mock.Mock("Test Shell") - - -class TestShellHelp(TestShell): +class TestShellHelp(utils.TestShell): """Test the deferred help flag""" def setUp(self): super(TestShellHelp, self).setUp() - self.useFixture(EnvFixture()) + self.useFixture(utils.EnvFixture()) @testtools.skip("skip until bug 1444983 is resolved") def test_help_options(self): @@ -186,12 +128,9 @@ class TestShellHelp(TestShell): kwargs = { "deferred_help": True, } - with mock.patch( - "osc_lib.shell.OpenStackShell.initialize_app", - self.app, - ): - _shell, _cmd = make_shell(), flag - fake_execute(_shell, _cmd) + with mock.patch(self.app_patch + ".initialize_app", self.app): + _shell, _cmd = utils.make_shell(), flag + utils.fake_execute(_shell, _cmd) self.assertEqual( kwargs["deferred_help"], @@ -199,7 +138,7 @@ class TestShellHelp(TestShell): ) -class TestShellOptions(TestShell): +class TestShellOptions(utils.TestShell): """Test the option handling by argparse and os_client_config This covers getting the CLI options through the initial processing @@ -208,119 +147,12 @@ class TestShellOptions(TestShell): def setUp(self): super(TestShellOptions, self).setUp() - self.useFixture(EnvFixture()) + self.useFixture(utils.EnvFixture()) - def _assert_initialize_app_arg(self, cmd_options, default_args): - """Check the args passed to initialize_app() - - The argv argument to initialize_app() is the remainder from parsing - global options declared in both cliff.app and - osc_lib.OpenStackShell build_option_parser(). Any global - options passed on the command line should not be in argv but in - _shell.options. - """ - - with mock.patch( - "osc_lib.shell.OpenStackShell.initialize_app", - self.app, - ): - _shell, _cmd = make_shell(), cmd_options + " module list" - fake_execute(_shell, _cmd) - - self.app.assert_called_with(["module", "list"]) - for k in default_args.keys(): - self.assertEqual( - default_args[k], - vars(_shell.options)[k], - "%s does not match" % k, - ) - - def _assert_cloud_config_arg(self, cmd_options, default_args): - """Check the args passed to cloud_config.get_one_cloud() - - The argparse argument to get_one_cloud() is an argparse.Namespace - object that contains all of the options processed to this point in - initialize_app(). - """ - - cloud = mock.Mock(name="cloudy") - cloud.config = {} - self.occ_get_one = mock.Mock(return_value=cloud) - with mock.patch( - "os_client_config.config.OpenStackConfig.get_one_cloud", - self.occ_get_one, - ): - _shell, _cmd = make_shell(), cmd_options + " module list" - fake_execute(_shell, _cmd) - - self.app.assert_called_with(["module", "list"]) - opts = self.occ_get_one.call_args[1]['argparse'] - for k in default_args.keys(): - self.assertEqual( - default_args[k], - vars(opts)[k], - "%s does not match" % k, - ) - - def _test_options_init_app(self, test_opts): - """Test options on the command line""" - for opt in test_opts.keys(): - if not test_opts[opt][1]: - continue - key = opt2attr(opt) - if isinstance(test_opts[opt][0], str): - cmd = opt + " " + test_opts[opt][0] - else: - cmd = opt - kwargs = { - key: test_opts[opt][0], - } - self._assert_initialize_app_arg(cmd, kwargs) - - def _test_env_init_app(self, test_opts): - """Test options in the environment""" - for opt in test_opts.keys(): - if not test_opts[opt][2]: - continue - key = opt2attr(opt) - kwargs = { - key: test_opts[opt][0], - } - env = { - opt2env(opt): test_opts[opt][0], - } - os.environ = env.copy() - self._assert_initialize_app_arg("", kwargs) - - def _test_options_get_one_cloud(self, test_opts): - """Test options sent "to os_client_config""" - for opt in test_opts.keys(): - if not test_opts[opt][1]: - continue - key = opt2attr(opt) - if isinstance(test_opts[opt][0], str): - cmd = opt + " " + test_opts[opt][0] - else: - cmd = opt - kwargs = { - key: test_opts[opt][0], - } - self._assert_cloud_config_arg(cmd, kwargs) - - def _test_env_get_one_cloud(self, test_opts): - """Test environment options sent "to os_client_config""" - for opt in test_opts.keys(): - if not test_opts[opt][2]: - continue - key = opt2attr(opt) - kwargs = { - key: test_opts[opt][0], - } - env = { - opt2env(opt): test_opts[opt][0], - } - os.environ = env.copy() - self._assert_cloud_config_arg("", kwargs) + def test_empty_auth(self): + os.environ = {} + self._assert_initialize_app_arg("", {}) + self._assert_cloud_config_arg("", {}) def test_no_options(self): os.environ = {} @@ -336,7 +168,7 @@ class TestShellOptions(TestShell): self._test_env_get_one_cloud(global_options) -class TestShellCli(TestShell): +class TestShellCli(utils.TestShell): """Test handling of specific global options _shell.options is the parsed command line from argparse @@ -347,14 +179,23 @@ class TestShellCli(TestShell): def setUp(self): super(TestShellCli, self).setUp() env = {} - self.useFixture(EnvFixture(env.copy())) + self.useFixture(utils.EnvFixture(env.copy())) + + def test_shell_args_no_options(self): + _shell = utils.make_shell() + with mock.patch( + "osc_lib.shell.OpenStackShell.initialize_app", + self.app, + ): + utils.fake_execute(_shell, "list user") + self.app.assert_called_with(["list", "user"]) def test_shell_args_tls_options(self): """Test the TLS verify and CA cert file options""" - _shell = make_shell() + _shell = utils.make_shell() # Default - fake_execute(_shell, "module list") + utils.fake_execute(_shell, "module list") self.assertIsNone(_shell.options.verify) self.assertIsNone(_shell.options.insecure) self.assertIsNone(_shell.options.cacert) @@ -362,7 +203,7 @@ class TestShellCli(TestShell): self.assertIsNone(_shell.client_manager.cacert) # --verify - fake_execute(_shell, "--verify module list") + utils.fake_execute(_shell, "--verify module list") self.assertTrue(_shell.options.verify) self.assertIsNone(_shell.options.insecure) self.assertIsNone(_shell.options.cacert) @@ -370,7 +211,7 @@ class TestShellCli(TestShell): self.assertIsNone(_shell.client_manager.cacert) # --insecure - fake_execute(_shell, "--insecure module list") + utils.fake_execute(_shell, "--insecure module list") self.assertIsNone(_shell.options.verify) self.assertTrue(_shell.options.insecure) self.assertIsNone(_shell.options.cacert) @@ -378,7 +219,7 @@ class TestShellCli(TestShell): self.assertIsNone(_shell.client_manager.cacert) # --os-cacert - fake_execute(_shell, "--os-cacert foo module list") + utils.fake_execute(_shell, "--os-cacert foo module list") self.assertIsNone(_shell.options.verify) self.assertIsNone(_shell.options.insecure) self.assertEqual('foo', _shell.options.cacert) @@ -386,7 +227,7 @@ class TestShellCli(TestShell): self.assertEqual('foo', _shell.client_manager.cacert) # --os-cacert and --verify - fake_execute(_shell, "--os-cacert foo --verify module list") + utils.fake_execute(_shell, "--os-cacert foo --verify module list") self.assertTrue(_shell.options.verify) self.assertIsNone(_shell.options.insecure) self.assertEqual('foo', _shell.options.cacert) @@ -398,7 +239,7 @@ class TestShellCli(TestShell): # in this combination --insecure now overrides any # --os-cacert setting, where before --insecure # was ignored if --os-cacert was set. - fake_execute(_shell, "--os-cacert foo --insecure module list") + utils.fake_execute(_shell, "--os-cacert foo --insecure module list") self.assertIsNone(_shell.options.verify) self.assertTrue(_shell.options.insecure) self.assertEqual('foo', _shell.options.cacert) @@ -407,28 +248,31 @@ class TestShellCli(TestShell): def test_shell_args_cert_options(self): """Test client cert options""" - _shell = make_shell() + _shell = utils.make_shell() # Default - fake_execute(_shell, "module list") + utils.fake_execute(_shell, "module list") self.assertEqual('', _shell.options.cert) self.assertEqual('', _shell.options.key) self.assertIsNone(_shell.client_manager.cert) # --os-cert - fake_execute(_shell, "--os-cert mycert module list") + utils.fake_execute(_shell, "--os-cert mycert module list") self.assertEqual('mycert', _shell.options.cert) self.assertEqual('', _shell.options.key) self.assertEqual('mycert', _shell.client_manager.cert) # --os-key - fake_execute(_shell, "--os-key mickey module list") + utils.fake_execute(_shell, "--os-key mickey module list") self.assertEqual('', _shell.options.cert) self.assertEqual('mickey', _shell.options.key) self.assertIsNone(_shell.client_manager.cert) # --os-cert and --os-key - fake_execute(_shell, "--os-cert mycert --os-key mickey module list") + utils.fake_execute( + _shell, + "--os-cert mycert --os-key mickey module list" + ) self.assertEqual('mycert', _shell.options.cert) self.assertEqual('mickey', _shell.options.key) self.assertEqual(('mycert', 'mickey'), _shell.client_manager.cert) @@ -437,9 +281,9 @@ class TestShellCli(TestShell): def test_shell_args_cloud_no_vendor(self, config_mock): """Test cloud config options without the vendor file""" config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1)) - _shell = make_shell() + _shell = utils.make_shell() - fake_execute( + utils.fake_execute( _shell, "--os-cloud scc module list", ) @@ -488,9 +332,9 @@ class TestShellCli(TestShell): """Test cloud config options with the vendor file""" config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2)) public_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) - _shell = make_shell() + _shell = utils.make_shell() - fake_execute( + utils.fake_execute( _shell, "--os-cloud megacloud module list", ) @@ -536,10 +380,10 @@ class TestShellCli(TestShell): def test_shell_args_precedence(self, config_mock, vendor_mock): config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2)) vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) - _shell = make_shell() + _shell = utils.make_shell() # Test command option overriding config file value - fake_execute( + utils.fake_execute( _shell, "--os-cloud megacloud --os-region-name krikkit module list", ) @@ -577,7 +421,7 @@ class TestShellCli(TestShell): ) -class TestShellCliPrecedence(TestShell): +class TestShellCliPrecedence(utils.TestShell): """Test option precedencr order""" def setUp(self): @@ -586,7 +430,7 @@ class TestShellCliPrecedence(TestShell): 'OS_CLOUD': 'megacloud', 'OS_REGION_NAME': 'occ-env', } - self.useFixture(EnvFixture(env.copy())) + self.useFixture(utils.EnvFixture(env.copy())) @mock.patch("os_client_config.config.OpenStackConfig._load_vendor_file") @mock.patch("os_client_config.config.OpenStackConfig._load_config_file") @@ -594,10 +438,10 @@ class TestShellCliPrecedence(TestShell): """Test environment overriding occ""" config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2)) vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) - _shell = make_shell() + _shell = utils.make_shell() # Test env var - fake_execute( + utils.fake_execute( _shell, "module list", ) @@ -642,10 +486,10 @@ class TestShellCliPrecedence(TestShell): """Test command line overriding environment and occ""" config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2)) vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) - _shell = make_shell() + _shell = utils.make_shell() # Test command option overriding config file value - fake_execute( + utils.fake_execute( _shell, "--os-region-name krikkit list user", ) @@ -690,10 +534,10 @@ class TestShellCliPrecedence(TestShell): """Test command line overriding environment and occ""" config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1)) vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) - _shell = make_shell() + _shell = utils.make_shell() # Test command option overriding config file value - fake_execute( + utils.fake_execute( _shell, "--os-cloud scc --os-region-name krikkit list user", ) diff --git a/osc_lib/tests/utils.py b/osc_lib/tests/utils.py index 03fd647..8fa5889 100644 --- a/osc_lib/tests/utils.py +++ b/osc_lib/tests/utils.py @@ -14,14 +14,69 @@ # under the License. # +import copy +import json as jsonutils +import mock import os import fixtures +from keystoneauth1 import loading +from os_client_config import cloud_config +from requests_mock.contrib import fixture import testtools +from osc_lib import clientmanager +from osc_lib import shell from osc_lib.tests import fakes +def fake_execute(shell, cmd): + """Pretend to execute shell commands.""" + return shell.run(cmd.split()) + + +def make_shell(shell_class=None): + """Create a new command shell and mock out some bits.""" + if shell_class is None: + shell_class = shell.OpenStackShell + _shell = shell_class() + _shell.command_manager = mock.Mock() + # _shell.cloud = mock.Mock() + + return _shell + + +def opt2attr(opt): + if opt.startswith('--os-'): + attr = opt[5:] + elif opt.startswith('--'): + attr = opt[2:] + else: + attr = opt + return attr.lower().replace('-', '_') + + +def opt2env(opt): + return opt[2:].upper().replace('-', '_') + + +class EnvFixture(fixtures.Fixture): + """Environment Fixture. + + This fixture replaces os.environ with provided env or an empty env. + """ + + def __init__(self, env=None): + self.new_env = env or {} + + def _setUp(self): + self.orig_env, os.environ = os.environ, self.new_env + self.addCleanup(self.revert) + + def revert(self): + os.environ = self.orig_env + + class ParserException(Exception): pass @@ -73,3 +128,263 @@ class TestCommand(TestCase): self.assertIn(attr, parsed_args) self.assertEqual(value, getattr(parsed_args, attr)) return parsed_args + + +class TestClientManager(TestCase): + """ClientManager class test framework""" + + default_password_auth = { + 'auth_url': fakes.AUTH_URL, + 'username': fakes.USERNAME, + 'password': fakes.PASSWORD, + 'project_name': fakes.PROJECT_NAME, + } + default_token_auth = { + 'auth_url': fakes.AUTH_URL, + 'token': fakes.AUTH_TOKEN, + } + + def setUp(self): + super(TestClientManager, self).setUp() + self.mock = mock.Mock() + self.requests = self.useFixture(fixture.Fixture()) + # fake v2password token retrieval + self.stub_auth(json=fakes.TEST_RESPONSE_DICT) + # fake token and token_endpoint retrieval + self.stub_auth(json=fakes.TEST_RESPONSE_DICT, + url='/'.join([fakes.AUTH_URL, 'v2.0/tokens'])) + # fake v3password token retrieval + self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3, + url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens'])) + # fake password token retrieval + self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3, + url='/'.join([fakes.AUTH_URL, 'auth/tokens'])) + # fake password version endpoint discovery + self.stub_auth(json=fakes.TEST_VERSIONS, + url=fakes.AUTH_URL, + verb='GET') + + # Mock the auth plugin + self.auth_mock = mock.Mock() + + def stub_auth(self, json=None, url=None, verb=None, **kwargs): + subject_token = fakes.AUTH_TOKEN + base_url = fakes.AUTH_URL + if json: + text = jsonutils.dumps(json) + headers = { + 'X-Subject-Token': subject_token, + 'Content-Type': 'application/json', + } + if not url: + url = '/'.join([base_url, 'tokens']) + url = url.replace("/?", "?") + if not verb: + verb = 'POST' + self.requests.register_uri( + verb, + url, + headers=headers, + text=text, + ) + + def _clientmanager_class(self): + """Allow subclasses to override the ClientManager class""" + return clientmanager.ClientManager + + def _make_clientmanager( + self, + auth_args=None, + config_args=None, + identity_api_version=None, + auth_plugin_name=None, + ): + + if identity_api_version is None: + identity_api_version = '2.0' + if auth_plugin_name is None: + auth_plugin_name = 'password' + + if auth_plugin_name.endswith('password'): + auth_dict = copy.deepcopy(self.default_password_auth) + elif auth_plugin_name.endswith('token'): + auth_dict = copy.deepcopy(self.default_token_auth) + else: + auth_dict = {} + + if auth_args is not None: + auth_dict = auth_args + + cli_options = { + 'auth_type': auth_plugin_name, + 'auth': auth_dict, + 'interface': fakes.INTERFACE, + 'region_name': fakes.REGION_NAME, + } + if config_args is not None: + cli_options.update(config_args) + + loader = loading.get_plugin_loader(auth_plugin_name) + auth_plugin = loader.load_from_options(**auth_dict) + client_manager = self._clientmanager_class()( + cli_options=cloud_config.CloudConfig( + name='t1', + region='1', + config=cli_options, + auth_plugin=auth_plugin, + ), + api_version={ + 'identity': identity_api_version, + }, + ) + client_manager.setup_auth() + client_manager.auth_ref + + self.assertEqual( + auth_plugin_name, + client_manager.auth_plugin_name, + ) + return client_manager + + +class TestShell(TestCase): + + # cliff.app.App subclass + app_patch = "osc_lib.shell.OpenStackShell" + + def setUp(self): + super(TestShell, self).setUp() + self.cmd_patch = mock.patch(self.app_patch + ".run_subcommand") + self.cmd_save = self.cmd_patch.start() + self.addCleanup(self.cmd_patch.stop) + self.app = mock.Mock("Test Shell") + + def _assert_initialize_app_arg(self, cmd_options, default_args): + """Check the args passed to initialize_app() + + The argv argument to initialize_app() is the remainder from parsing + global options declared in both cliff.app and + osc_lib.OpenStackShell build_option_parser(). Any global + options passed on the command line should not be in argv but in + _shell.options. + """ + + with mock.patch( + self.app_patch + ".initialize_app", + self.app, + ): + _shell, _cmd = make_shell(), cmd_options + " module list" + fake_execute(_shell, _cmd) + + self.app.assert_called_with(["module", "list"]) + for k in default_args.keys(): + self.assertEqual( + default_args[k], + vars(_shell.options)[k], + "%s does not match" % k, + ) + + def _assert_cloud_config_arg(self, cmd_options, default_args): + """Check the args passed to cloud_config.get_one_cloud() + + The argparse argument to get_one_cloud() is an argparse.Namespace + object that contains all of the options processed to this point in + initialize_app(). + """ + + cloud = mock.Mock(name="cloudy") + cloud.config = {} + self.occ_get_one = mock.Mock(return_value=cloud) + with mock.patch( + "os_client_config.config.OpenStackConfig.get_one_cloud", + self.occ_get_one, + ): + _shell, _cmd = make_shell(), cmd_options + " module list" + fake_execute(_shell, _cmd) + + self.app.assert_called_with(["module", "list"]) + opts = self.occ_get_one.call_args[1]['argparse'] + for k in default_args.keys(): + self.assertEqual( + default_args[k], + vars(opts)[k], + "%s does not match" % k, + ) + + def _assert_token_auth(self, cmd_options, default_args): + with mock.patch(self.app_patch + ".initialize_app", + self.app): + _shell, _cmd = make_shell(), cmd_options + " list role" + fake_execute(_shell, _cmd) + + self.app.assert_called_with(["list", "role"]) + self.assertEqual( + default_args.get("token", ''), + _shell.options.token, + "token" + ) + self.assertEqual( + default_args.get("auth_url", ''), + _shell.options.auth_url, + "auth_url" + ) + + def _test_options_init_app(self, test_opts): + """Test options on the command line""" + for opt in test_opts.keys(): + if not test_opts[opt][1]: + continue + key = opt2attr(opt) + if isinstance(test_opts[opt][0], str): + cmd = opt + " " + test_opts[opt][0] + else: + cmd = opt + kwargs = { + key: test_opts[opt][0], + } + self._assert_initialize_app_arg(cmd, kwargs) + + def _test_env_init_app(self, test_opts): + """Test options in the environment""" + for opt in test_opts.keys(): + if not test_opts[opt][2]: + continue + key = opt2attr(opt) + kwargs = { + key: test_opts[opt][0], + } + env = { + opt2env(opt): test_opts[opt][0], + } + os.environ = env.copy() + self._assert_initialize_app_arg("", kwargs) + + def _test_options_get_one_cloud(self, test_opts): + """Test options sent "to os_client_config""" + for opt in test_opts.keys(): + if not test_opts[opt][1]: + continue + key = opt2attr(opt) + if isinstance(test_opts[opt][0], str): + cmd = opt + " " + test_opts[opt][0] + else: + cmd = opt + kwargs = { + key: test_opts[opt][0], + } + self._assert_cloud_config_arg(cmd, kwargs) + + def _test_env_get_one_cloud(self, test_opts): + """Test environment options sent "to os_client_config""" + for opt in test_opts.keys(): + if not test_opts[opt][2]: + continue + key = opt2attr(opt) + kwargs = { + key: test_opts[opt][0], + } + env = { + opt2env(opt): test_opts[opt][0], + } + os.environ = env.copy() + self._assert_cloud_config_arg("", kwargs)