Support for keystone auth plugins

This patch allows the user to choose which authentication plugin
to use with the CLI. The arguments needed by the auth plugins are
automatically added to the argument parser. Some examples with
the currently available authentication plugins::

  OS_USERNAME=admin OS_PROJECT_NAME=admin OS_AUTH_URL=http://keystone:5000/v2.0 \
  OS_PASSWORD=admin openstack user list

  OS_USERNAME=admin OS_PROJECT_DOMAIN_NAME=default OS_USER_DOMAIN_NAME=default \
  OS_PROJECT_NAME=admin OS_AUTH_URL=http://keystone:5000/v3 OS_PASSWORD=admin \
  OS_IDENTITY_API_VERSION=3 OS_AUTH_PLUGIN=v3password openstack project list

  OS_TOKEN=1234 OS_URL=http://service_url:35357/v2.0 \
  OS_IDENTITY_API_VERSION=2.0 openstack user list

The --os-auth-plugin option can be omitted; if so the CLI will attempt to
guess which plugin to use from the other options.

Change-Id: I330c20ddb8d96b3a4287c68b57c36c4a0f869669
Co-Authored-By: Florent Flament <florent.flament-ext@cloudwatt.com>
This commit is contained in:
Matthieu Huin 2014-07-18 19:18:25 +02:00
parent 16326ce608
commit 21e877cf85
7 changed files with 573 additions and 289 deletions

180
openstackclient/api/auth.py Normal file
View File

