Add --os-cloud support

This adds a new option --os-cloud that allows the configuration values
for multiple clouds to be stored in a local file and selected with
a single option.

Internal option names have had 'os_' removed to be comptible with
the options returned from OpenStackConfig().get_one_cloud().

The config file is ~/.config/openstack/clouds.yaml:

Sample
------
clouds:
  devstack:
    auth:
      auth_url: http://192.168.122.10:35357/
      project_name: demo
      username: demo
      password: 0penstack
    region_name: RegionOne
  devstack:
     auth:
       auth_url: http://192.168.122.10:35357/
       project_name: demo
       username: demo
       password: 0penstack
     region_name: RegionOne

Co-Authored-By: Monty Taylor <mordred@inaugust.com>
Change-Id: I4939acf8067e44ffe06a2e26fc28f1adf8985b7d
Depends-On: I45e2550af58aee616ca168d20a557077beeab007
This commit is contained in:
Dean Troyer 2015-03-02 17:05:35 -06:00
parent a5e79d58ae
commit 5649695c65
9 changed files with 411 additions and 321 deletions

View File

@ -84,9 +84,17 @@ def base_parser(parser):
"""
# Global arguments
parser.add_argument(
'--os-cloud',
metavar='<cloud-config-name>',
dest='cloud',
default=env('OS_CLOUD'),
help='Cloud name in clouds.yaml (Env: OS_CLOUD)',
)
parser.add_argument(
'--os-region-name',
metavar='<auth-region-name>',
dest='region_name',
default=env('OS_REGION_NAME'),
help='Authentication region name (Env: OS_REGION_NAME)',
)

View File

@ -30,6 +30,8 @@ import common
from openstackclient.api import object_store_v1 as object_store
from openstackclient.identity import client as identity_client
from os_client_config import config as cloud_config
LOG = logging.getLogger('')
@ -37,6 +39,15 @@ LOG = logging.getLogger('')
def run(opts):
"""Run the examples"""
# Look for configuration file
# To support token-flow we have no required values
# print "options: %s" % self.options
cloud = cloud_config.OpenStackConfig().get_one_cloud(
cloud=opts.cloud,
argparse=opts,
)
LOG.debug("cloud cfg: %s", cloud.config)
# Set up certificate verification and CA bundle
# NOTE(dtroyer): This converts from the usual OpenStack way to the single
# requests argument and is an app-specific thing because
@ -52,13 +63,13 @@ def run(opts):
# The returned session will have a configured auth object
# based on the selected plugin's available options.
# So to do...oh, just go to api.auth.py and look at what it does.
session = common.make_session(opts, verify=verify)
session = common.make_session(cloud, verify=verify)
# Extract an endpoint
auth_ref = session.auth.get_auth_ref(session)
if opts.os_url:
endpoint = opts.os_url
if opts.url:
endpoint = opts.url
else:
endpoint = auth_ref.service_catalog.url_for(
service_type='object-store',

View File

@ -29,6 +29,8 @@ import common
from openstackclient.common import clientmanager
from os_client_config import config as cloud_config
LOG = logging.getLogger('')
@ -36,6 +38,16 @@ LOG = logging.getLogger('')
def run(opts):
"""Run the examples"""
# Do configuration file handling
cc = cloud_config.OpenStackConfig()
LOG.debug("defaults: %s", cc.defaults)
cloud = cc.get_one_cloud(
cloud=opts.cloud,
argparse=opts,
)
LOG.debug("cloud cfg: %s", cloud.config)
# Loop through extensions to get API versions
# Currently API versions are statically selected. Once discovery
# is working this can go away...
@ -59,7 +71,7 @@ def run(opts):
# Collect the auth and config options together and give them to
# ClientManager and it will wrangle all of the goons into place.
client_manager = clientmanager.ClientManager(
cli_options=opts,
cli_options=cloud,
verify=verify,
api_version=api_version,
)

View File

@ -77,25 +77,27 @@ def select_auth_plugin(options):
auth_plugin_name = None
if options.os_auth_type in [plugin.name for plugin in get_plugin_list()]:
# A direct plugin name was given, use it
return options.os_auth_type
if options.os_url and options.os_token:
# Do the token/url check first as this must override the default
# 'password' set by os-client-config
# Also, url and token are not copied into o-c-c's auth dict (yet?)
if options.auth.get('url', None) and options.auth.get('token', None):
# service token authentication
auth_plugin_name = 'token_endpoint'
elif options.os_username:
if options.os_identity_api_version == '3':
elif options.auth_type in [plugin.name for plugin in PLUGIN_LIST]:
# A direct plugin name was given, use it
auth_plugin_name = options.auth_type
elif options.auth.get('username', None):
if options.identity_api_version == '3':
auth_plugin_name = 'v3password'
elif options.os_identity_api_version == '2.0':
elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2password'
else:
# let keystoneclient figure it out itself
auth_plugin_name = 'osc_password'
elif options.os_token:
if options.os_identity_api_version == '3':
elif options.auth.get('token', None):
if options.identity_api_version == '3':
auth_plugin_name = 'v3token'
elif options.os_identity_api_version == '2.0':
elif options.identity_api_version.startswith('2'):
auth_plugin_name = 'v2token'
else:
# let keystoneclient figure it out itself
@ -109,35 +111,27 @@ def select_auth_plugin(options):
def build_auth_params(auth_plugin_name, cmd_options):
auth_params = {}
auth_params = dict(cmd_options.auth)
if auth_plugin_name:
LOG.debug('auth_type: %s', auth_plugin_name)
auth_plugin_class = base.get_plugin_class(auth_plugin_name)
plugin_options = auth_plugin_class.get_options()
for option in plugin_options:
option_name = 'os_' + option.dest
LOG.debug('fetching option %s' % option_name)
auth_params[option.dest] = getattr(cmd_options, option_name, None)
# grab tenant from project for v2.0 API compatibility
if auth_plugin_name.startswith("v2"):
auth_params['tenant_id'] = getattr(
cmd_options,
'os_project_id',
None,
)
auth_params['tenant_name'] = getattr(
cmd_options,
'os_project_name',
None,
)
if 'project_id' in auth_params:
auth_params['tenant_id'] = auth_params['project_id']
del auth_params['project_id']
if 'project_name' in auth_params:
auth_params['tenant_name'] = auth_params['project_name']
del auth_params['project_name']
else:
LOG.debug('no auth_type')
# delay the plugin choice, grab every option
auth_plugin_class = None
plugin_options = set([o.replace('-', '_') for o in get_options_list()])
for option in plugin_options:
option_name = 'os_' + option
LOG.debug('fetching option %s' % option_name)
auth_params[option] = getattr(cmd_options, option_name, None)
LOG.debug('fetching option %s' % option)
auth_params[option] = getattr(cmd_options.auth, option, None)
return (auth_plugin_class, auth_params)
@ -146,15 +140,29 @@ def check_valid_auth_options(options, auth_plugin_name):
msg = ''
if auth_plugin_name.endswith('password'):
if not options.os_username:
msg += _('Set a username with --os-username or OS_USERNAME\n')
if not options.os_auth_url:
msg += _('Set an authentication URL, with --os-auth-url or'
' OS_AUTH_URL\n')
if (not options.os_project_id and not options.os_domain_id and not
options.os_domain_name and not options.os_project_name):
if not options.auth.get('username', None):
msg += _('Set a username with --os-username, OS_USERNAME,'
' or auth.username\n')
if not options.auth.get('auth_url', None):
msg += _('Set an authentication URL, with --os-auth-url,'
' OS_AUTH_URL or auth.auth_url\n')
if (not options.auth.get('project_id', None) and not
options.auth.get('domain_id', None) and not
options.auth.get('domain_name', None) and not
options.auth.get('project_name', None)):
msg += _('Set a scope, such as a project or domain, with '
'--os-project-name or OS_PROJECT_NAME')
'--os-project-name, OS_PROJECT_NAME or auth.project_name')
elif auth_plugin_name.endswith('token'):
if not options.auth.get('token', None):
msg += _('Set a token with --os-token, OS_TOKEN or auth.token\n')
if not options.auth.get('auth_url', None):
msg += _('Set a service AUTH_URL, with --os-auth-url, '
'OS_AUTH_URL or auth.auth_url\n')
elif auth_plugin_name == 'token_endpoint':
if not options.auth.get('token', None):
msg += _('Set a token with --os-token, OS_TOKEN or auth.token\n')
if not options.auth.get('url', None):
msg += _('Set a service URL, with --os-url, OS_URL or auth.url\n')
if msg:
raise exc.CommandError('Missing parameter(s): \n%s' % msg)
@ -171,6 +179,7 @@ def build_auth_plugins_option_parser(parser):
parser.add_argument(
'--os-auth-type',
metavar='<auth-type>',
dest='auth_type',
default=utils.env('OS_AUTH_TYPE'),
help='Select an auhentication type. Available types: ' +
', '.join(available_plugins) +
@ -178,7 +187,7 @@ def build_auth_plugins_option_parser(parser):
' (Env: OS_AUTH_TYPE)',
choices=available_plugins
)
# make sure we catch old v2.0 env values
# Maintain compatibility with old tenant env vars
envs = {
'OS_PROJECT_NAME': utils.env(
'OS_PROJECT_NAME',
@ -190,15 +199,20 @@ def build_auth_plugins_option_parser(parser):
),
}
for o in get_options_list():
# remove allusion to tenants from v2.0 API
# Remove tenant options from KSC plugins and replace them below
if 'tenant' not in o:
parser.add_argument(
'--os-' + o,
metavar='<auth-%s>' % o,
default=envs.get(OPTIONS_LIST[o]['env'],
utils.env(OPTIONS_LIST[o]['env'])),
help='%s\n(Env: %s)' % (OPTIONS_LIST[o]['help'],
OPTIONS_LIST[o]['env']),
dest=o.replace('-', '_'),
default=envs.get(
OPTIONS_LIST[o]['env'],
utils.env(OPTIONS_LIST[o]['env']),
),
help='%s\n(Env: %s)' % (
OPTIONS_LIST[o]['help'],
OPTIONS_LIST[o]['env'],
),
)
# add tenant-related options for compatibility
# this is deprecated but still used in some tempest tests...
@ -206,14 +220,12 @@ def build_auth_plugins_option_parser(parser):
'--os-tenant-name',
metavar='<auth-tenant-name>',
dest='os_project_name',
default=utils.env('OS_TENANT_NAME'),
help=argparse.SUPPRESS,
)
parser.add_argument(
'--os-tenant-id',
metavar='<auth-tenant-id>',
dest='os_project_id',
default=utils.env('OS_TENANT_ID'),
help=argparse.SUPPRESS,
)
return parser

View File

@ -58,7 +58,7 @@ class ClientManager(object):
def __init__(
self,
cli_options,
cli_options=None,
api_version=None,
verify=True,
pw_func=None,
@ -82,8 +82,8 @@ class ClientManager(object):
self._cli_options = cli_options
self._api_version = api_version
self._pw_callback = pw_func
self._url = self._cli_options.os_url
self._region_name = self._cli_options.os_region_name
self._url = self._cli_options.auth.get('url', None)
self._region_name = self._cli_options.region_name
self.timing = self._cli_options.timing
@ -121,7 +121,7 @@ class ClientManager(object):
# Horrible hack alert...must handle prompt for null password if
# password auth is requested.
if (self.auth_plugin_name.endswith('password') and
not self._cli_options.os_password):
not self._cli_options.auth.get('password', None)):
self._cli_options.os_password = self._pw_callback()
(auth_plugin, self._auth_params) = auth.build_auth_params(
@ -129,13 +129,15 @@ class ClientManager(object):
self._cli_options,
)
default_domain = self._cli_options.os_default_domain
# TODO(mordred): This is a usability improvement that's broadly useful
# We should port it back up into os-client-config.
default_domain = self._cli_options.default_domain
# NOTE(stevemar): If PROJECT_DOMAIN_ID or PROJECT_DOMAIN_NAME is
# present, then do not change the behaviour. Otherwise, set the
# PROJECT_DOMAIN_ID to 'OS_DEFAULT_DOMAIN' for better usability.
if (self._api_version.get('identity') == '3' and
not self._auth_params.get('project_domain_id') and
not self._auth_params.get('project_domain_name')):
not self._auth_params.get('project_domain_id', None) and
not self._auth_params.get('project_domain_name', None)):
self._auth_params['project_domain_id'] = default_domain
# NOTE(stevemar): If USER_DOMAIN_ID or USER_DOMAIN_NAME is present,
@ -143,8 +145,8 @@ class ClientManager(object):
# to 'OS_DEFAULT_DOMAIN' for better usability.
if (self._api_version.get('identity') == '3' and
self.auth_plugin_name.endswith('password') and
not self._auth_params.get('user_domain_id') and
not self._auth_params.get('user_domain_name')):
not self._auth_params.get('user_domain_id', None) and
not self._auth_params.get('user_domain_name', None)):
self._auth_params['user_domain_id'] = default_domain
# For compatibility until all clients can be updated

View File

@ -33,6 +33,8 @@ from openstackclient.common import exceptions as exc
from openstackclient.common import timing
from openstackclient.common import utils
from os_client_config import config as cloud_config
DEFAULT_DOMAIN = 'default'
@ -86,10 +88,6 @@ class OpenStackShell(app.App):
# Until we have command line arguments parsed, dump any stack traces
self.dump_stack_trace = True
# This is instantiated in initialize_app() only when using
# password flow auth
self.auth_client = None
# Assume TLS host certificate verification is enabled
self.verify = True
@ -165,10 +163,19 @@ class OpenStackShell(app.App):
description,
version)
# service token auth argument
parser.add_argument(
'--os-cloud',
metavar='<cloud-config-name>',
dest='cloud',
default=utils.env('OS_CLOUD'),
help='Cloud name in clouds.yaml (Env: OS_CLOUD)',
)
# Global arguments
parser.add_argument(
'--os-region-name',
metavar='<auth-region-name>',
dest='region_name',
default=utils.env('OS_REGION_NAME'),
help='Authentication region name (Env: OS_REGION_NAME)')
parser.add_argument(
@ -213,8 +220,43 @@ class OpenStackShell(app.App):
* authenticate against Identity if requested
"""
# Parent __init__ parses argv into self.options
super(OpenStackShell, self).initialize_app(argv)
# Resolve the verify/insecure exclusive pair here as cloud_config
# doesn't know about verify
self.options.insecure = (
self.options.insecure and not self.options.verify
)
# Set the default plugin to token_endpoint if rl and token are given
if (self.options.url and self.options.token):
# Use service token authentication
cloud_config.set_default('auth_type', 'token_endpoint')
else:
cloud_config.set_default('auth_type', 'osc_password')
self.log.debug("options: %s", self.options)
# Do configuration file handling
cc = cloud_config.OpenStackConfig()
self.log.debug("defaults: %s", cc.defaults)
self.cloud = cc.get_one_cloud(
cloud=self.options.cloud,
argparse=self.options,
)
self.log.debug("cloud cfg: %s", self.cloud.config)
# Set up client TLS
cacert = self.cloud.cacert
if cacert:
self.verify = cacert
else:
self.verify = not getattr(self.cloud.config, 'insecure', False)
# Neutralize verify option
self.options.verify = None
# Save default domain
self.default_domain = self.options.os_default_domain
@ -260,14 +302,8 @@ class OpenStackShell(app.App):
# Handle deferred help and exit
self.print_help_if_requested()
# Set up common client session
if self.options.os_cacert:
self.verify = self.options.os_cacert
else:
self.verify = not self.options.insecure
self.client_manager = clientmanager.ClientManager(
cli_options=self.options,
cli_options=self.cloud,
verify=self.verify,
api_version=self.api_version,
pw_func=prompt_for_password,

View File

@ -48,13 +48,14 @@ class Container(object):
class FakeOptions(object):
def __init__(self, **kwargs):
for option in auth.OPTIONS_LIST:
setattr(self, 'os_' + option.replace('-', '_'), None)
self.os_auth_type = None
self.os_identity_api_version = '2.0'
setattr(self, option.replace('-', '_'), None)
self.auth_type = None
self.identity_api_version = '2.0'
self.timing = None
self.os_region_name = None
self.os_url = None
self.os_default_domain = 'default'
self.region_name = None
self.url = None
self.auth = {}
self.default_domain = 'default'
self.__dict__.update(kwargs)
@ -86,9 +87,11 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
os_token=fakes.AUTH_TOKEN,
os_url=fakes.AUTH_URL,
os_auth_type='token_endpoint',
auth_type='token_endpoint',
auth=dict(
token=fakes.AUTH_TOKEN,
url=fakes.AUTH_URL,
),
),
api_version=API_VERSION,
verify=True
@ -114,9 +117,11 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
os_token=fakes.AUTH_TOKEN,
os_auth_url=fakes.AUTH_URL,
os_auth_type='v2token',
auth=dict(
token=fakes.AUTH_TOKEN,
auth_url=fakes.AUTH_URL,
),
auth_type='v2token',
),
api_version=API_VERSION,
verify=True
@ -138,10 +143,12 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
os_auth_url=fakes.AUTH_URL,
os_username=fakes.USERNAME,
os_password=fakes.PASSWORD,
os_project_name=fakes.PROJECT_NAME,
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
),
api_version=API_VERSION,
verify=False,
@ -198,11 +205,13 @@ class TestClientManager(utils.TestCase):
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(
os_auth_url=fakes.AUTH_URL,
os_username=fakes.USERNAME,
os_password=fakes.PASSWORD,
os_project_name=fakes.PROJECT_NAME,
os_auth_type='v2password',
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
auth_type='v2password',
),
api_version=API_VERSION,
verify='cafile',
@ -214,8 +223,8 @@ class TestClientManager(utils.TestCase):
self.assertEqual('cafile', client_manager._cacert)
def _select_auth_plugin(self, auth_params, api_version, auth_plugin_name):
auth_params['os_auth_type'] = auth_plugin_name
auth_params['os_identity_api_version'] = api_version
auth_params['auth_type'] = auth_plugin_name
auth_params['identity_api_version'] = api_version
client_manager = clientmanager.ClientManager(
cli_options=FakeOptions(**auth_params),
api_version=API_VERSION,
@ -230,19 +239,33 @@ class TestClientManager(utils.TestCase):
def test_client_manager_select_auth_plugin(self):
# test token auth
params = dict(os_token=fakes.AUTH_TOKEN,
os_auth_url=fakes.AUTH_URL)
params = dict(
auth=dict(
auth_url=fakes.AUTH_URL,
token=fakes.AUTH_TOKEN,
),
)
self._select_auth_plugin(params, '2.0', 'v2token')
self._select_auth_plugin(params, '3', 'v3token')
self._select_auth_plugin(params, 'XXX', 'token')
# test token/endpoint auth
params = dict(os_token=fakes.AUTH_TOKEN, os_url='test')
params = dict(
auth_plugin='token_endpoint',
auth=dict(
url='test',
token=fakes.AUTH_TOKEN,
),
)
self._select_auth_plugin(params, 'XXX', 'token_endpoint')
# test password auth
params = dict(os_auth_url=fakes.AUTH_URL,
os_username=fakes.USERNAME,
os_password=fakes.PASSWORD,
os_project_name=fakes.PROJECT_NAME)
params = dict(
auth=dict(
auth_url=fakes.AUTH_URL,
username=fakes.USERNAME,
password=fakes.PASSWORD,
project_name=fakes.PROJECT_NAME,
),
)
self._select_auth_plugin(params, '2.0', 'v2password')
self._select_auth_plugin(params, '3', 'v3password')
self._select_auth_plugin(params, 'XXX', 'password')

View File

@ -82,34 +82,62 @@ class TestShell(utils.TestCase):
fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "project"])
self.assertEqual(default_args["auth_url"],
_shell.options.os_auth_url)
self.assertEqual(default_args["project_id"],
_shell.options.os_project_id)
self.assertEqual(default_args["project_name"],
_shell.options.os_project_name)
self.assertEqual(default_args["domain_id"],
_shell.options.os_domain_id)
self.assertEqual(default_args["domain_name"],
_shell.options.os_domain_name)
self.assertEqual(default_args["user_domain_id"],
_shell.options.os_user_domain_id)
self.assertEqual(default_args["user_domain_name"],
_shell.options.os_user_domain_name)
self.assertEqual(default_args["project_domain_id"],
_shell.options.os_project_domain_id)
self.assertEqual(default_args["project_domain_name"],
_shell.options.os_project_domain_name)
self.assertEqual(default_args["username"],
_shell.options.os_username)
self.assertEqual(default_args["password"],
_shell.options.os_password)
self.assertEqual(default_args["region_name"],
_shell.options.os_region_name)
self.assertEqual(default_args["trust_id"],
_shell.options.os_trust_id)
self.assertEqual(default_args['auth_type'],
_shell.options.os_auth_type)
self.assertEqual(
default_args.get("auth_url", ''),
_shell.options.auth_url,
)
self.assertEqual(
default_args.get("project_id", ''),
_shell.options.project_id,
)
self.assertEqual(
default_args.get("project_name", ''),
_shell.options.project_name,
)
self.assertEqual(
default_args.get("domain_id", ''),
_shell.options.domain_id,
)
self.assertEqual(
default_args.get("domain_name", ''),
_shell.options.domain_name,
)
self.assertEqual(
default_args.get("user_domain_id", ''),
_shell.options.user_domain_id,
)
self.assertEqual(
default_args.get("user_domain_name", ''),
_shell.options.user_domain_name,
)
self.assertEqual(
default_args.get("project_domain_id", ''),
_shell.options.project_domain_id,
)
self.assertEqual(
default_args.get("project_domain_name", ''),
_shell.options.project_domain_name,
)
self.assertEqual(
default_args.get("username", ''),
_shell.options.username,
)
self.assertEqual(
default_args.get("password", ''),
_shell.options.password,
)
self.assertEqual(
default_args.get("region_name", ''),
_shell.options.region_name,
)
self.assertEqual(
default_args.get("trust_id", ''),
_shell.options.trust_id,
)
self.assertEqual(
default_args.get('auth_type', ''),
_shell.options.auth_type,
)
def _assert_token_auth(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
@ -118,9 +146,34 @@ class TestShell(utils.TestCase):
fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "role"])
self.assertEqual(default_args["os_token"], _shell.options.os_token)
self.assertEqual(default_args["os_auth_url"],
_shell.options.os_auth_url)
self.assertEqual(
default_args.get("token", ''),
_shell.options.token,
"token"
)
self.assertEqual(
default_args.get("auth_url", ''),
_shell.options.auth_url,
"auth_url"
)
def _assert_token_endpoint_auth(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
self.app):
_shell, _cmd = make_shell(), cmd_options + " list role"
fake_execute(_shell, _cmd)
self.app.assert_called_with(["list", "role"])
self.assertEqual(
default_args.get("token", ''),
_shell.options.token,
"token",
)
self.assertEqual(
default_args.get("url", ''),
_shell.options.url,
"url",
)
def _assert_cli(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
@ -178,301 +231,159 @@ class TestShellPasswordAuth(TestShell):
flag = "--os-auth-url " + DEFAULT_AUTH_URL
kwargs = {
"auth_url": DEFAULT_AUTH_URL,
"project_id": "",
"project_name": "",
"user_domain_id": "",
"domain_id": "",
"domain_name": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_project_id_flow(self):
flag = "--os-project-id " + DEFAULT_PROJECT_ID
kwargs = {
"auth_url": "",
"project_id": DEFAULT_PROJECT_ID,
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_project_name_flow(self):
flag = "--os-project-name " + DEFAULT_PROJECT_NAME
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": DEFAULT_PROJECT_NAME,
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_domain_id_flow(self):
flag = "--os-domain-id " + DEFAULT_DOMAIN_ID
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": DEFAULT_DOMAIN_ID,
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_domain_name_flow(self):
flag = "--os-domain-name " + DEFAULT_DOMAIN_NAME
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": DEFAULT_DOMAIN_NAME,
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_user_domain_id_flow(self):
flag = "--os-user-domain-id " + DEFAULT_USER_DOMAIN_ID
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": DEFAULT_USER_DOMAIN_ID,
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_user_domain_name_flow(self):
flag = "--os-user-domain-name " + DEFAULT_USER_DOMAIN_NAME
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": DEFAULT_USER_DOMAIN_NAME,
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_project_domain_id_flow(self):
flag = "--os-project-domain-id " + DEFAULT_PROJECT_DOMAIN_ID
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": DEFAULT_PROJECT_DOMAIN_ID,
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_project_domain_name_flow(self):
flag = "--os-project-domain-name " + DEFAULT_PROJECT_DOMAIN_NAME
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": DEFAULT_PROJECT_DOMAIN_NAME,
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_username_flow(self):
flag = "--os-username " + DEFAULT_USERNAME
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": DEFAULT_USERNAME,
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_password_flow(self):
flag = "--os-password " + DEFAULT_PASSWORD
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": DEFAULT_PASSWORD,
"region_name": "",
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_region_name_flow(self):
flag = "--os-region-name " + DEFAULT_REGION_NAME
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": DEFAULT_REGION_NAME,
"trust_id": "",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_trust_id_flow(self):
flag = "--os-trust-id " + "1234"
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "1234",
"auth_type": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_auth_type_flow(self):
flag = "--os-auth-type " + "v2password"
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_type": DEFAULT_AUTH_PLUGIN
}
self._assert_password_auth(flag, kwargs)
class TestShellTokenAuth(TestShell):
def test_only_token(self):
flag = "--os-token " + DEFAULT_TOKEN
kwargs = {
"token": DEFAULT_TOKEN,
"auth_url": '',
}
self._assert_token_auth(flag, kwargs)
def test_only_auth_url(self):
flag = "--os-auth-url " + DEFAULT_AUTH_URL
kwargs = {
"token": '',
"auth_url": DEFAULT_AUTH_URL,
}
self._assert_token_auth(flag, kwargs)
def test_empty_auth(self):
os.environ = {}
flag = ""
kwargs = {}
self._assert_token_auth(flag, kwargs)
class TestShellTokenAuthEnv(TestShell):
def setUp(self):
super(TestShellTokenAuth, self).setUp()
super(TestShellTokenAuthEnv, self).setUp()
env = {
"OS_TOKEN": DEFAULT_TOKEN,
"OS_AUTH_URL": DEFAULT_SERVICE_URL,
"OS_AUTH_URL": DEFAULT_AUTH_URL,
}
self.orig_env, os.environ = os.environ, env.copy()
def tearDown(self):
super(TestShellTokenAuth, self).tearDown()
super(TestShellTokenAuthEnv, self).tearDown()
os.environ = self.orig_env
def test_default_auth(self):
def test_env(self):
flag = ""
kwargs = {
"os_token": DEFAULT_TOKEN,
"os_auth_url": DEFAULT_SERVICE_URL
"token": DEFAULT_TOKEN,
"auth_url": DEFAULT_AUTH_URL,
}
self._assert_token_auth(flag, kwargs)
def test_only_token(self):
flag = "--os-token xyzpdq"
kwargs = {
"token": "xyzpdq",
"auth_url": DEFAULT_AUTH_URL,
}
self._assert_token_auth(flag, kwargs)
def test_only_auth_url(self):
flag = "--os-auth-url http://cloud.local:555"
kwargs = {
"token": DEFAULT_TOKEN,
"auth_url": "http://cloud.local:555",
}
self._assert_token_auth(flag, kwargs)
@ -480,8 +391,82 @@ class TestShellTokenAuth(TestShell):
os.environ = {}
flag = ""
kwargs = {
"os_token": "",
"os_auth_url": ""
"token": '',
"auth_url": '',
}
self._assert_token_auth(flag, kwargs)
class TestShellTokenEndpointAuth(TestShell):
def test_only_token(self):
flag = "--os-token " + DEFAULT_TOKEN
kwargs = {
"token": DEFAULT_TOKEN,
"url": '',
}
self._assert_token_endpoint_auth(flag, kwargs)
def test_only_url(self):
flag = "--os-url " + DEFAULT_SERVICE_URL
kwargs = {
"token": '',
"url": DEFAULT_SERVICE_URL,
}
self._assert_token_endpoint_auth(flag, kwargs)
def test_empty_auth(self):
os.environ = {}
flag = ""
kwargs = {
"token": '',
"auth_url": '',
}
self._assert_token_endpoint_auth(flag, kwargs)
class TestShellTokenEndpointAuthEnv(TestShell):
def setUp(self):
super(TestShellTokenEndpointAuthEnv, self).setUp()
env = {
"OS_TOKEN": DEFAULT_TOKEN,
"OS_URL": DEFAULT_SERVICE_URL,
}
self.orig_env, os.environ = os.environ, env.copy()
def tearDown(self):
super(TestShellTokenEndpointAuthEnv, self).tearDown()
os.environ = self.orig_env
def test_env(self):
flag = ""
kwargs = {
"token": DEFAULT_TOKEN,
"url": DEFAULT_SERVICE_URL,
}
self._assert_token_auth(flag, kwargs)
def test_only_token(self):
flag = "--os-token xyzpdq"
kwargs = {
"token": "xyzpdq",
"url": DEFAULT_SERVICE_URL,
}
self._assert_token_auth(flag, kwargs)
def test_only_url(self):
flag = "--os-url http://cloud.local:555"
kwargs = {
"token": DEFAULT_TOKEN,
"url": "http://cloud.local:555",
}
self._assert_token_auth(flag, kwargs)
def test_empty_auth(self):
os.environ = {}
flag = ""
kwargs = {
"token": '',
"url": '',
}
self._assert_token_auth(flag, kwargs)

View File

@ -7,6 +7,7 @@ six>=1.9.0
Babel>=1.3
cliff>=1.10.0 # Apache-2.0
cliff-tablib>=1.0
os-client-config
oslo.config>=1.9.3 # Apache-2.0
oslo.i18n>=1.5.0 # Apache-2.0
oslo.utils>=1.4.0 # Apache-2.0