Merge "Make OSC_Config the default"

This commit is contained in:
Jenkins 2016-07-02 18:50:47 +00:00 committed by Gerrit Code Review
commit 6dafb78290
7 changed files with 524 additions and 544 deletions

View File

@ -67,74 +67,6 @@ def get_options_list():
return OPTIONS_LIST return OPTIONS_LIST
def select_auth_plugin(options):
"""Pick an auth plugin based on --os-auth-type or other options"""
auth_plugin_name = None
# Do the token/url check first as this must override the default
# 'password' set by os-client-config
# Also, url and token are not copied into o-c-c's auth dict (yet?)
if options.auth.get('url') and options.auth.get('token'):
# service token authentication
auth_plugin_name = 'token_endpoint'
elif options.auth_type in PLUGIN_LIST:
# A direct plugin name was given, use it
auth_plugin_name = options.auth_type
elif options.auth.get('username'):
if options.identity_api_version == '3':
auth_plugin_name = 'v3password'
elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2password'
else:
# let keystoneauth figure it out itself
auth_plugin_name = 'password'
elif options.auth.get('token'):
if options.identity_api_version == '3':
auth_plugin_name = 'v3token'
elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2token'
else:
# let keystoneauth figure it out itself
auth_plugin_name = 'token'
else:
# The ultimate default is similar to the original behaviour,
# but this time with version discovery
auth_plugin_name = 'password'
LOG.debug("Auth plugin %s selected", auth_plugin_name)
return auth_plugin_name
def build_auth_params(auth_plugin_name, cmd_options):
if auth_plugin_name:
LOG.debug('auth_type: %s', auth_plugin_name)
auth_plugin_loader = base.get_plugin_loader(auth_plugin_name)
auth_params = {
opt.dest: opt.default
for opt in base.get_plugin_options(auth_plugin_name)
}
auth_params.update(dict(cmd_options.auth))
# grab tenant from project for v2.0 API compatibility
if auth_plugin_name.startswith("v2"):
if 'project_id' in auth_params:
auth_params['tenant_id'] = auth_params['project_id']
del auth_params['project_id']
if 'project_name' in auth_params:
auth_params['tenant_name'] = auth_params['project_name']
del auth_params['project_name']
else:
LOG.debug('no auth_type')
# delay the plugin choice, grab every option
auth_plugin_loader = None
auth_params = dict(cmd_options.auth)
plugin_options = set([o.replace('-', '_') for o in get_options_list()])
for option in plugin_options:
LOG.debug('fetching option %s', option)
auth_params[option] = getattr(cmd_options.auth, option, None)
return (auth_plugin_loader, auth_params)
def check_valid_authorization_options(options, auth_plugin_name): def check_valid_authorization_options(options, auth_plugin_name):
"""Validate authorization options, and provide helpful error messages.""" """Validate authorization options, and provide helpful error messages."""
if (options.auth.get('project_id') and not if (options.auth.get('project_id') and not

View File

@ -87,7 +87,6 @@ class ClientManager(object):
self._cli_options = cli_options self._cli_options = cli_options
self._api_version = api_version self._api_version = api_version
self._pw_callback = pw_func self._pw_callback = pw_func
self._url = self._cli_options.auth.get('url')
self.region_name = self._cli_options.region_name self.region_name = self._cli_options.region_name
self.interface = self._cli_options.interface self.interface = self._cli_options.interface
@ -197,9 +196,8 @@ class ClientManager(object):
if self._auth_setup_completed: if self._auth_setup_completed:
return return
# If no auth type is named by the user, select one based on # Stash the selected auth type
# the supplied options self.auth_plugin_name = self._cli_options.config['auth_type']
self.auth_plugin_name = auth.select_auth_plugin(self._cli_options)
# Basic option checking to avoid unhelpful error messages # Basic option checking to avoid unhelpful error messages
auth.check_valid_authentication_options( auth.check_valid_authentication_options(
@ -213,23 +211,16 @@ class ClientManager(object):
not self._cli_options.auth.get('password')): not self._cli_options.auth.get('password')):
self._cli_options.auth['password'] = self._pw_callback() self._cli_options.auth['password'] = self._pw_callback()
(auth_plugin, self._auth_params) = auth.build_auth_params(
self.auth_plugin_name,
self._cli_options,
)
self._set_default_scope_options()
# For compatibility until all clients can be updated # For compatibility until all clients can be updated
if 'project_name' in self._auth_params: if 'project_name' in self._cli_options.auth:
self._project_name = self._auth_params['project_name'] self._project_name = self._cli_options.auth['project_name']
elif 'tenant_name' in self._auth_params: elif 'tenant_name' in self._cli_options.auth:
self._project_name = self._auth_params['tenant_name'] self._project_name = self._cli_options.auth['tenant_name']
LOG.info('Using auth plugin: %s', self.auth_plugin_name) LOG.info('Using auth plugin: %s', self.auth_plugin_name)
LOG.debug('Using parameters %s', LOG.debug('Using parameters %s',
strutils.mask_password(self._auth_params)) strutils.mask_password(self._cli_options.auth))
self.auth = auth_plugin.load_from_options(**self._auth_params) self.auth = self._cli_options.get_auth()
# needed by SAML authentication # needed by SAML authentication
request_session = requests.session() request_session = requests.session()
self.session = osc_session.TimingSession( self.session = osc_session.TimingSession(

View File

@ -29,6 +29,7 @@ from cliff import help
from oslo_utils import importutils from oslo_utils import importutils
from oslo_utils import strutils from oslo_utils import strutils
from osc_lib.cli import client_config as cloud_config
from osc_lib import clientmanager from osc_lib import clientmanager
from osc_lib.command import commandmanager from osc_lib.command import commandmanager
from osc_lib.command import timing from osc_lib.command import timing
@ -37,8 +38,6 @@ from osc_lib.i18n import _
from osc_lib import logs from osc_lib import logs
from osc_lib import utils from osc_lib import utils
from os_client_config import config as cloud_config
osprofiler_profiler = importutils.try_import("osprofiler.profiler") osprofiler_profiler = importutils.try_import("osprofiler.profiler")
@ -102,7 +101,7 @@ class OpenStackShell(app.App):
if not command_manager: if not command_manager:
cm = commandmanager.CommandManager('openstack.cli') cm = commandmanager.CommandManager('openstack.cli')
else: else:
command_manager cm = command_manager
super(OpenStackShell, self).__init__( super(OpenStackShell, self).__init__(
description=__doc__.strip(), description=__doc__.strip(),
@ -374,7 +373,7 @@ class OpenStackShell(app.App):
# Ignore the default value of interface. Only if it is set later # Ignore the default value of interface. Only if it is set later
# will it be used. # will it be used.
try: try:
cc = cloud_config.OpenStackConfig( cc = cloud_config.OSC_Config(
override_defaults={ override_defaults={
'interface': None, 'interface': None,
'auth_type': self._auth_type, 'auth_type': self._auth_type,

View File

@ -134,7 +134,6 @@ class FakeClientManager(object):
self.session = None self.session = None
self.auth_ref = None self.auth_ref = None
self.auth_plugin_name = None self.auth_plugin_name = None
self.network_endpoint_enabled = True
def get_configuration(self): def get_configuration(self):
return { return {
@ -147,9 +146,6 @@ class FakeClientManager(object):
'identity_api_version': VERSION, 'identity_api_version': VERSION,
} }
def is_network_endpoint_enabled(self):
return self.network_endpoint_enabled
class FakeModule(object): class FakeModule(object):

View File

@ -13,12 +13,14 @@
# under the License. # under the License.
# #
import json as jsonutils import copy
import mock import mock
from keystoneauth1.access import service_catalog from keystoneauth1.access import service_catalog
from keystoneauth1.identity import v2 as auth_v2 from keystoneauth1 import exceptions as ksa_exceptions
from requests_mock.contrib import fixture from keystoneauth1.identity import generic as generic_plugin
from keystoneauth1 import loading
from os_client_config import cloud_config
from osc_lib.api import auth from osc_lib.api import auth
from osc_lib import clientmanager from osc_lib import clientmanager
@ -26,8 +28,6 @@ from osc_lib import exceptions as exc
from osc_lib.tests import fakes from osc_lib.tests import fakes
from osc_lib.tests import utils from osc_lib.tests import utils
API_VERSION = {"identity": "2.0"}
AUTH_REF = {'version': 'v2.0'} AUTH_REF = {'version': 'v2.0'}
AUTH_REF.update(fakes.TEST_RESPONSE_DICT['access']) AUTH_REF.update(fakes.TEST_RESPONSE_DICT['access'])
SERVICE_CATALOG = service_catalog.ServiceCatalogV2(AUTH_REF) SERVICE_CATALOG = service_catalog.ServiceCatalogV2(AUTH_REF)
@ -45,27 +45,6 @@ class Container(object):
pass pass
class FakeOptions(object):
def __init__(self, **kwargs):
for option in auth.OPTIONS_LIST:
setattr(self, option.replace('-', '_'), None)
self.auth_type = None
self.verify = True
self.cacert = None
self.insecure = None
self.identity_api_version = '2.0'
self.timing = None
self.region_name = None
self.interface = None
self.url = None
self.auth = {}
self.cert = None
self.key = None
self.default_domain = 'default'
self.__dict__.update(kwargs)
class TestClientCache(utils.TestCase): class TestClientCache(utils.TestCase):
def test_singleton(self): def test_singleton(self):
@ -82,61 +61,29 @@ class TestClientCache(utils.TestCase):
self.assertEqual("'Container' object has no attribute 'foo'", str(err)) self.assertEqual("'Container' object has no attribute 'foo'", str(err))
class TestClientManager(utils.TestCase): class TestClientManager(utils.TestClientManager):
def setUp(self):
super(TestClientManager, self).setUp()
self.mock = mock.Mock()
self.requests = self.useFixture(fixture.Fixture())
# fake v2password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT)
# fake token and token_endpoint retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT,
url='/'.join([fakes.AUTH_URL, 'v2.0/tokens']))
# fake v3password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens']))
# fake password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'auth/tokens']))
# fake password version endpoint discovery
self.stub_auth(json=fakes.TEST_VERSIONS,
url=fakes.AUTH_URL,
verb='GET')
def test_client_manager_password(self): def test_client_manager_password(self):
client_manager = self._make_clientmanager()
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
),
api_version=API_VERSION,
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertEqual( self.assertEqual(
fakes.AUTH_URL, fakes.AUTH_URL,
client_manager._auth_url, client_manager._cli_options.config['auth']['auth_url'],
) )
self.assertEqual( self.assertEqual(
fakes.USERNAME, fakes.USERNAME,
client_manager._username, client_manager._cli_options.config['auth']['username'],
) )
self.assertEqual( self.assertEqual(
fakes.PASSWORD, fakes.PASSWORD,
client_manager._password, client_manager._cli_options.config['auth']['password'],
) )
self.assertIsInstance( self.assertIsInstance(
client_manager.auth, client_manager.auth,
auth_v2.Password, generic_plugin.Password,
) )
self.assertTrue(client_manager.verify) self.assertTrue(client_manager.verify)
self.assertIsNone(client_manager.cert)
# These need to stick around until the old-style clients are gone # These need to stick around until the old-style clients are gone
self.assertEqual( self.assertEqual(
@ -153,84 +100,20 @@ class TestClientManager(utils.TestCase):
) )
self.assertTrue(client_manager.is_service_available('network')) self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_endpoint_disabled(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
user_domain_name='default',
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
project_domain_name='default',
),
auth_type='v3password',
),
api_version={"identity": "3"},
)
client_manager.setup_auth()
client_manager.auth_ref
# v3 fake doesn't have network endpoint.
self.assertFalse(client_manager.is_service_available('network'))
def stub_auth(self, json=None, url=None, verb=None, **kwargs):
subject_token = fakes.AUTH_TOKEN
base_url = fakes.AUTH_URL
if json:
text = jsonutils.dumps(json)
headers = {'X-Subject-Token': subject_token,
'Content-Type': 'application/json'}
if not url:
url = '/'.join([base_url, 'tokens'])
url = url.replace("/?", "?")
if not verb:
verb = 'POST'
self.requests.register_uri(verb,
url,
headers=headers,
text=text)
def test_client_manager_password_verify(self): def test_client_manager_password_verify(self):
client_manager = self._make_clientmanager()
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
verify=True,
),
api_version=API_VERSION,
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertTrue(client_manager.verify) self.assertTrue(client_manager.verify)
self.assertEqual(None, client_manager.cacert) self.assertEqual(None, client_manager.cacert)
self.assertTrue(client_manager.is_service_available('network')) self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_password_verify_ca(self): def test_client_manager_password_verify_ca(self):
config_args = {
client_manager = clientmanager.ClientManager( 'cacert': 'cafile',
cli_options=FakeOptions( }
auth=dict( client_manager = self._make_clientmanager(
auth_url=fakes.AUTH_URL, config_args=config_args,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
cacert='cafile',
),
api_version=API_VERSION,
) )
client_manager.setup_auth()
client_manager.auth_ref
# Test that client_manager.verify is Requests-compatible, # Test that client_manager.verify is Requests-compatible,
# i.e. it contains the value of cafile here # i.e. it contains the value of cafile here
@ -240,146 +123,151 @@ class TestClientManager(utils.TestCase):
self.assertTrue(client_manager.is_service_available('network')) self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_password_verify_insecure(self): def test_client_manager_password_verify_insecure(self):
config_args = {
client_manager = clientmanager.ClientManager( 'insecure': True,
cli_options=FakeOptions( }
auth=dict( client_manager = self._make_clientmanager(
auth_url=fakes.AUTH_URL, config_args=config_args,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
insecure=True,
),
api_version=API_VERSION,
) )
client_manager.setup_auth()
client_manager.auth_ref
self.assertFalse(client_manager.verify) self.assertFalse(client_manager.verify)
self.assertEqual(None, client_manager.cacert) self.assertEqual(None, client_manager.cacert)
self.assertTrue(client_manager.is_service_available('network')) self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_password_verify_insecure_ca(self): def test_client_manager_password_verify_insecure_ca(self):
config_args = {
client_manager = clientmanager.ClientManager( 'insecure': True,
cli_options=FakeOptions( 'cacert': 'cafile',
auth=dict( }
auth_url=fakes.AUTH_URL, client_manager = self._make_clientmanager(
username=fakes.USERNAME, config_args=config_args,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
insecure=True,
cacert='cafile',
),
api_version=API_VERSION,
) )
client_manager.setup_auth()
client_manager.auth_ref
# insecure overrides cacert # insecure overrides cacert
self.assertFalse(client_manager.verify) self.assertFalse(client_manager.verify)
self.assertEqual(None, client_manager.cacert) self.assertEqual(None, client_manager.cacert)
self.assertTrue(client_manager.is_service_available('network')) self.assertTrue(client_manager.is_service_available('network'))
def test_client_manager_password_no_cert(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions())
self.assertIsNone(client_manager.cert)
def test_client_manager_password_client_cert(self): def test_client_manager_password_client_cert(self):
client_manager = clientmanager.ClientManager( config_args = {
cli_options=FakeOptions(cert='cert')) 'cert': 'cert',
}
client_manager = self._make_clientmanager(
config_args=config_args,
)
self.assertEqual('cert', client_manager.cert) self.assertEqual('cert', client_manager.cert)
def test_client_manager_password_client_cert_and_key(self): def test_client_manager_password_client_key(self):
client_manager = clientmanager.ClientManager( config_args = {
cli_options=FakeOptions(cert='cert', key='key')) 'cert': 'cert',
'key': 'key',
}
client_manager = self._make_clientmanager(
config_args=config_args,
)
self.assertEqual(('cert', 'key'), client_manager.cert) self.assertEqual(('cert', 'key'), client_manager.cert)
def _select_auth_plugin(self, auth_params, api_version, auth_plugin_name): def test_client_manager_select_auth_plugin_password(self):
auth_params['auth_type'] = auth_plugin_name
auth_params['identity_api_version'] = api_version
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(**auth_params),
api_version={"identity": api_version},
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertEqual(
auth_plugin_name,
client_manager.auth_plugin_name,
)
def test_client_manager_select_auth_plugin(self):
# test token auth
params = dict(
auth=dict(
auth_url=fakes.AUTH_URL,
token=fakes.AUTH_TOKEN,
),
)
self._select_auth_plugin(params, '2.0', 'v2token')
self._select_auth_plugin(params, '3', 'v3token')
self._select_auth_plugin(params, 'XXX', 'token')
# test token/endpoint auth
params = dict(
auth_plugin='token_endpoint',
auth=dict(
url='test',
token=fakes.AUTH_TOKEN,
),
)
# test password auth # test password auth
params = dict( auth_args = {
auth=dict( 'auth_url': fakes.AUTH_URL,
auth_url=fakes.AUTH_URL, 'username': fakes.USERNAME,
username=fakes.USERNAME, 'password': fakes.PASSWORD,
password=fakes.PASSWORD, 'tenant_name': fakes.PROJECT_NAME,
project_name=fakes.PROJECT_NAME, }
), self._make_clientmanager(
auth_args=auth_args,
identity_api_version='2.0',
auth_plugin_name='v2password',
) )
self._select_auth_plugin(params, '2.0', 'v2password')
params = dict( auth_args = copy.deepcopy(self.default_password_auth)
auth=dict( auth_args.update({
auth_url=fakes.AUTH_URL, 'user_domain_name': 'default',
username=fakes.USERNAME, 'project_domain_name': 'default',
user_domain_name='default', })
password=fakes.PASSWORD, self._make_clientmanager(
project_name=fakes.PROJECT_NAME, auth_args=auth_args,
project_domain_name='default', identity_api_version='3',
), auth_plugin_name='v3password',
)
# Use v2.0 auth args
auth_args = {
'auth_url': fakes.AUTH_URL,
'username': fakes.USERNAME,
'password': fakes.PASSWORD,
'tenant_name': fakes.PROJECT_NAME,
}
self._make_clientmanager(
auth_args=auth_args,
identity_api_version='2.0',
)
# Use v3 auth args
auth_args = copy.deepcopy(self.default_password_auth)
auth_args.update({
'user_domain_name': 'default',
'project_domain_name': 'default',
})
self._make_clientmanager(
auth_args=auth_args,
identity_api_version='3',
)
def test_client_manager_select_auth_plugin_token(self):
# test token auth
self._make_clientmanager(
# auth_args=auth_args,
identity_api_version='2.0',
auth_plugin_name='v2token',
)
self._make_clientmanager(
# auth_args=auth_args,
identity_api_version='3',
auth_plugin_name='v3token',
)
self._make_clientmanager(
# auth_args=auth_args,
identity_api_version='x',
auth_plugin_name='token',
) )
self._select_auth_plugin(params, '3', 'v3password')
self._select_auth_plugin(params, 'XXX', 'password')
def test_client_manager_select_auth_plugin_failure(self): def test_client_manager_select_auth_plugin_failure(self):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(os_auth_plugin=''),
api_version=API_VERSION,
)
self.assertRaises( self.assertRaises(
exc.CommandError, ksa_exceptions.NoMatchingPlugin,
client_manager.setup_auth, self._make_clientmanager,
identity_api_version='3',
auth_plugin_name='bad_plugin',
) )
@mock.patch('osc_lib.api.auth.check_valid_authentication_options') @mock.patch('osc_lib.api.auth.check_valid_authentication_options')
def test_client_manager_auth_setup_once(self, check_authn_options_func): def test_client_manager_auth_setup_once(self, check_authn_options_func):
client_manager = clientmanager.ClientManager( auth_dict = {
cli_options=FakeOptions( 'auth_url': fakes.AUTH_URL,
auth=dict( 'username': fakes.USERNAME,
auth_url=fakes.AUTH_URL, 'password': fakes.PASSWORD,
username=fakes.USERNAME, 'project_name': fakes.PROJECT_NAME,
password=fakes.PASSWORD, }
project_name=fakes.PROJECT_NAME, loader = loading.get_plugin_loader('password')
auth_plugin = loader.load_from_options(**auth_dict)
client_manager = self._clientmanager_class()(
cli_options=cloud_config.CloudConfig(
name='t1',
region='1',
config=dict(
auth_type='password',
auth=auth_dict,
interface=fakes.INTERFACE,
region_name=fakes.REGION_NAME,
), ),
auth_plugin=auth_plugin,
), ),
api_version=API_VERSION, api_version={
'identity': '2.0',
},
) )
self.assertFalse(client_manager._auth_setup_completed) self.assertFalse(client_manager._auth_setup_completed)
client_manager.setup_auth() client_manager.setup_auth()
@ -391,3 +279,18 @@ class TestClientManager(utils.TestCase):
check_authn_options_func.reset_mock() check_authn_options_func.reset_mock()
client_manager.auth_ref client_manager.auth_ref
check_authn_options_func.assert_not_called() check_authn_options_func.assert_not_called()
def test_client_manager_endpoint_disabled(self):
auth_args = copy.deepcopy(self.default_password_auth)
auth_args.update({
'user_domain_name': 'default',
'project_domain_name': 'default',
})
# v3 fake doesn't have network endpoint
client_manager = self._make_clientmanager(
auth_args=auth_args,
identity_api_version='3',
auth_plugin_name='v3password',
)
self.assertFalse(client_manager.is_service_available('network'))

View File

@ -14,12 +14,10 @@
# #
import copy import copy
import fixtures
import mock import mock
import os import os
import testtools import testtools
from osc_lib import shell
from osc_lib.tests import utils from osc_lib.tests import utils
@ -117,68 +115,12 @@ global_options = {
} }
def opt2attr(opt): class TestShellHelp(utils.TestShell):
if opt.startswith('--os-'):
attr = opt[5:]
elif opt.startswith('--'):
attr = opt[2:]
else:
attr = opt
return attr.lower().replace('-', '_')
def opt2env(opt):
return opt[2:].upper().replace('-', '_')
def make_shell():
"""Create a new command shell and mock out some bits."""
_shell = shell.OpenStackShell()
_shell.command_manager = mock.Mock()
_shell.cloud = mock.Mock()
return _shell
def fake_execute(shell, cmd):
"""Pretend to execute shell commands."""
return shell.run(cmd.split())
class EnvFixture(fixtures.Fixture):
"""Environment Fixture.
This fixture replaces os.environ with provided env or an empty env.
"""
def __init__(self, env=None):
self.new_env = env or {}
def _setUp(self):
self.orig_env, os.environ = os.environ, self.new_env
self.addCleanup(self.revert)
def revert(self):
os.environ = self.orig_env
class TestShell(utils.TestCase):
def setUp(self):
super(TestShell, self).setUp()
patch = "osc_lib.shell.OpenStackShell.run_subcommand"
self.cmd_patch = mock.patch(patch)
self.cmd_save = self.cmd_patch.start()
self.addCleanup(self.cmd_patch.stop)
self.app = mock.Mock("Test Shell")
class TestShellHelp(TestShell):
"""Test the deferred help flag""" """Test the deferred help flag"""
def setUp(self): def setUp(self):
super(TestShellHelp, self).setUp() super(TestShellHelp, self).setUp()
self.useFixture(EnvFixture()) self.useFixture(utils.EnvFixture())
@testtools.skip("skip until bug 1444983 is resolved") @testtools.skip("skip until bug 1444983 is resolved")
def test_help_options(self): def test_help_options(self):
@ -186,12 +128,9 @@ class TestShellHelp(TestShell):
kwargs = { kwargs = {
"deferred_help": True, "deferred_help": True,
} }
with mock.patch( with mock.patch(self.app_patch + ".initialize_app", self.app):
"osc_lib.shell.OpenStackShell.initialize_app", _shell, _cmd = utils.make_shell(), flag
self.app, utils.fake_execute(_shell, _cmd)
):
_shell, _cmd = make_shell(), flag
fake_execute(_shell, _cmd)
self.assertEqual( self.assertEqual(
kwargs["deferred_help"], kwargs["deferred_help"],
@ -199,7 +138,7 @@ class TestShellHelp(TestShell):
) )
class TestShellOptions(TestShell): class TestShellOptions(utils.TestShell):
"""Test the option handling by argparse and os_client_config """Test the option handling by argparse and os_client_config
This covers getting the CLI options through the initial processing This covers getting the CLI options through the initial processing
@ -208,119 +147,12 @@ class TestShellOptions(TestShell):
def setUp(self): def setUp(self):
super(TestShellOptions, self).setUp() super(TestShellOptions, self).setUp()
self.useFixture(EnvFixture()) self.useFixture(utils.EnvFixture())
def _assert_initialize_app_arg(self, cmd_options, default_args): def test_empty_auth(self):
"""Check the args passed to initialize_app() os.environ = {}
self._assert_initialize_app_arg("", {})
The argv argument to initialize_app() is the remainder from parsing self._assert_cloud_config_arg("", {})
global options declared in both cliff.app and
osc_lib.OpenStackShell build_option_parser(). Any global
options passed on the command line should not be in argv but in
_shell.options.
"""
with mock.patch(
"osc_lib.shell.OpenStackShell.initialize_app",
self.app,
):
_shell, _cmd = make_shell(), cmd_options + " module list"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["module", "list"])
for k in default_args.keys():
self.assertEqual(
default_args[k],
vars(_shell.options)[k],
"%s does not match" % k,
)
def _assert_cloud_config_arg(self, cmd_options, default_args):
"""Check the args passed to cloud_config.get_one_cloud()
The argparse argument to get_one_cloud() is an argparse.Namespace
object that contains all of the options processed to this point in
initialize_app().
"""
cloud = mock.Mock(name="cloudy")
cloud.config = {}
self.occ_get_one = mock.Mock(return_value=cloud)
with mock.patch(
"os_client_config.config.OpenStackConfig.get_one_cloud",
self.occ_get_one,
):
_shell, _cmd = make_shell(), cmd_options + " module list"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["module", "list"])
opts = self.occ_get_one.call_args[1]['argparse']
for k in default_args.keys():
self.assertEqual(
default_args[k],
vars(opts)[k],
"%s does not match" % k,
)
def _test_options_init_app(self, test_opts):
"""Test options on the command line"""
for opt in test_opts.keys():
if not test_opts[opt][1]:
continue
key = opt2attr(opt)
if isinstance(test_opts[opt][0], str):
cmd = opt + " " + test_opts[opt][0]
else:
cmd = opt
kwargs = {
key: test_opts[opt][0],
}
self._assert_initialize_app_arg(cmd, kwargs)
def _test_env_init_app(self, test_opts):
"""Test options in the environment"""
for opt in test_opts.keys():
if not test_opts[opt][2]:
continue
key = opt2attr(opt)
kwargs = {
key: test_opts[opt][0],
}
env = {
opt2env(opt): test_opts[opt][0],
}
os.environ = env.copy()
self._assert_initialize_app_arg("", kwargs)
def _test_options_get_one_cloud(self, test_opts):
"""Test options sent "to os_client_config"""
for opt in test_opts.keys():
if not test_opts[opt][1]:
continue
key = opt2attr(opt)
if isinstance(test_opts[opt][0], str):
cmd = opt + " " + test_opts[opt][0]
else:
cmd = opt
kwargs = {
key: test_opts[opt][0],
}
self._assert_cloud_config_arg(cmd, kwargs)
def _test_env_get_one_cloud(self, test_opts):
"""Test environment options sent "to os_client_config"""
for opt in test_opts.keys():
if not test_opts[opt][2]:
continue
key = opt2attr(opt)
kwargs = {
key: test_opts[opt][0],
}
env = {
opt2env(opt): test_opts[opt][0],
}
os.environ = env.copy()
self._assert_cloud_config_arg("", kwargs)
def test_no_options(self): def test_no_options(self):
os.environ = {} os.environ = {}
@ -336,7 +168,7 @@ class TestShellOptions(TestShell):
self._test_env_get_one_cloud(global_options) self._test_env_get_one_cloud(global_options)
class TestShellCli(TestShell): class TestShellCli(utils.TestShell):
"""Test handling of specific global options """Test handling of specific global options
_shell.options is the parsed command line from argparse _shell.options is the parsed command line from argparse
@ -347,14 +179,23 @@ class TestShellCli(TestShell):
def setUp(self): def setUp(self):
super(TestShellCli, self).setUp() super(TestShellCli, self).setUp()
env = {} env = {}
self.useFixture(EnvFixture(env.copy())) self.useFixture(utils.EnvFixture(env.copy()))
def test_shell_args_no_options(self):
_shell = utils.make_shell()
with mock.patch(
"osc_lib.shell.OpenStackShell.initialize_app",
self.app,
):
utils.fake_execute(_shell, "list user")
self.app.assert_called_with(["list", "user"])
def test_shell_args_tls_options(self): def test_shell_args_tls_options(self):
"""Test the TLS verify and CA cert file options""" """Test the TLS verify and CA cert file options"""
_shell = make_shell() _shell = utils.make_shell()
# Default # Default
fake_execute(_shell, "module list") utils.fake_execute(_shell, "module list")
self.assertIsNone(_shell.options.verify) self.assertIsNone(_shell.options.verify)
self.assertIsNone(_shell.options.insecure) self.assertIsNone(_shell.options.insecure)
self.assertIsNone(_shell.options.cacert) self.assertIsNone(_shell.options.cacert)
@ -362,7 +203,7 @@ class TestShellCli(TestShell):
self.assertIsNone(_shell.client_manager.cacert) self.assertIsNone(_shell.client_manager.cacert)
# --verify # --verify
fake_execute(_shell, "--verify module list") utils.fake_execute(_shell, "--verify module list")
self.assertTrue(_shell.options.verify) self.assertTrue(_shell.options.verify)
self.assertIsNone(_shell.options.insecure) self.assertIsNone(_shell.options.insecure)
self.assertIsNone(_shell.options.cacert) self.assertIsNone(_shell.options.cacert)
@ -370,7 +211,7 @@ class TestShellCli(TestShell):
self.assertIsNone(_shell.client_manager.cacert) self.assertIsNone(_shell.client_manager.cacert)
# --insecure # --insecure
fake_execute(_shell, "--insecure module list") utils.fake_execute(_shell, "--insecure module list")
self.assertIsNone(_shell.options.verify) self.assertIsNone(_shell.options.verify)
self.assertTrue(_shell.options.insecure) self.assertTrue(_shell.options.insecure)
self.assertIsNone(_shell.options.cacert) self.assertIsNone(_shell.options.cacert)
@ -378,7 +219,7 @@ class TestShellCli(TestShell):
self.assertIsNone(_shell.client_manager.cacert) self.assertIsNone(_shell.client_manager.cacert)
# --os-cacert # --os-cacert
fake_execute(_shell, "--os-cacert foo module list") utils.fake_execute(_shell, "--os-cacert foo module list")
self.assertIsNone(_shell.options.verify) self.assertIsNone(_shell.options.verify)
self.assertIsNone(_shell.options.insecure) self.assertIsNone(_shell.options.insecure)
self.assertEqual('foo', _shell.options.cacert) self.assertEqual('foo', _shell.options.cacert)
@ -386,7 +227,7 @@ class TestShellCli(TestShell):
self.assertEqual('foo', _shell.client_manager.cacert) self.assertEqual('foo', _shell.client_manager.cacert)
# --os-cacert and --verify # --os-cacert and --verify
fake_execute(_shell, "--os-cacert foo --verify module list") utils.fake_execute(_shell, "--os-cacert foo --verify module list")
self.assertTrue(_shell.options.verify) self.assertTrue(_shell.options.verify)
self.assertIsNone(_shell.options.insecure) self.assertIsNone(_shell.options.insecure)
self.assertEqual('foo', _shell.options.cacert) self.assertEqual('foo', _shell.options.cacert)
@ -398,7 +239,7 @@ class TestShellCli(TestShell):
# in this combination --insecure now overrides any # in this combination --insecure now overrides any
# --os-cacert setting, where before --insecure # --os-cacert setting, where before --insecure
# was ignored if --os-cacert was set. # was ignored if --os-cacert was set.
fake_execute(_shell, "--os-cacert foo --insecure module list") utils.fake_execute(_shell, "--os-cacert foo --insecure module list")
self.assertIsNone(_shell.options.verify) self.assertIsNone(_shell.options.verify)
self.assertTrue(_shell.options.insecure) self.assertTrue(_shell.options.insecure)
self.assertEqual('foo', _shell.options.cacert) self.assertEqual('foo', _shell.options.cacert)
@ -407,28 +248,31 @@ class TestShellCli(TestShell):
def test_shell_args_cert_options(self): def test_shell_args_cert_options(self):
"""Test client cert options""" """Test client cert options"""
_shell = make_shell() _shell = utils.make_shell()
# Default # Default
fake_execute(_shell, "module list") utils.fake_execute(_shell, "module list")
self.assertEqual('', _shell.options.cert) self.assertEqual('', _shell.options.cert)
self.assertEqual('', _shell.options.key) self.assertEqual('', _shell.options.key)
self.assertIsNone(_shell.client_manager.cert) self.assertIsNone(_shell.client_manager.cert)
# --os-cert # --os-cert
fake_execute(_shell, "--os-cert mycert module list") utils.fake_execute(_shell, "--os-cert mycert module list")
self.assertEqual('mycert', _shell.options.cert) self.assertEqual('mycert', _shell.options.cert)
self.assertEqual('', _shell.options.key) self.assertEqual('', _shell.options.key)
self.assertEqual('mycert', _shell.client_manager.cert) self.assertEqual('mycert', _shell.client_manager.cert)
# --os-key # --os-key
fake_execute(_shell, "--os-key mickey module list") utils.fake_execute(_shell, "--os-key mickey module list")
self.assertEqual('', _shell.options.cert) self.assertEqual('', _shell.options.cert)
self.assertEqual('mickey', _shell.options.key) self.assertEqual('mickey', _shell.options.key)
self.assertIsNone(_shell.client_manager.cert) self.assertIsNone(_shell.client_manager.cert)
# --os-cert and --os-key # --os-cert and --os-key
fake_execute(_shell, "--os-cert mycert --os-key mickey module list") utils.fake_execute(
_shell,
"--os-cert mycert --os-key mickey module list"
)
self.assertEqual('mycert', _shell.options.cert) self.assertEqual('mycert', _shell.options.cert)
self.assertEqual('mickey', _shell.options.key) self.assertEqual('mickey', _shell.options.key)
self.assertEqual(('mycert', 'mickey'), _shell.client_manager.cert) self.assertEqual(('mycert', 'mickey'), _shell.client_manager.cert)
@ -437,9 +281,9 @@ class TestShellCli(TestShell):
def test_shell_args_cloud_no_vendor(self, config_mock): def test_shell_args_cloud_no_vendor(self, config_mock):
"""Test cloud config options without the vendor file""" """Test cloud config options without the vendor file"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1)) config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1))
_shell = make_shell() _shell = utils.make_shell()
fake_execute( utils.fake_execute(
_shell, _shell,
"--os-cloud scc module list", "--os-cloud scc module list",
) )
@ -488,9 +332,9 @@ class TestShellCli(TestShell):
"""Test cloud config options with the vendor file""" """Test cloud config options with the vendor file"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2)) config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
public_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) public_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell() _shell = utils.make_shell()
fake_execute( utils.fake_execute(
_shell, _shell,
"--os-cloud megacloud module list", "--os-cloud megacloud module list",
) )
@ -536,10 +380,10 @@ class TestShellCli(TestShell):
def test_shell_args_precedence(self, config_mock, vendor_mock): def test_shell_args_precedence(self, config_mock, vendor_mock):
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2)) config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell() _shell = utils.make_shell()
# Test command option overriding config file value # Test command option overriding config file value
fake_execute( utils.fake_execute(
_shell, _shell,
"--os-cloud megacloud --os-region-name krikkit module list", "--os-cloud megacloud --os-region-name krikkit module list",
) )
@ -577,7 +421,7 @@ class TestShellCli(TestShell):
) )
class TestShellCliPrecedence(TestShell): class TestShellCliPrecedence(utils.TestShell):
"""Test option precedencr order""" """Test option precedencr order"""
def setUp(self): def setUp(self):
@ -586,7 +430,7 @@ class TestShellCliPrecedence(TestShell):
'OS_CLOUD': 'megacloud', 'OS_CLOUD': 'megacloud',
'OS_REGION_NAME': 'occ-env', 'OS_REGION_NAME': 'occ-env',
} }
self.useFixture(EnvFixture(env.copy())) self.useFixture(utils.EnvFixture(env.copy()))
@mock.patch("os_client_config.config.OpenStackConfig._load_vendor_file") @mock.patch("os_client_config.config.OpenStackConfig._load_vendor_file")
@mock.patch("os_client_config.config.OpenStackConfig._load_config_file") @mock.patch("os_client_config.config.OpenStackConfig._load_config_file")
@ -594,10 +438,10 @@ class TestShellCliPrecedence(TestShell):
"""Test environment overriding occ""" """Test environment overriding occ"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2)) config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell() _shell = utils.make_shell()
# Test env var # Test env var
fake_execute( utils.fake_execute(
_shell, _shell,
"module list", "module list",
) )
@ -642,10 +486,10 @@ class TestShellCliPrecedence(TestShell):
"""Test command line overriding environment and occ""" """Test command line overriding environment and occ"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2)) config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_2))
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell() _shell = utils.make_shell()
# Test command option overriding config file value # Test command option overriding config file value
fake_execute( utils.fake_execute(
_shell, _shell,
"--os-region-name krikkit list user", "--os-region-name krikkit list user",
) )
@ -690,10 +534,10 @@ class TestShellCliPrecedence(TestShell):
"""Test command line overriding environment and occ""" """Test command line overriding environment and occ"""
config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1)) config_mock.return_value = ('file.yaml', copy.deepcopy(CLOUD_1))
vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1)) vendor_mock.return_value = ('file.yaml', copy.deepcopy(PUBLIC_1))
_shell = make_shell() _shell = utils.make_shell()
# Test command option overriding config file value # Test command option overriding config file value
fake_execute( utils.fake_execute(
_shell, _shell,
"--os-cloud scc --os-region-name krikkit list user", "--os-cloud scc --os-region-name krikkit list user",
) )