@ -0,0 +1,180 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Authentication Library"""
import argparse
import logging
import stevedore
from keystoneclient.auth import base
from openstackclient.common import exceptions as exc
from openstackclient.common import utils
LOG = logging.getLogger(__name__)
# Initialize the list of Authentication plugins early in order
# to get the command-line options
PLUGIN_LIST = stevedore.ExtensionManager(
base.PLUGIN_NAMESPACE,
invoke_on_load=False,
propagate_map_exceptions=True,
)
# TODO(dtroyer): add some method to list the plugins for the
# --os_auth_plugin option
# Get the command line options so the help action has them available
OPTIONS_LIST = {}
for plugin in PLUGIN_LIST:
for o in plugin.plugin.get_options():
os_name = o.dest.lower().replace('_', '-')
os_env_name = 'OS_' + os_name.upper().replace('-', '_')
OPTIONS_LIST.setdefault(os_name, {'env': os_env_name, 'help': ''})
# TODO(mhu) simplistic approach, would be better to only add
# help texts if they vary from one auth plugin to another
# also the text rendering is ugly in the CLI ...
OPTIONS_LIST[os_name]['help'] += 'With %s: %s\n' % (
plugin.name,
o.help,
)
def _guess_authentication_method(options):
"""If no auth plugin was specified, pick one based on other options"""
if options.os_url:
# service token authentication, do nothing
return
auth_plugin = None
if options.os_password:
if options.os_identity_api_version == '3':
auth_plugin = 'v3password'
elif options.os_identity_api_version == '2.0':
auth_plugin = 'v2password'
else:
# let keystoneclient figure it out itself
auth_plugin = 'password'
elif options.os_token:
if options.os_identity_api_version == '3':
auth_plugin = 'v3token'
elif options.os_identity_api_version == '2.0':
auth_plugin = 'v2token'
else:
# let keystoneclient figure it out itself
auth_plugin = 'token'
else:
raise exc.CommandError(
"Could not figure out which authentication method "
"to use, please set --os-auth-plugin"
)
LOG.debug("No auth plugin selected, picking %s from other "
"options" % auth_plugin)
options.os_auth_plugin = auth_plugin
def build_auth_params(cmd_options):
auth_params = {}
if cmd_options.os_url:
return {'token': cmd_options.os_token}
if cmd_options.os_auth_plugin:
auth_plugin = base.get_plugin_class(cmd_options.os_auth_plugin)
plugin_options = auth_plugin.get_options()
for option in plugin_options:
option_name = 'os_' + option.dest
LOG.debug('fetching option %s' % option_name)
auth_params[option.dest] = getattr(cmd_options, option_name, None)
# grab tenant from project for v2.0 API compatibility
if cmd_options.os_auth_plugin.startswith("v2"):
auth_params['tenant_id'] = getattr(
cmd_options,
'os_project_id',
None,
)
auth_params['tenant_name'] = getattr(
cmd_options,
'os_project_name',
None,
)
else:
# delay the plugin choice, grab every option
plugin_options = set([o.replace('-', '_') for o in OPTIONS_LIST])
for option in plugin_options:
option_name = 'os_' + option
LOG.debug('fetching option %s' % option_name)
auth_params[option] = getattr(cmd_options, option_name, None)
return auth_params
def build_auth_plugins_option_parser(parser):
"""Auth plugins options builder
Builds dynamically the list of options expected by each available
authentication plugin.
"""
available_plugins = [plugin.name for plugin in PLUGIN_LIST]
parser.add_argument(
'--os-auth-plugin',
metavar='<OS_AUTH_PLUGIN>',
default=utils.env('OS_AUTH_PLUGIN'),
help='The authentication method to use. If this option is not set, '
'openstackclient will attempt to guess the authentication method '
'to use based on the other options. If this option is set, '
'the --os-identity-api-version argument must be consistent '
'with the version of the method.\nAvailable methods are ' +
', '.join(available_plugins),
choices=available_plugins
)
# make sur we catch old v2.0 env values
envs = {
'OS_PROJECT_NAME': utils.env(
'OS_PROJECT_NAME',
default=utils.env('OS_TENANT_NAME')
),
'OS_PROJECT_ID': utils.env(
'OS_PROJECT_ID',
default=utils.env('OS_TENANT_ID')
),
}
for o in OPTIONS_LIST:
# remove allusion to tenants from v2.0 API
if 'tenant' not in o:
parser.add_argument(
'--os-' + o,
metavar='<auth-%s>' % o,
default=envs.get(OPTIONS_LIST[o]['env'],
utils.env(OPTIONS_LIST[o]['env'])),
help='%s\n(Env: %s)' % (OPTIONS_LIST[o]['help'],
OPTIONS_LIST[o]['env']),
)
# add tenant-related options for compatibility
# this is deprecated but still used in some tempest tests...
parser.add_argument(
'--os-tenant-name',
metavar='<auth-tenant-name>',
dest='os_project_name',
default=utils.env('OS_TENANT_NAME'),
help=argparse.SUPPRESS,
)
parser.add_argument(
'--os-tenant-id',
metavar='<auth-tenant-id>',
dest='os_project_id',
default=utils.env('OS_TENANT_ID'),
help=argparse.SUPPRESS,
)
return parser

View File

@ -19,9 +19,11 @@ import logging
import pkg_resources
import sys
from keystoneclient.auth.identity import v2 as v2_auth
from keystoneclient.auth.identity import v3 as v3_auth
from keystoneclient.auth import base
from keystoneclient import session
import requests
from openstackclient.api import auth
from openstackclient.identity import client as identity_client
@ -45,105 +47,66 @@ class ClientManager(object):
"""Manages access to API clients, including authentication."""
identity = ClientCache(identity_client.make_client)
def __init__(self, token=None, url=None, auth_url=None,
domain_id=None, domain_name=None,
project_name=None, project_id=None,
username=None, password=None,
user_domain_id=None, user_domain_name=None,
project_domain_id=None, project_domain_name=None,
region_name=None, api_version=None, verify=True,
trust_id=None, timing=None):
self._token = token
self._url = url
self._auth_url = auth_url
self._domain_id = domain_id
self._domain_name = domain_name
self._project_name = project_name
self._project_id = project_id
self._username = username
self._password = password
self._user_domain_id = user_domain_id
self._user_domain_name = user_domain_name
self._project_domain_id = project_domain_id
self._project_domain_name = project_domain_name
self._region_name = region_name
def __getattr__(self, name):
# this is for the auth-related parameters.
if name in ['_' + o.replace('-', '_')
for o in auth.OPTIONS_LIST]:
return self._auth_params[name[1:]]
def __init__(self, auth_options, api_version=None, verify=True):
if not auth_options.os_auth_plugin:
auth._guess_authentication_method(auth_options)
self._auth_plugin = auth_options.os_auth_plugin
self._url = auth_options.os_url
self._auth_params = auth.build_auth_params(auth_options)
self._region_name = auth_options.os_region_name
self._api_version = api_version
self._trust_id = trust_id
self._service_catalog = None
self.timing = timing
self.timing = auth_options.timing
# For compatability until all clients can be updated
if 'project_name' in self._auth_params:
self._project_name = self._auth_params['project_name']
elif 'tenant_name' in self._auth_params:
self._project_name = self._auth_params['tenant_name']
# verify is the Requests-compatible form
self._verify = verify
# also store in the form used by the legacy client libs
self._cacert = None
if verify is True or verify is False:
if isinstance(verify, bool):
self._insecure = not verify
else:
self._cacert = verify
self._insecure = False
ver_prefix = identity_client.AUTH_VERSIONS[
self._api_version[identity_client.API_NAME]
]
# Get logging from root logger
root_logger = logging.getLogger('')
LOG.setLevel(root_logger.getEffectiveLevel())
# NOTE(dtroyer): These plugins are hard-coded for the first step
# in using the new Keystone auth plugins.
if self._url:
LOG.debug('Using token auth %s', ver_prefix)
if ver_prefix == 'v2':
self.auth = v2_auth.Token(
auth_url=url,
token=token,
)
else:
self.auth = v3_auth.Token(
auth_url=url,
token=token,
)
else:
LOG.debug('Using password auth %s', ver_prefix)
if ver_prefix == 'v2':
self.auth = v2_auth.Password(
auth_url=auth_url,
username=username,
password=password,
trust_id=trust_id,
tenant_id=project_id,
tenant_name=project_name,
)
else:
self.auth = v3_auth.Password(
auth_url=auth_url,
username=username,
password=password,
trust_id=trust_id,
user_domain_id=user_domain_id,
user_domain_name=user_domain_name,
domain_id=domain_id,
domain_name=domain_name,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name,
)
self.session = session.Session(
auth=self.auth,
verify=verify,
)
self.session = None
if not self._url:
LOG.debug('Using auth plugin: %s' % self._auth_plugin)
auth_plugin = base.get_plugin_class(self._auth_plugin)
self.auth = auth_plugin.load_from_options(**self._auth_params)
# needed by SAML authentication
request_session = requests.session()
self.session = session.Session(
auth=self.auth,
session=request_session,
verify=verify,
)
self.auth_ref = None
if not self._url:
# Trigger the auth call
if not self._auth_plugin.endswith("token") and not self._url:
LOG.debug("Populate other password flow attributes")
self.auth_ref = self.session.auth.get_auth_ref(self.session)
# Populate other password flow attributes
self._token = self.session.auth.get_token(self.session)
self._service_catalog = self.auth_ref.service_catalog
else:
self._token = self._auth_params.get('token')
return
@ -156,7 +119,7 @@ class ClientManager(object):
service_type=service_type)
else:
# Hope we were given the correct URL.
endpoint = self._url
endpoint = self._auth_url or self._url
return endpoint

View File

@ -15,7 +15,6 @@
"""Command-line interface to the OpenStack APIs"""
import argparse
import getpass
import logging
import sys
@ -171,89 +170,13 @@ class OpenStackShell(app.App):
parser = super(OpenStackShell, self).build_option_parser(
description,
version)
# service token auth argument
parser.add_argument(
'--os-url',
metavar='<url>',
default=utils.env('OS_URL'),
help='Defaults to env[OS_URL]')
# Global arguments
parser.add_argument(
'--os-auth-url',
metavar='<auth-url>',
default=utils.env('OS_AUTH_URL'),
help='Authentication URL (Env: OS_AUTH_URL)')
parser.add_argument(
'--os-domain-name',
metavar='<auth-domain-name>',
default=utils.env('OS_DOMAIN_NAME'),
help='Domain name of the requested domain-level '
'authorization scope (Env: OS_DOMAIN_NAME)',
)
parser.add_argument(
'--os-domain-id',
metavar='<auth-domain-id>',
default=utils.env('OS_DOMAIN_ID'),
help='Domain ID of the requested domain-level '
'authorization scope (Env: OS_DOMAIN_ID)',
)
parser.add_argument(
'--os-project-name',
metavar='<auth-project-name>',
default=utils.env('OS_PROJECT_NAME',
default=utils.env('OS_TENANT_NAME')),
help='Project name of the requested project-level '
'authorization scope (Env: OS_PROJECT_NAME)',
)
parser.add_argument(
'--os-tenant-name',
metavar='<auth-tenant-name>',
dest='os_project_name',
help=argparse.SUPPRESS,
)
parser.add_argument(
'--os-project-id',
metavar='<auth-project-id>',
default=utils.env('OS_PROJECT_ID',
default=utils.env('OS_TENANT_ID')),
help='Project ID of the requested project-level '
'authorization scope (Env: OS_PROJECT_ID)',
)
parser.add_argument(
'--os-tenant-id',
metavar='<auth-tenant-id>',
dest='os_project_id',
help=argparse.SUPPRESS,
)
parser.add_argument(
'--os-username',
metavar='<auth-username>',
default=utils.env('OS_USERNAME'),
help='Authentication username (Env: OS_USERNAME)')
parser.add_argument(
'--os-password',
metavar='<auth-password>',
default=utils.env('OS_PASSWORD'),
help='Authentication password (Env: OS_PASSWORD)')
parser.add_argument(
'--os-user-domain-name',
metavar='<auth-user-domain-name>',
default=utils.env('OS_USER_DOMAIN_NAME'),
help='Domain name of the user (Env: OS_USER_DOMAIN_NAME)')
parser.add_argument(
'--os-user-domain-id',
metavar='<auth-user-domain-id>',
default=utils.env('OS_USER_DOMAIN_ID'),
help='Domain ID of the user (Env: OS_USER_DOMAIN_ID)')
parser.add_argument(
'--os-project-domain-name',
metavar='<auth-project-domain-name>',
default=utils.env('OS_PROJECT_DOMAIN_NAME'),
help='Domain name of the project which is the requested '
'project-level authorization scope '
'(Env: OS_PROJECT_DOMAIN_NAME)')
parser.add_argument(
'--os-project-domain-id',
metavar='<auth-project-domain-id>',
default=utils.env('OS_PROJECT_DOMAIN_ID'),
help='Domain ID of the project which is the requested '
'project-level authorization scope '
'(Env: OS_PROJECT_DOMAIN_ID)')
parser.add_argument(
'--os-region-name',
metavar='<auth-region-name>',
@ -284,16 +207,6 @@ class OpenStackShell(app.App):
help='Default domain ID, default=' +
DEFAULT_DOMAIN +
' (Env: OS_DEFAULT_DOMAIN)')
parser.add_argument(
'--os-token',
metavar='<token>',
default=utils.env('OS_TOKEN'),
help='Defaults to env[OS_TOKEN]')
parser.add_argument(
'--os-url',
metavar='<url>',
default=utils.env('OS_URL'),
help='Defaults to env[OS_URL]')
parser.add_argument(
'--timing',
default=False,
@ -306,20 +219,42 @@ class OpenStackShell(app.App):
def authenticate_user(self):
"""Verify the required authentication credentials are present"""
self.log.debug('validating authentication options')
if self.options.os_token or self.options.os_url:
self.log.debug("validating authentication options")
# Assuming all auth plugins will be named in the same fashion,
# ie vXpluginName
if (not self.options.os_url and
self.options.os_auth_plugin.startswith('v') and
self.options.os_auth_plugin[1] !=
self.options.os_identity_api_version[0]):
raise exc.CommandError(
"Auth plugin %s not compatible"
" with requested API version" % self.options.os_auth_plugin
)
# TODO(mhu) All these checks should be exposed at the plugin level
# or just dropped altogether, as the client instantiation will fail
# anyway
if self.options.os_url and not self.options.os_token:
# service token needed
raise exc.CommandError(
"You must provide a service token via"
" either --os-token or env[OS_TOKEN]")
if (self.options.os_auth_plugin.endswith('token') and
(self.options.os_token or self.options.os_auth_url)):
# Token flow auth takes priority
if not self.options.os_token:
raise exc.CommandError(
"You must provide a token via"
" either --os-token or env[OS_TOKEN]")
if not self.options.os_url:
if not self.options.os_auth_url:
raise exc.CommandError(
"You must provide a service URL via"
" either --os-url or env[OS_URL]")
" either --os-auth-url or env[OS_AUTH_URL]")
else:
if (not self.options.os_url and
not self.options.os_auth_plugin.endswith('token')):
# Validate password flow auth
if not self.options.os_username:
raise exc.CommandError(
@ -347,13 +282,15 @@ class OpenStackShell(app.App):
(self.options.os_domain_id
or self.options.os_domain_name) or
self.options.os_trust_id):
raise exc.CommandError(
"You must provide authentication scope as a project "
"or a domain via --os-project-id or env[OS_PROJECT_ID], "
"--os-project-name or env[OS_PROJECT_NAME], "
"--os-domain-id or env[OS_DOMAIN_ID], or"
"--os-domain-name or env[OS_DOMAIN_NAME], or "
"--os-trust-id or env[OS_TRUST_ID].")
if self.options.os_auth_plugin.endswith('password'):
raise exc.CommandError(
"You must provide authentication scope as a project "
"or a domain via --os-project-id "
"or env[OS_PROJECT_ID], "
"--os-project-name or env[OS_PROJECT_NAME], "
"--os-domain-id or env[OS_DOMAIN_ID], or"
"--os-domain-name or env[OS_DOMAIN_NAME], or "
"--os-trust-id or env[OS_TRUST_ID].")
if not self.options.os_auth_url:
raise exc.CommandError(
@ -375,24 +312,9 @@ class OpenStackShell(app.App):
"Pick one of project, domain or trust.")
self.client_manager = clientmanager.ClientManager(
token=self.options.os_token,
url=self.options.os_url,
auth_url=self.options.os_auth_url,
domain_id=self.options.os_domain_id,
domain_name=self.options.os_domain_name,
project_name=self.options.os_project_name,
project_id=self.options.os_project_id,
user_domain_id=self.options.os_user_domain_id,
user_domain_name=self.options.os_user_domain_name,
project_domain_id=self.options.os_project_domain_id,
project_domain_name=self.options.os_project_domain_name,
username=self.options.os_username,
password=self.options.os_password,
region_name=self.options.os_region_name,
auth_options=self.options,
verify=self.verify,
timing=self.options.timing,
api_version=self.api_version,
trust_id=self.options.os_trust_id,
)
return

View File

@ -12,34 +12,25 @@
# License for the specific language governing permissions and limitations
# under the License.
#
import mock
from requests_mock.contrib import fixture
from keystoneclient.auth.identity import v2 as auth_v2
from keystoneclient.openstack.common import jsonutils
from keystoneclient import service_catalog
from openstackclient.api import auth
from openstackclient.common import clientmanager
from openstackclient.common import exceptions as exc
from openstackclient.tests import fakes
from openstackclient.tests import utils
AUTH_REF = {'a': 1}
AUTH_TOKEN = "foobar"
AUTH_URL = "http://0.0.0.0"
USERNAME = "itchy"
PASSWORD = "scratchy"
SERVICE_CATALOG = {'sc': '123'}
API_VERSION = {"identity": "2.0"}
API_VERSION = {
'identity': '2.0',
}
def FakeMakeClient(instance):
return FakeClient()
class FakeClient(object):
auth_ref = AUTH_REF
auth_token = AUTH_TOKEN
service_catalog = SERVICE_CATALOG
AUTH_REF = {'version': 'v2.0'}
AUTH_REF.update(fakes.TEST_RESPONSE_DICT['access'])
SERVICE_CATALOG = service_catalog.ServiceCatalogV2(AUTH_REF)
class Container(object):
@ -49,6 +40,18 @@ class Container(object):
pass
class FakeOptions(object):
def __init__(self, **kwargs):
for option in auth.OPTIONS_LIST:
setattr(self, 'os_' + option.replace('-', '_'), None)
self.os_auth_plugin = None
self.os_identity_api_version = '2.0'
self.timing = None
self.os_region_name = None
self.os_url = None
self.__dict__.update(kwargs)
class TestClientCache(utils.TestCase):
def test_singleton(self):
@ -58,30 +61,38 @@ class TestClientCache(utils.TestCase):
self.assertEqual(c.attr, c.attr)
@mock.patch('keystoneclient.session.Session')
class TestClientManager(utils.TestCase):
def setUp(self):
super(TestClientManager, self).setUp()
self.mock = mock.Mock()
self.requests = self.useFixture(fixture.Fixture())
# fake v2password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT)
# fake v3password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'auth/tokens']))
# fake password version endpoint discovery
self.stub_auth(json=fakes.TEST_VERSIONS,
url=fakes.AUTH_URL,
verb='GET')
clientmanager.ClientManager.identity = \
clientmanager.ClientCache(FakeMakeClient)
def test_client_manager_token(self, mock):
def test_client_manager_token(self):
client_manager = clientmanager.ClientManager(
token=AUTH_TOKEN,
url=AUTH_URL,
verify=True,
auth_options=FakeOptions(os_token=fakes.AUTH_TOKEN,
os_auth_url=fakes.AUTH_URL,
os_auth_plugin='v2token'),
api_version=API_VERSION,
verify=True
)
self.assertEqual(
AUTH_TOKEN,
fakes.AUTH_TOKEN,
client_manager._token,
)
self.assertEqual(
AUTH_URL,
client_manager._url,
fakes.AUTH_URL,
client_manager._auth_url,
)
self.assertIsInstance(
client_manager.auth,
@ -90,26 +101,26 @@ class TestClientManager(utils.TestCase):
self.assertFalse(client_manager._insecure)
self.assertTrue(client_manager._verify)
def test_client_manager_password(self, mock):
def test_client_manager_password(self):
client_manager = clientmanager.ClientManager(
auth_url=AUTH_URL,
username=USERNAME,
password=PASSWORD,
verify=False,
auth_options=FakeOptions(os_auth_url=fakes.AUTH_URL,
os_username=fakes.USERNAME,
os_password=fakes.PASSWORD),
api_version=API_VERSION,
verify=False,
)
self.assertEqual(
AUTH_URL,
fakes.AUTH_URL,
client_manager._auth_url,
)
self.assertEqual(
USERNAME,
fakes.USERNAME,
client_manager._username,
)
self.assertEqual(
PASSWORD,
fakes.PASSWORD,
client_manager._password,
)
self.assertIsInstance(
@ -119,16 +130,87 @@ class TestClientManager(utils.TestCase):
self.assertTrue(client_manager._insecure)
self.assertFalse(client_manager._verify)
def test_client_manager_password_verify_ca(self, mock):
# These need to stick around until the old-style clients are gone
self.assertEqual(
AUTH_REF,
client_manager.auth_ref,
)
self.assertEqual(
fakes.AUTH_TOKEN,
client_manager._token,
)
self.assertEqual(
dir(SERVICE_CATALOG),
dir(client_manager._service_catalog),
)
def stub_auth(self, json=None, url=None, verb=None, **kwargs):
subject_token = fakes.AUTH_TOKEN
base_url = fakes.AUTH_URL
if json:
text = jsonutils.dumps(json)
headers = {'X-Subject-Token': subject_token,
'Content-Type': 'application/json'}
if not url:
url = '/'.join([base_url, 'tokens'])
url = url.replace("/?", "?")
if not verb:
verb = 'POST'
self.requests.register_uri(verb,
url,
headers=headers,
text=text)
def test_client_manager_password_verify_ca(self):
client_manager = clientmanager.ClientManager(
auth_url=AUTH_URL,
username=USERNAME,
password=PASSWORD,
verify='cafile',
auth_options=FakeOptions(os_auth_url=fakes.AUTH_URL,
os_username=fakes.USERNAME,
os_password=fakes.PASSWORD,
os_auth_plugin='v2password'),
api_version=API_VERSION,
verify='cafile',
)
self.assertFalse(client_manager._insecure)
self.assertTrue(client_manager._verify)
self.assertEqual('cafile', client_manager._cacert)
def _client_manager_guess_auth_plugin(self, auth_params,
api_version, auth_plugin):
auth_params['os_auth_plugin'] = auth_plugin
auth_params['os_identity_api_version'] = api_version
client_manager = clientmanager.ClientManager(
auth_options=FakeOptions(**auth_params),
api_version=API_VERSION,
verify=True
)
self.assertEqual(
auth_plugin,
client_manager._auth_plugin,
)
def test_client_manager_guess_auth_plugin(self):
# test token auth
params = dict(os_token=fakes.AUTH_TOKEN,
os_auth_url=fakes.AUTH_URL)
self._client_manager_guess_auth_plugin(params, '2.0', 'v2token')
self._client_manager_guess_auth_plugin(params, '3', 'v3token')
self._client_manager_guess_auth_plugin(params, 'XXX', 'token')
# test service auth
params = dict(os_token=fakes.AUTH_TOKEN, os_url='test')
self._client_manager_guess_auth_plugin(params, 'XXX', '')
# test password auth
params = dict(os_auth_url=fakes.AUTH_URL,
os_username=fakes.USERNAME,
os_password=fakes.PASSWORD)
self._client_manager_guess_auth_plugin(params, '2.0', 'v2password')
self._client_manager_guess_auth_plugin(params, '3', 'v3password')
self._client_manager_guess_auth_plugin(params, 'XXX', 'password')
def test_client_manager_guess_auth_plugin_failure(self):
self.assertRaises(exc.CommandError,
clientmanager.ClientManager,
auth_options=FakeOptions(os_auth_plugin=''),
api_version=API_VERSION,
verify=True)

View File

@ -22,6 +22,142 @@ import requests
AUTH_TOKEN = "foobar"
AUTH_URL = "http://0.0.0.0"
USERNAME = "itchy"
PASSWORD = "scratchy"
TEST_RESPONSE_DICT = {
"access": {
"metadata": {
"is_admin": 0,
"roles": [
"1234",
]
},
"serviceCatalog": [
{
"endpoints": [
{
"adminURL": AUTH_URL + "/v2.0",
"id": "1234",
"internalURL": AUTH_URL + "/v2.0",
"publicURL": AUTH_URL + "/v2.0",
"region": "RegionOne"
}
],
"endpoints_links": [],
"name": "keystone",
"type": "identity"
}
],
"token": {
"expires": "2035-01-01T00:00:01Z",
"id": AUTH_TOKEN,
"issued_at": "2013-01-01T00:00:01.692048",
"tenant": {
"description": None,
"enabled": True,
"id": "1234",
"name": "testtenant"
}
},
"user": {
"id": "5678",
"name": USERNAME,
"roles": [
{
"name": "testrole"
},
],
"roles_links": [],
"username": USERNAME
}
}
}
TEST_RESPONSE_DICT_V3 = {
"token": {
"audit_ids": [
"a"
],
"catalog": [
],
"expires_at": "2034-09-29T18:27:15.978064Z",
"extras": {},
"issued_at": "2014-09-29T17:27:15.978097Z",
"methods": [
"password"
],
"project": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "bbb",
"name": "project"
},
"roles": [
],
"user": {
"domain": {
"id": "default",
"name": "Default"
},
"id": "aaa",
"name": USERNAME
}
}
}
TEST_VERSIONS = {
"versions": {
"values": [
{
"id": "v3.0",
"links": [
{
"href": AUTH_URL,
"rel": "self"
}
],
"media-types": [
{
"base": "application/json",
"type": "application/vnd.openstack.identity-v3+json"
},
{
"base": "application/xml",
"type": "application/vnd.openstack.identity-v3+xml"
}
],
"status": "stable",
"updated": "2013-03-06T00:00:00Z"
},
{
"id": "v2.0",
"links": [
{
"href": AUTH_URL,
"rel": "self"
},
{
"href": "http://docs.openstack.org/",
"rel": "describedby",
"type": "text/html"
}
],
"media-types": [
{
"base": "application/json",
"type": "application/vnd.openstack.identity-v2.0+json"
},
{
"base": "application/xml",
"type": "application/vnd.openstack.identity-v2.0+xml"
}
],
"status": "stable",
"updated": "2014-04-17T00:00:00Z"
}
]
}
}
class FakeStdout:

View File

@ -34,6 +34,8 @@ DEFAULT_PASSWORD = "password"
DEFAULT_REGION_NAME = "ZZ9_Plural_Z_Alpha"
DEFAULT_TOKEN = "token"
DEFAULT_SERVICE_URL = "http://127.0.0.1:8771/v3.0/"
DEFAULT_AUTH_PLUGIN = "v2password"
DEFAULT_COMPUTE_API_VERSION = "2"
DEFAULT_IDENTITY_API_VERSION = "2.0"
@ -106,6 +108,8 @@ class TestShell(utils.TestCase):
default_args["region_name"])
self.assertEqual(_shell.options.os_trust_id,
default_args["trust_id"])
self.assertEqual(_shell.options.os_auth_plugin,
default_args['auth_plugin'])
def _assert_token_auth(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
@ -115,7 +119,8 @@ class TestShell(utils.TestCase):
self.app.assert_called_with(["list", "role"])
self.assertEqual(_shell.options.os_token, default_args["os_token"])
self.assertEqual(_shell.options.os_url, default_args["os_url"])
self.assertEqual(_shell.options.os_auth_url,
default_args["os_auth_url"])
def _assert_cli(self, cmd_options, default_args):
with mock.patch("openstackclient.shell.OpenStackShell.initialize_app",
@ -175,9 +180,9 @@ class TestShellPasswordAuth(TestShell):
"auth_url": DEFAULT_AUTH_URL,
"project_id": "",
"project_name": "",
"user_domain_id": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
@ -185,6 +190,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -204,6 +210,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -223,44 +230,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_tenant_id_flow(self):
flag = "--os-tenant-id " + DEFAULT_PROJECT_ID
kwargs = {
"auth_url": "",
"project_id": DEFAULT_PROJECT_ID,
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_tenant_name_flow(self):
flag = "--os-tenant-name " + DEFAULT_PROJECT_NAME
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": DEFAULT_PROJECT_NAME,
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -280,6 +250,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -299,6 +270,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -318,6 +290,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -337,6 +310,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -356,6 +330,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -375,6 +350,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -394,6 +370,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -413,6 +390,7 @@ class TestShellPasswordAuth(TestShell):
"password": DEFAULT_PASSWORD,
"region_name": "",
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -432,6 +410,7 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": DEFAULT_REGION_NAME,
"trust_id": "",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
@ -451,6 +430,27 @@ class TestShellPasswordAuth(TestShell):
"password": "",
"region_name": "",
"trust_id": "1234",
"auth_plugin": "",
}
self._assert_password_auth(flag, kwargs)
def test_only_auth_plugin_flow(self):
flag = "--os-auth-plugin " + "v2password"
kwargs = {
"auth_url": "",
"project_id": "",
"project_name": "",
"domain_id": "",
"domain_name": "",
"user_domain_id": "",
"user_domain_name": "",
"project_domain_id": "",
"project_domain_name": "",
"username": "",
"password": "",
"region_name": "",
"trust_id": "",
"auth_plugin": DEFAULT_AUTH_PLUGIN
}
self._assert_password_auth(flag, kwargs)
@ -460,7 +460,7 @@ class TestShellTokenAuth(TestShell):
super(TestShellTokenAuth, self).setUp()
env = {
"OS_TOKEN": DEFAULT_TOKEN,
"OS_URL": DEFAULT_SERVICE_URL,
"OS_AUTH_URL": DEFAULT_SERVICE_URL,
}
self.orig_env, os.environ = os.environ, env.copy()
@ -472,7 +472,7 @@ class TestShellTokenAuth(TestShell):
flag = ""
kwargs = {
"os_token": DEFAULT_TOKEN,
"os_url": DEFAULT_SERVICE_URL
"os_auth_url": DEFAULT_SERVICE_URL
}
self._assert_token_auth(flag, kwargs)
@ -481,7 +481,7 @@ class TestShellTokenAuth(TestShell):
flag = ""
kwargs = {
"os_token": "",
"os_url": ""
"os_auth_url": ""
}
self._assert_token_auth(flag, kwargs)

View File

@ -12,3 +12,4 @@ python-cinderclient>=1.1.0
python-neutronclient>=2.3.6,<3
requests>=1.2.1,!=2.4.0
six>=1.7.0
stevedore>=1.0.0