View File

@ -14,14 +14,69 @@
# under the License. # under the License.
# #
import copy
import json as jsonutils
import mock
import os import os
import fixtures import fixtures
from keystoneauth1 import loading
from os_client_config import cloud_config
from requests_mock.contrib import fixture
import testtools import testtools
from osc_lib import clientmanager
from osc_lib import shell
from osc_lib.tests import fakes from osc_lib.tests import fakes
def fake_execute(shell, cmd):
"""Pretend to execute shell commands."""
return shell.run(cmd.split())
def make_shell(shell_class=None):
"""Create a new command shell and mock out some bits."""
if shell_class is None:
shell_class = shell.OpenStackShell
_shell = shell_class()
_shell.command_manager = mock.Mock()
# _shell.cloud = mock.Mock()
return _shell
def opt2attr(opt):
if opt.startswith('--os-'):
attr = opt[5:]
elif opt.startswith('--'):
attr = opt[2:]
else:
attr = opt
return attr.lower().replace('-', '_')
def opt2env(opt):
return opt[2:].upper().replace('-', '_')
class EnvFixture(fixtures.Fixture):
"""Environment Fixture.
This fixture replaces os.environ with provided env or an empty env.
"""
def __init__(self, env=None):
self.new_env = env or {}
def _setUp(self):
self.orig_env, os.environ = os.environ, self.new_env
self.addCleanup(self.revert)
def revert(self):
os.environ = self.orig_env
class ParserException(Exception): class ParserException(Exception):
pass pass
@ -73,3 +128,263 @@ class TestCommand(TestCase):
self.assertIn(attr, parsed_args) self.assertIn(attr, parsed_args)
self.assertEqual(value, getattr(parsed_args, attr)) self.assertEqual(value, getattr(parsed_args, attr))
return parsed_args return parsed_args
class TestClientManager(TestCase):
"""ClientManager class test framework"""
default_password_auth = {
'auth_url': fakes.AUTH_URL,
'username': fakes.USERNAME,
'password': fakes.PASSWORD,
'project_name': fakes.PROJECT_NAME,
}
default_token_auth = {
'auth_url': fakes.AUTH_URL,
'token': fakes.AUTH_TOKEN,
}
def setUp(self):
super(TestClientManager, self).setUp()
self.mock = mock.Mock()
self.requests = self.useFixture(fixture.Fixture())
# fake v2password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT)
# fake token and token_endpoint retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT,
url='/'.join([fakes.AUTH_URL, 'v2.0/tokens']))
# fake v3password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens']))
# fake password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'auth/tokens']))
# fake password version endpoint discovery
self.stub_auth(json=fakes.TEST_VERSIONS,
url=fakes.AUTH_URL,
verb='GET')
# Mock the auth plugin
self.auth_mock = mock.Mock()
def stub_auth(self, json=None, url=None, verb=None, **kwargs):
subject_token = fakes.AUTH_TOKEN
base_url = fakes.AUTH_URL
if json:
text = jsonutils.dumps(json)
headers = {
'X-Subject-Token': subject_token,
'Content-Type': 'application/json',
}
if not url:
url = '/'.join([base_url, 'tokens'])
url = url.replace("/?", "?")
if not verb:
verb = 'POST'
self.requests.register_uri(
verb,
url,
headers=headers,
text=text,
)
def _clientmanager_class(self):
"""Allow subclasses to override the ClientManager class"""
return clientmanager.ClientManager
def _make_clientmanager(
self,
auth_args=None,
config_args=None,
identity_api_version=None,
auth_plugin_name=None,
):
if identity_api_version is None:
identity_api_version = '2.0'
if auth_plugin_name is None:
auth_plugin_name = 'password'
if auth_plugin_name.endswith('password'):
auth_dict = copy.deepcopy(self.default_password_auth)
elif auth_plugin_name.endswith('token'):
auth_dict = copy.deepcopy(self.default_token_auth)
else:
auth_dict = {}
if auth_args is not None:
auth_dict = auth_args
cli_options = {
'auth_type': auth_plugin_name,
'auth': auth_dict,
'interface': fakes.INTERFACE,
'region_name': fakes.REGION_NAME,
}
if config_args is not None:
cli_options.update(config_args)
loader = loading.get_plugin_loader(auth_plugin_name)
auth_plugin = loader.load_from_options(**auth_dict)
client_manager = self._clientmanager_class()(
cli_options=cloud_config.CloudConfig(
name='t1',
region='1',
config=cli_options,
auth_plugin=auth_plugin,
),
api_version={
'identity': identity_api_version,
},
)
client_manager.setup_auth()
client_manager.auth_ref
self.assertEqual(
auth_plugin_name,
client_manager.auth_plugin_name,
)
return client_manager
class TestShell(TestCase):
# cliff.app.App subclass
app_patch = "osc_lib.shell.OpenStackShell"
def setUp(self):
super(TestShell, self).setUp()
self.cmd_patch = mock.patch(self.app_patch + ".run_subcommand")
self.cmd_save = self.cmd_patch.start()
self.addCleanup(self.cmd_patch.stop)
self.app = mock.Mock("Test Shell")
def _assert_initialize_app_arg(self, cmd_options, default_args):
"""Check the args passed to initialize_app()
The argv argument to initialize_app() is the remainder from parsing
global options declared in both cliff.app and
osc_lib.OpenStackShell build_option_parser(). Any global
options passed on the command line should not be in argv but in
_shell.options.
"""
with mock.patch(
self.app_patch + ".initialize_app",
self.app,
):
_shell, _cmd = make_shell(), cmd_options + " module list"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["module", "list"])
for k in default_args.keys():
self.assertEqual(
default_args[k],
vars(_shell.options)[k],
"%s does not match" % k,
)
def _assert_cloud_config_arg(self, cmd_options, default_args):
"""Check the args passed to cloud_config.get_one_cloud()
The argparse argument to get_one_cloud() is an argparse.Namespace
object that contains all of the options processed to this point in
initialize_app().
"""
cloud = mock.Mock(name="cloudy")
cloud.config = {}
self.occ_get_one = mock.Mock(return_value=cloud)
with mock.patch(
"os_client_config.config.OpenStackConfig.get_one_cloud",
self.occ_get_one,
):
_shell, _cmd = make_shell(), cmd_options + " module list"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["module", "list"])
opts = self.occ_get_one.call_args[1]['argparse']
for k in default_args.keys():
self.assertEqual(
default_args[k],
vars(opts)[k],
"%s does not match" % k,
)
def _assert_token_auth(self, cmd_options, default_args):
with mock.patch(self.app_patch + ".initialize_app",
self.app):
_shell, _cmd = make_shell(), cmd_options + " list role"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "role"])
self.assertEqual(
default_args.get("token", ''),
_shell.options.token,
"token"
)
self.assertEqual(
default_args.get("auth_url", ''),
_shell.options.auth_url,
"auth_url"
)
def _test_options_init_app(self, test_opts):
"""Test options on the command line"""
for opt in test_opts.keys():
if not test_opts[opt][1]:
continue
key = opt2attr(opt)
if isinstance(test_opts[opt][0], str):
cmd = opt + " " + test_opts[opt][0]
else:
cmd = opt
kwargs = {
key: test_opts[opt][0],
}
self._assert_initialize_app_arg(cmd, kwargs)
def _test_env_init_app(self, test_opts):
"""Test options in the environment"""
for opt in test_opts.keys():
if not test_opts[opt][2]:
continue
key = opt2attr(opt)
kwargs = {
key: test_opts[opt][0],
}
env = {
opt2env(opt): test_opts[opt][0],
}
os.environ = env.copy()
self._assert_initialize_app_arg("", kwargs)
def _test_options_get_one_cloud(self, test_opts):
"""Test options sent "to os_client_config"""
for opt in test_opts.keys():
if not test_opts[opt][1]:
continue
key = opt2attr(opt)
if isinstance(test_opts[opt][0], str):
cmd = opt + " " + test_opts[opt][0]
else:
cmd = opt
kwargs = {
key: test_opts[opt][0],
}
self._assert_cloud_config_arg(cmd, kwargs)
def _test_env_get_one_cloud(self, test_opts):
"""Test environment options sent "to os_client_config"""
for opt in test_opts.keys():
if not test_opts[opt][2]:
continue
key = opt2attr(opt)
kwargs = {
key: test_opts[opt][0],
}
env = {
opt2env(opt): test_opts[opt][0],
}
os.environ = env.copy()
self._assert_cloud_config_arg("", kwargs)