remove CLI from keystoneclient

the CLI has been deprecated for a long time, and many docs and
install guides recommend using OSC instead of `keystone`.

- removes CLI
- removes man page from docs
- removes CLI tests
- removes `bootstrap` from contrib
- removes entrypoint from setup.cfg

implements bp: remove-cli

Change-Id: Icbe15814bc4faf33f513f9654440068795eae807
This commit is contained in:
Steve Martinelli
2015-12-15 18:00:16 -05:00
parent 9b028b5cdd
commit ef13bd8cf6
16 changed files with 8 additions and 2645 deletions

View File

@@ -109,10 +109,7 @@ modindex_common_prefix = ['keystoneclient.']
# Grouping the document tree for man pages.
# List of tuples 'sourcefile', 'target', 'title', 'Authors name', 'manual'
man_pages = [
('man/keystone', 'keystone', 'Client for OpenStack Identity API',
['OpenStack Contributors'], 1),
]
#man_pages = []
# -- Options for HTML output --------------------------------------------------

View File

@@ -1,158 +0,0 @@
==============================================================
:program:`keystone` command line utility (pending deprecation)
==============================================================
.. program:: keystone
.. highlight:: bash
SYNOPSIS
========
:program:`keystone` [options] <command> [command-options]
:program:`keystone help`
:program:`keystone help` <command>
DESCRIPTION
===========
.. WARNING::
The :program:`keystone` command line utility is pending deprecation. The
`OpenStackClient unified command line utility
<http://docs.openstack.org/developer/python-openstackclient/>`_ should be
used instead. The :program:`keystone` command line utility only supports V2
of the Identity API whereas the OSC program supports both V2 and V3.
The :program:`keystone` command line utility interacts with services providing
OpenStack Identity API (e.g. Keystone).
To communicate with the API, you will need to be authenticated - and the
:program:`keystone` provides multiple options for this.
While bootstrapping Keystone the authentication is accomplished with a
shared secret token and the location of the Identity API endpoint. The
shared secret token is configured in keystone.conf as "admin_token".
You can specify those values on the command line with :option:`--os-token`
and :option:`--os-endpoint`, or set them in environment variables:
.. envvar:: OS_SERVICE_TOKEN
Your Keystone administrative token
.. envvar:: OS_SERVICE_ENDPOINT
Your Identity API endpoint
The command line options will override any environment variables set.
If you already have accounts, you can use your OpenStack username and
password. You can do this with the :option:`--os-username`,
:option:`--os-password`.
Keystone allows a user to be associated with one or more projects which are
historically called tenants. To specify the project for which you want to
authorize against, you may optionally specify a :option:`--os-tenant-id` or
:option:`--os-tenant-name`.
Instead of using options, it is easier to just set them as environment
variables:
.. envvar:: OS_USERNAME
Your Keystone username.
.. envvar:: OS_PASSWORD
Your Keystone password.
.. envvar:: OS_TENANT_NAME
Name of Keystone project.
.. envvar:: OS_TENANT_ID
ID of Keystone Tenant.
.. envvar:: OS_AUTH_URL
The OpenStack API server URL.
.. envvar:: OS_IDENTITY_API_VERSION
The OpenStack Identity API version.
.. envvar:: OS_CACERT
The location for the CA truststore (PEM formatted) for this client.
.. envvar:: OS_CERT
The location for the keystore (PEM formatted) containing the public
key of this client. This keystore can also optionally contain the
private key of this client.
.. envvar:: OS_KEY
The location for the keystore (PEM formatted) containing the private
key of this client. This value can be empty if the private key is
included in the OS_CERT file.
For example, in Bash you'd use::
export OS_USERNAME=yourname
export OS_PASSWORD=yadayadayada
export OS_TENANT_NAME=myproject
export OS_AUTH_URL=http(s)://example.com:5000/v2.0/
export OS_IDENTITY_API_VERSION=2.0
export OS_CACERT=/etc/keystone/yourca.pem
export OS_CERT=/etc/keystone/yourpublickey.pem
export OS_KEY=/etc/keystone/yourprivatekey.pem
OPTIONS
=======
To get a list of available commands and options run::
keystone help
To get usage and options of a command::
keystone help <command>
EXAMPLES
========
Get information about endpoint-create command::
keystone help endpoint-create
View endpoints of OpenStack services::
keystone catalog
Create a 'service' project::
keystone tenant-create --name=service
Create service user for nova::
keystone user-create --name=nova \
--tenant_id=<project ID> \
--email=nova@nothing.com
View roles::
keystone role-list
BUGS
====
Keystone client is hosted in Launchpad so you can view current bugs at
https://bugs.launchpad.net/python-keystoneclient/.

View File

@@ -1,40 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystoneclient import utils
from keystoneclient.v2_0 import client
@utils.arg('--user-name', metavar='<user-name>', default='admin', dest='user',
help='The name of the user to be created (default="admin").')
@utils.arg('--pass', metavar='<password>', required=True, dest='passwd',
help='The password for the new user.')
@utils.arg('--role-name', metavar='<role-name>', default='admin', dest='role',
help='The name of the role to be created and granted to the user '
'(default="admin").')
@utils.arg('--tenant-name', metavar='<tenant-name>', default='admin',
dest='tenant',
help='The name of the tenant to be created (default="admin").')
def do_bootstrap(kc, args):
"""Grants a new role to a new user on a new tenant, after creating each."""
tenant = kc.tenants.create(tenant_name=args.tenant)
role = kc.roles.create(name=args.role)
user = kc.users.create(name=args.user, password=args.passwd, email=None)
kc.roles.add_user_role(user=user, role=role, tenant=tenant)
# verify the result
user_client = client.Client(
username=args.user,
password=args.passwd,
tenant_name=args.tenant,
auth_url=kc.management_url)
user_client.authenticate()

View File

@@ -1,50 +0,0 @@
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from keystoneclient.generic import client
from keystoneclient.i18n import _
from keystoneclient import utils
CLIENT_CLASS = client.Client
@utils.unauthenticated
def do_discover(cs, args):
"""Discover Keystone servers, supported API versions and extensions."""
if cs.endpoint:
versions = cs.discover(cs.endpoint)
elif cs.auth_url:
versions = cs.discover(cs.auth_url)
else:
versions = cs.discover()
if versions:
if 'message' in versions:
print(versions['message'])
for key, version in six.iteritems(versions):
if key != 'message':
print(_(" - supports version %(id)s (%(status)s) here "
"%(url)s") %
version)
extensions = cs.discover_extensions(version['url'])
if extensions:
for key, extension in six.iteritems(extensions):
if key != 'message':
print(_(" - and %(key)s: %(extension)s") %
{'key': key, 'extension': extension})
else:
print(_("No Keystone-compatible endpoint found"))

View File

@@ -1,472 +0,0 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Command-line interface to the OpenStack Identity API."""
from __future__ import print_function
import argparse
import logging
import os
import sys
import warnings
from oslo_utils import encodeutils
import six
import keystoneclient
from keystoneclient import access
from keystoneclient.contrib.bootstrap import shell as shell_bootstrap
from keystoneclient import exceptions as exc
from keystoneclient.generic import shell as shell_generic
from keystoneclient import session
from keystoneclient import utils
from keystoneclient.v2_0 import shell as shell_v2_0
def env(*vars, **kwargs):
"""Search for the first defined of possibly many env vars
Returns the first environment variable defined in vars, or
returns the default defined in kwargs.
"""
for v in vars:
value = os.environ.get(v)
if value:
return value
return kwargs.get('default', '')
class OpenStackIdentityShell(object):
def __init__(self, parser_class=argparse.ArgumentParser):
# Since Python 2.7, DeprecationWarning is ignored by default, enable
# it so that the deprecation message is displayed.
warnings.simplefilter('once', category=DeprecationWarning)
warnings.warn(
'The keystone CLI is deprecated in favor of '
'python-openstackclient. For a Python library, continue using '
'python-keystoneclient.', DeprecationWarning)
# And back to normal!
warnings.resetwarnings()
self.parser_class = parser_class
def get_base_parser(self):
parser = self.parser_class(
prog='keystone',
description=__doc__.strip(),
epilog='See "keystone help COMMAND" '
'for help on a specific command.',
add_help=False,
formatter_class=OpenStackHelpFormatter,
)
# Global arguments
parser.add_argument('-h',
'--help',
action='store_true',
help=argparse.SUPPRESS)
parser.add_argument('--version',
action='version',
version=keystoneclient.__version__,
help="Shows the client version and exits.")
parser.add_argument('--debug',
default=False,
action='store_true',
help="Prints debugging output onto the console, "
"this includes the curl request and response "
"calls. Helpful for debugging and "
"understanding the API calls.")
parser.add_argument('--os-username',
metavar='<auth-user-name>',
default=env('OS_USERNAME'),
help='Name used for authentication with the '
'OpenStack Identity service. '
'Defaults to env[OS_USERNAME].')
parser.add_argument('--os_username',
help=argparse.SUPPRESS)
parser.add_argument('--os-password',
metavar='<auth-password>',
default=env('OS_PASSWORD'),
help='Password used for authentication with the '
'OpenStack Identity service. '
'Defaults to env[OS_PASSWORD].')
parser.add_argument('--os_password',
help=argparse.SUPPRESS)
parser.add_argument('--os-tenant-name',
metavar='<auth-tenant-name>',
default=env('OS_TENANT_NAME'),
help='Tenant to request authorization on. '
'Defaults to env[OS_TENANT_NAME].')
parser.add_argument('--os_tenant_name',
help=argparse.SUPPRESS)
parser.add_argument('--os-tenant-id',
metavar='<tenant-id>',
default=env('OS_TENANT_ID'),
help='Tenant to request authorization on. '
'Defaults to env[OS_TENANT_ID].')
parser.add_argument('--os_tenant_id',
help=argparse.SUPPRESS)
parser.add_argument('--os-auth-url',
metavar='<auth-url>',
default=env('OS_AUTH_URL'),
help='Specify the Identity endpoint to use for '
'authentication. '
'Defaults to env[OS_AUTH_URL].')
parser.add_argument('--os_auth_url',
help=argparse.SUPPRESS)
parser.add_argument('--os-region-name',
metavar='<region-name>',
default=env('OS_REGION_NAME'),
help='Specify the region to use. '
'Defaults to env[OS_REGION_NAME].')
parser.add_argument('--os_region_name',
help=argparse.SUPPRESS)
parser.add_argument('--os-identity-api-version',
metavar='<identity-api-version>',
default=env('OS_IDENTITY_API_VERSION',
'KEYSTONE_VERSION'),
help='Specify Identity API version to use. '
'Defaults to env[OS_IDENTITY_API_VERSION]'
' or 2.0.')
parser.add_argument('--os_identity_api_version',
help=argparse.SUPPRESS)
parser.add_argument('--os-token',
metavar='<service-token>',
default=env('OS_SERVICE_TOKEN'),
help='Specify an existing token to use instead of '
'retrieving one via authentication (e.g. '
'with username & password). '
'Defaults to env[OS_SERVICE_TOKEN].')
parser.add_argument('--os-endpoint',
metavar='<service-endpoint>',
default=env('OS_SERVICE_ENDPOINT'),
help='Specify an endpoint to use instead of '
'retrieving one from the service catalog '
'(via authentication). '
'Defaults to env[OS_SERVICE_ENDPOINT].')
parser.add_argument('--os-cache',
default=env('OS_CACHE', default=False),
action='store_true',
help='Use the auth token cache. '
'Defaults to env[OS_CACHE].')
parser.add_argument('--os_cache',
help=argparse.SUPPRESS)
parser.add_argument('--force-new-token',
default=False,
action="store_true",
dest='force_new_token',
help="If the keyring is available and in use, "
"token will always be stored and fetched "
"from the keyring until the token has "
"expired. Use this option to request a "
"new token and replace the existing one "
"in the keyring.")
parser.add_argument('--stale-duration',
metavar='<seconds>',
default=access.STALE_TOKEN_DURATION,
dest='stale_duration',
help="Stale duration (in seconds) used to "
"determine whether a token has expired "
"when retrieving it from keyring. This "
"is useful in mitigating process or "
"network delays. Default is %s seconds." %
access.STALE_TOKEN_DURATION)
session.Session.register_cli_options(parser)
parser.add_argument('--os_cacert', help=argparse.SUPPRESS)
parser.add_argument('--os_key', help=argparse.SUPPRESS)
parser.add_argument('--os_cert', help=argparse.SUPPRESS)
return parser
def get_subcommand_parser(self, version):
parser = self.get_base_parser()
self.subcommands = {}
subparsers = parser.add_subparsers(metavar='<subcommand>')
try:
actions_module = {
'2.0': shell_v2_0,
}[version]
except KeyError:
actions_module = shell_v2_0
self._find_actions(subparsers, actions_module)
self._find_actions(subparsers, shell_generic)
self._find_actions(subparsers, shell_bootstrap)
self._find_actions(subparsers, self)
self._add_bash_completion_subparser(subparsers)
return parser
def _add_bash_completion_subparser(self, subparsers):
subparser = subparsers.add_parser(
'bash_completion',
add_help=False,
formatter_class=OpenStackHelpFormatter
)
self.subcommands['bash_completion'] = subparser
subparser.set_defaults(func=self.do_bash_completion)
def _find_actions(self, subparsers, actions_module):
for attr in (a for a in dir(actions_module) if a.startswith('do_')):
# I prefer to be hyphen-separated instead of underscores.
command = attr[3:].replace('_', '-')
callback = getattr(actions_module, attr)
desc = callback.__doc__ or ''
help = desc.strip().split('\n')[0]
arguments = getattr(callback, 'arguments', [])
subparser = subparsers.add_parser(
command,
help=help,
description=desc,
add_help=False,
formatter_class=OpenStackHelpFormatter)
subparser.add_argument('-h', '--help', action='help',
help=argparse.SUPPRESS)
self.subcommands[command] = subparser
group = subparser.add_argument_group(title='Arguments')
for (args, kwargs) in arguments:
group.add_argument(*args, **kwargs)
subparser.set_defaults(func=callback)
def auth_check(self, args):
if args.os_token or args.os_endpoint:
if not args.os_token:
raise exc.CommandError(
'Expecting a token provided via either --os-token or '
'env[OS_SERVICE_TOKEN]')
if not args.os_endpoint:
raise exc.CommandError(
'Expecting an endpoint provided via either '
'--os-endpoint or env[OS_SERVICE_ENDPOINT]')
# user supplied a token and endpoint and at least one other cred
if args.os_username or args.os_password or args.os_auth_url:
msg = ('WARNING: Bypassing authentication using a token & '
'endpoint (authentication credentials are being '
'ignored).')
print(msg)
else:
if not args.os_auth_url:
raise exc.CommandError(
'Expecting an auth URL via either --os-auth-url or '
'env[OS_AUTH_URL]')
if args.os_username or args.os_password:
if not args.os_username:
raise exc.CommandError(
'Expecting a username provided via either '
'--os-username or env[OS_USERNAME]')
if not args.os_password:
args.os_password = utils.prompt_user_password()
# No password because we didn't have a tty or the
# user Ctl-D when prompted?
if not args.os_password:
raise exc.CommandError(
'Expecting a password provided via either '
'--os-password, env[OS_PASSWORD], or '
'prompted response')
else:
raise exc.CommandError('Expecting authentication method via'
'\n either a service token, '
'--os-token or env[OS_SERVICE_TOKEN], '
'\n credentials, '
'--os-username or env[OS_USERNAME]')
def main(self, argv):
# Parse args once to find version
parser = self.get_base_parser()
(options, args) = parser.parse_known_args(argv)
# build available subcommands based on version
api_version = options.os_identity_api_version
subcommand_parser = self.get_subcommand_parser(api_version)
self.parser = subcommand_parser
# Handle top-level --help/-h before attempting to parse
# a command off the command line
if not argv or options.help:
self.do_help(options)
return 0
# Parse args again and call whatever callback was selected
args = subcommand_parser.parse_args(argv)
# Short-circuit and deal with help command right away.
if args.func == self.do_help:
self.do_help(args)
return 0
elif args.func == self.do_bash_completion:
self.do_bash_completion(args)
return 0
if args.debug:
logging_level = logging.DEBUG
iso_logger = logging.getLogger('iso8601')
iso_logger.setLevel('WARN')
else:
logging_level = logging.WARNING
logging.basicConfig(level=logging_level)
# TODO(heckj): supporting backwards compatibility with environment
# variables. To be removed after DEVSTACK is updated, ideally in
# the Grizzly release cycle.
args.os_token = args.os_token or env('SERVICE_TOKEN')
args.os_endpoint = args.os_endpoint or env('SERVICE_ENDPOINT')
if utils.isunauthenticated(args.func):
self.cs = shell_generic.CLIENT_CLASS(endpoint=args.os_auth_url,
cacert=args.os_cacert,
key=args.os_key,
cert=args.os_cert,
insecure=args.insecure,
timeout=args.timeout)
else:
self.auth_check(args)
token = None
if args.os_token and args.os_endpoint:
token = args.os_token
api_version = options.os_identity_api_version
self.cs = self.get_api_class(api_version)(
username=args.os_username,
tenant_name=args.os_tenant_name,
tenant_id=args.os_tenant_id,
token=token,
endpoint=args.os_endpoint,
password=args.os_password,
auth_url=args.os_auth_url,
region_name=args.os_region_name,
cacert=args.os_cacert,
key=args.os_key,
cert=args.os_cert,
insecure=args.insecure,
debug=args.debug,
use_keyring=args.os_cache,
force_new_token=args.force_new_token,
stale_duration=args.stale_duration,
timeout=args.timeout)
try:
args.func(self.cs, args)
except exc.Unauthorized:
raise exc.CommandError("Invalid OpenStack Identity credentials.")
except exc.AuthorizationFailure:
raise exc.CommandError("Unable to authorize user")
def get_api_class(self, version):
try:
return {
"2.0": shell_v2_0.CLIENT_CLASS,
}[version]
except KeyError:
if version:
msg = ('WARNING: unsupported identity-api-version %s, '
'falling back to 2.0' % version)
print(msg)
return shell_v2_0.CLIENT_CLASS
def do_bash_completion(self, args):
"""Prints all of the commands and options to stdout.
The keystone.bash_completion script doesn't have to hard code them.
"""
commands = set()
options = set()
for sc_str, sc in self.subcommands.items():
commands.add(sc_str)
for option in list(sc._optionals._option_string_actions):
options.add(option)
commands.remove('bash-completion')
commands.remove('bash_completion')
print(' '.join(commands | options))
@utils.arg('command', metavar='<subcommand>', nargs='?',
help='Display help for <subcommand>.')
def do_help(self, args):
"""Display help about this program or one of its subcommands."""
if getattr(args, 'command', None):
if args.command in self.subcommands:
self.subcommands[args.command].print_help()
else:
raise exc.CommandError("'%s' is not a valid subcommand" %
args.command)
else:
self.parser.print_help()
# I'm picky about my shell help.
class OpenStackHelpFormatter(argparse.HelpFormatter):
INDENT_BEFORE_ARGUMENTS = 6
MAX_WIDTH_ARGUMENTS = 32
def add_arguments(self, actions):
for action in filter(lambda x: not x.option_strings, actions):
if not action.choices:
continue
for choice in action.choices:
length = len(choice) + self.INDENT_BEFORE_ARGUMENTS
if(length > self._max_help_position and
length <= self.MAX_WIDTH_ARGUMENTS):
self._max_help_position = length
super(OpenStackHelpFormatter, self).add_arguments(actions)
def start_section(self, heading):
# Title-case the headings
heading = '%s%s' % (heading[0].upper(), heading[1:])
super(OpenStackHelpFormatter, self).start_section(heading)
def main():
try:
OpenStackIdentityShell().main(sys.argv[1:])
except KeyboardInterrupt:
print("... terminating keystone client", file=sys.stderr)
sys.exit(130)
except Exception as e:
print(encodeutils.safe_encode(six.text_type(e)), file=sys.stderr)
sys.exit(1)
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,143 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import re
from tempest_lib.cli import base
from tempest_lib import exceptions
class SimpleReadOnlyKeystoneClientTest(base.ClientTestBase):
"""Basic, read-only tests for Keystone CLI client.
Checks return values and output of read-only commands.
These tests do not presume any content, nor do they create
their own. They only verify the structure of output if present.
"""
def _get_clients(self):
path = os.path.join(os.path.abspath('.'), '.tox/functional/bin')
cli_dir = os.environ.get('OS_KEYSTONECLIENT_EXEC_DIR', path)
return base.CLIClient(
username=os.environ.get('OS_USERNAME'),
password=os.environ.get('OS_PASSWORD'),
tenant_name=os.environ.get('OS_TENANT_NAME'),
uri=os.environ.get('OS_AUTH_URL'),
cli_dir=cli_dir)
def keystone(self, *args, **kwargs):
return self.clients.keystone(*args, **kwargs)
def test_admin_fake_action(self):
self.assertRaises(exceptions.CommandFailed,
self.keystone,
'this-does-not-exist')
def test_admin_catalog_list(self):
out = self.keystone('catalog')
catalog = self.parser.details_multiple(out, with_label=True)
for svc in catalog:
if svc.get('__label'):
self.assertTrue(svc['__label'].startswith('Service:'),
msg=('Invalid beginning of service block: '
'%s' % svc['__label']))
# check that region and publicURL exists. One might also
# check for adminURL and internalURL. id seems to be optional
# and is missing in the catalog backend
self.assertIn('publicURL', svc)
self.assertIn('region', svc)
def test_admin_endpoint_list(self):
out = self.keystone('endpoint-list')
endpoints = self.parser.listing(out)
self.assertTableStruct(endpoints, [
'id', 'region', 'publicurl', 'internalurl',
'adminurl', 'service_id'])
def test_admin_endpoint_service_match(self):
endpoints = self.parser.listing(self.keystone('endpoint-list'))
services = self.parser.listing(self.keystone('service-list'))
svc_by_id = {}
for svc in services:
svc_by_id[svc['id']] = svc
for endpoint in endpoints:
self.assertIn(endpoint['service_id'], svc_by_id)
def test_admin_role_list(self):
roles = self.parser.listing(self.keystone('role-list'))
self.assertTableStruct(roles, ['id', 'name'])
def test_admin_service_list(self):
services = self.parser.listing(self.keystone('service-list'))
self.assertTableStruct(services, ['id', 'name', 'type', 'description'])
def test_admin_tenant_list(self):
tenants = self.parser.listing(self.keystone('tenant-list'))
self.assertTableStruct(tenants, ['id', 'name', 'enabled'])
def test_admin_user_list(self):
users = self.parser.listing(self.keystone('user-list'))
self.assertTableStruct(users, [
'id', 'name', 'enabled', 'email'])
def test_admin_user_role_list(self):
user_roles = self.parser.listing(self.keystone('user-role-list'))
self.assertTableStruct(user_roles, [
'id', 'name', 'user_id', 'tenant_id'])
def test_admin_discover(self):
discovered = self.keystone('discover')
self.assertIn('Keystone found at http', discovered)
self.assertIn('supports version', discovered)
def test_admin_help(self):
help_text = self.keystone('help')
lines = help_text.split('\n')
self.assertFirstLineStartsWith(lines, 'usage: keystone')
commands = []
cmds_start = lines.index('Positional arguments:')
cmds_end = lines.index('Optional arguments:')
command_pattern = re.compile('^ {4}([a-z0-9\-\_]+)')
for line in lines[cmds_start:cmds_end]:
match = command_pattern.match(line)
if match:
commands.append(match.group(1))
commands = set(commands)
wanted_commands = set(('catalog', 'endpoint-list', 'help',
'token-get', 'discover', 'bootstrap'))
self.assertFalse(wanted_commands - commands)
def test_admin_bashcompletion(self):
self.keystone('bash-completion')
def test_admin_ec2_credentials_list(self):
creds = self.keystone('ec2-credentials-list')
creds = self.parser.listing(creds)
self.assertTableStruct(creds, ['tenant', 'access', 'secret'])
# Optional arguments:
def test_admin_version(self):
self.keystone('', flags='--version')
def test_admin_debug_list(self):
self.keystone('catalog', flags='--debug')
def test_admin_timeout(self):
self.keystone('catalog', flags='--timeout %d' % 15)

View File

@@ -1,129 +0,0 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from six import moves
from keystoneclient.generic import shell
from keystoneclient.tests.unit import utils
class DoDiscoverTest(utils.TestCase):
"""Unit tests for do_discover function."""
foo_version = {
'id': 'foo_id',
'status': 'foo_status',
'url': 'http://foo/url',
}
bar_version = {
'id': 'bar_id',
'status': 'bar_status',
'url': 'http://bar/url',
}
foo_extension = {
'foo': 'foo_extension',
'message': 'extension_message',
'bar': 'bar_extension',
}
stub_message = 'This is a stub message'
def setUp(self):
super(DoDiscoverTest, self).setUp()
self.client_mock = mock.Mock()
self.client_mock.discover.return_value = {}
def _execute_discover(self):
"""Call do_discover function and capture output
:returns: captured output is returned
"""
with mock.patch('sys.stdout',
new_callable=moves.StringIO) as mock_stdout:
shell.do_discover(self.client_mock, args=None)
output = mock_stdout.getvalue()
return output
def _check_version_print(self, output, version):
"""Checks all api version's parameters are present in output."""
self.assertIn(version['id'], output)
self.assertIn(version['status'], output)
self.assertIn(version['url'], output)
def test_no_keystones(self):
# No servers configured for client,
# corresponding message should be printed
output = self._execute_discover()
self.assertIn('No Keystone-compatible endpoint found', output)
def test_endpoint(self):
# Endpoint is configured for client,
# client's discover method should be called with that value
self.client_mock.endpoint = 'Some non-empty value'
shell.do_discover(self.client_mock, args=None)
self.client_mock.discover.assert_called_with(self.client_mock.endpoint)
def test_auth_url(self):
# No endpoint provided for client, but there is an auth_url
# client's discover method should be called with auth_url value
self.client_mock.endpoint = False
self.client_mock.auth_url = 'Some non-empty value'
shell.do_discover(self.client_mock, args=None)
self.client_mock.discover.assert_called_with(self.client_mock.auth_url)
def test_empty(self):
# No endpoint or auth_url is configured for client.
# client.discover() should be called without parameters
self.client_mock.endpoint = False
self.client_mock.auth_url = False
shell.do_discover(self.client_mock, args=None)
self.client_mock.discover.assert_called_with()
def test_message(self):
# If client.discover() result contains message - it should be printed
self.client_mock.discover.return_value = {'message': self.stub_message}
output = self._execute_discover()
self.assertIn(self.stub_message, output)
def test_versions(self):
# Every version in client.discover() result should be printed
# and client.discover_extension() should be called on its url
self.client_mock.discover.return_value = {
'foo': self.foo_version,
'bar': self.bar_version,
}
self.client_mock.discover_extensions.return_value = {}
output = self._execute_discover()
self._check_version_print(output, self.foo_version)
self._check_version_print(output, self.bar_version)
discover_extension_calls = [
mock.call(self.foo_version['url']),
mock.call(self.bar_version['url']),
]
self.client_mock.discover_extensions.assert_has_calls(
discover_extension_calls,
any_order=True)
def test_extensions(self):
# Every extension's parameters should be printed
# Extension's message should be omitted
self.client_mock.discover.return_value = {'foo': self.foo_version}
self.client_mock.discover_extensions.return_value = self.foo_extension
output = self._execute_discover()
self.assertIn(self.foo_extension['foo'], output)
self.assertIn(self.foo_extension['bar'], output)
self.assertNotIn(self.foo_extension['message'], output)

View File

@@ -1,534 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import json
import logging
import os
import sys
import uuid
import fixtures
import mock
import six
import testtools
from testtools import matchers
from keystoneclient import exceptions
from keystoneclient import session
from keystoneclient import shell as openstack_shell
from keystoneclient.tests.unit import utils
from keystoneclient.v2_0 import shell as shell_v2_0
DEFAULT_USERNAME = 'username'
DEFAULT_PASSWORD = 'password'
DEFAULT_TENANT_ID = 'tenant_id'
DEFAULT_TENANT_NAME = 'tenant_name'
DEFAULT_AUTH_URL = 'http://127.0.0.1:5000/v2.0/'
# Make a fake shell object, a helping wrapper to call it
def shell(cmd):
openstack_shell.OpenStackIdentityShell().main(cmd.split())
class NoExitArgumentParser(argparse.ArgumentParser):
def error(self, message):
raise exceptions.CommandError(message)
class ShellTest(utils.TestCase):
FAKE_ENV = {
'OS_USERNAME': DEFAULT_USERNAME,
'OS_PASSWORD': DEFAULT_PASSWORD,
'OS_TENANT_ID': DEFAULT_TENANT_ID,
'OS_TENANT_NAME': DEFAULT_TENANT_NAME,
'OS_AUTH_URL': DEFAULT_AUTH_URL,
}
def _tolerant_shell(self, cmd):
t_shell = openstack_shell.OpenStackIdentityShell(NoExitArgumentParser)
t_shell.main(cmd.split())
# Patch os.environ to avoid required auth info.
def setUp(self):
super(ShellTest, self).setUp()
for var in os.environ:
if var.startswith("OS_"):
self.useFixture(fixtures.EnvironmentVariable(var, ""))
for var in self.FAKE_ENV:
self.useFixture(fixtures.EnvironmentVariable(var,
self.FAKE_ENV[var]))
def test_help_unknown_command(self):
self.assertRaises(exceptions.CommandError, shell, 'help %s'
% uuid.uuid4().hex)
def shell(self, argstr):
orig = sys.stdout
clean_env = {}
_old_env, os.environ = os.environ, clean_env.copy()
try:
sys.stdout = six.StringIO()
_shell = openstack_shell.OpenStackIdentityShell()
_shell.main(argstr.split())
except SystemExit:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.assertEqual(exc_value.code, 0)
finally:
out = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
os.environ = _old_env
return out
def test_help_no_args(self):
do_tenant_mock = mock.MagicMock()
with mock.patch('keystoneclient.shell.OpenStackIdentityShell.do_help',
do_tenant_mock):
self.shell('')
assert do_tenant_mock.called
def test_help(self):
required = 'usage:'
help_text = self.shell('help')
self.assertThat(help_text,
matchers.MatchesRegex(required))
def test_help_command(self):
required = 'usage: keystone user-create'
help_text = self.shell('help user-create')
self.assertThat(help_text,
matchers.MatchesRegex(required))
def test_help_command_with_no_action_choices(self):
required = 'usage: keystone user-update'
help_text = self.shell('help user-update')
self.assertThat(help_text,
matchers.MatchesRegex(required))
def test_auth_no_credentials(self):
with testtools.ExpectedException(
exceptions.CommandError, 'Expecting'):
self.shell('user-list')
def test_debug(self):
logging_mock = mock.MagicMock()
with mock.patch('logging.basicConfig', logging_mock):
self.assertRaises(exceptions.CommandError,
self.shell, '--debug user-list')
self.assertTrue(logging_mock.called)
self.assertEqual([(), {'level': logging.DEBUG}],
list(logging_mock.call_args))
def test_auth_password_authurl_no_username(self):
with testtools.ExpectedException(
exceptions.CommandError,
'Expecting a username provided via either'):
self.shell('--os-password=%s --os-auth-url=%s user-list'
% (uuid.uuid4().hex, uuid.uuid4().hex))
def test_auth_username_password_no_authurl(self):
with testtools.ExpectedException(
exceptions.CommandError, 'Expecting an auth URL via either'):
self.shell('--os-password=%s --os-username=%s user-list'
% (uuid.uuid4().hex, uuid.uuid4().hex))
def test_token_no_endpoint(self):
with testtools.ExpectedException(
exceptions.CommandError, 'Expecting an endpoint provided'):
self.shell('--os-token=%s user-list' % uuid.uuid4().hex)
def test_endpoint_no_token(self):
with testtools.ExpectedException(
exceptions.CommandError, 'Expecting a token provided'):
self.shell('--os-endpoint=http://10.0.0.1:5000/v2.0/ user-list')
def test_shell_args(self):
do_tenant_mock = mock.MagicMock()
with mock.patch('keystoneclient.v2_0.shell.do_user_list',
do_tenant_mock):
shell('user-list')
assert do_tenant_mock.called
((a, b), c) = do_tenant_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID,
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# Old_style options
shell('--os_auth_url http://0.0.0.0:5000/ --os_password xyzpdq '
'--os_tenant_id 1234 --os_tenant_name fred '
'--os_username barney '
'--os_identity_api_version 2.0 user-list')
assert do_tenant_mock.called
((a, b), c) = do_tenant_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = ('http://0.0.0.0:5000/', 'xyzpdq', '1234',
'fred', 'barney', '2.0')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# New-style options
shell('--os-auth-url http://1.1.1.1:5000/ --os-password xyzpdq '
'--os-tenant-id 4321 --os-tenant-name wilma '
'--os-username betty '
'--os-identity-api-version 2.0 user-list')
assert do_tenant_mock.called
((a, b), c) = do_tenant_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = ('http://1.1.1.1:5000/', 'xyzpdq', '4321',
'wilma', 'betty', '2.0')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# Test keyring options
shell('--os-auth-url http://1.1.1.1:5000/ --os-password xyzpdq '
'--os-tenant-id 4321 --os-tenant-name wilma '
'--os-username betty '
'--os-identity-api-version 2.0 '
'--os-cache '
'--stale-duration 500 '
'--force-new-token user-list')
assert do_tenant_mock.called
((a, b), c) = do_tenant_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version, b.os_cache,
b.stale_duration, b.force_new_token)
expect = ('http://1.1.1.1:5000/', 'xyzpdq', '4321',
'wilma', 'betty', '2.0', True, '500', True)
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# Test os-identity-api-version fall back to 2.0
shell('--os-identity-api-version 3.0 user-list')
assert do_tenant_mock.called
self.assertTrue(b.os_identity_api_version, '2.0')
def test_shell_user_create_args(self):
"""Test user-create args."""
do_uc_mock = mock.MagicMock()
# grab the decorators for do_user_create
uc_func = getattr(shell_v2_0, 'do_user_create')
do_uc_mock.arguments = getattr(uc_func, 'arguments', [])
with mock.patch('keystoneclient.v2_0.shell.do_user_create',
do_uc_mock):
# Old_style options
# Test case with one --tenant_id args present: ec2 creds
shell('user-create --name=FOO '
'--pass=secret --tenant_id=barrr --enabled=true')
assert do_uc_mock.called
((a, b), c) = do_uc_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID,
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.tenant_id, b.name, b.passwd, b.enabled)
expect = ('barrr', 'FOO', 'secret', 'true')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# New-style options
# Test case with one --tenant args present: ec2 creds
shell('user-create --name=foo '
'--pass=secret --tenant=BARRR --enabled=true')
assert do_uc_mock.called
((a, b), c) = do_uc_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID,
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.tenant, b.name, b.passwd, b.enabled)
expect = ('BARRR', 'foo', 'secret', 'true')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# New-style options
# Test case with one --tenant-id args present: ec2 creds
shell('user-create --name=foo '
'--pass=secret --tenant-id=BARRR --enabled=true')
assert do_uc_mock.called
((a, b), c) = do_uc_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID,
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.tenant, b.name, b.passwd, b.enabled)
expect = ('BARRR', 'foo', 'secret', 'true')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# Old_style options
# Test case with --os_tenant_id and --tenant_id args present
shell('--os_tenant_id=os-tenant user-create --name=FOO '
'--pass=secret --tenant_id=barrr --enabled=true')
assert do_uc_mock.called
((a, b), c) = do_uc_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, 'os-tenant',
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.tenant_id, b.name, b.passwd, b.enabled)
expect = ('barrr', 'FOO', 'secret', 'true')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# New-style options
# Test case with --os-tenant-id and --tenant-id args present
shell('--os-tenant-id=ostenant user-create --name=foo '
'--pass=secret --tenant-id=BARRR --enabled=true')
assert do_uc_mock.called
((a, b), c) = do_uc_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, 'ostenant',
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.tenant, b.name, b.passwd, b.enabled)
expect = ('BARRR', 'foo', 'secret', 'true')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
def test_do_tenant_create(self):
do_tenant_mock = mock.MagicMock()
with mock.patch('keystoneclient.v2_0.shell.do_tenant_create',
do_tenant_mock):
shell('tenant-create')
assert do_tenant_mock.called
# FIXME(dtroyer): how do you test the decorators?
# shell('tenant-create --tenant-name wilma '
# '--description "fred\'s wife"')
# assert do_tenant_mock.called
def test_do_tenant_list(self):
do_tenant_mock = mock.MagicMock()
with mock.patch('keystoneclient.v2_0.shell.do_tenant_list',
do_tenant_mock):
shell('tenant-list')
assert do_tenant_mock.called
def test_shell_tenant_id_args(self):
"""Test where tenant_id is passed twice.
Test a corner case where --tenant_id appears on the
command-line twice.
"""
do_ec2_mock = mock.MagicMock()
# grab the decorators for do_ec2_create_credentials
ec2_func = getattr(shell_v2_0, 'do_ec2_credentials_create')
do_ec2_mock.arguments = getattr(ec2_func, 'arguments', [])
with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_create',
do_ec2_mock):
# Old_style options
# Test case with one --tenant_id args present: ec2 creds
shell('ec2-credentials-create '
'--tenant_id=ec2-tenant --user_id=ec2-user')
assert do_ec2_mock.called
((a, b), c) = do_ec2_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID,
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.tenant_id, b.user_id)
expect = ('ec2-tenant', 'ec2-user')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# New-style options
# Test case with one --tenant-id args present: ec2 creds
shell('ec2-credentials-create '
'--tenant-id=dash-tenant --user-id=dash-user')
assert do_ec2_mock.called
((a, b), c) = do_ec2_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID,
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.tenant_id, b.user_id)
expect = ('dash-tenant', 'dash-user')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# Old_style options
# Test case with two --tenant_id args present
shell('--os_tenant_id=os-tenant ec2-credentials-create '
'--tenant_id=ec2-tenant --user_id=ec2-user')
assert do_ec2_mock.called
((a, b), c) = do_ec2_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, 'os-tenant',
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.tenant_id, b.user_id)
expect = ('ec2-tenant', 'ec2-user')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# New-style options
# Test case with two --tenant-id args present
shell('--os-tenant-id=ostenant ec2-credentials-create '
'--tenant-id=dash-tenant --user-id=dash-user')
assert do_ec2_mock.called
((a, b), c) = do_ec2_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, 'ostenant',
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.tenant_id, b.user_id)
expect = ('dash-tenant', 'dash-user')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
def test_do_ec2_get(self):
do_shell_mock = mock.MagicMock()
with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_create',
do_shell_mock):
shell('ec2-credentials-create')
assert do_shell_mock.called
with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_get',
do_shell_mock):
shell('ec2-credentials-get')
assert do_shell_mock.called
with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_list',
do_shell_mock):
shell('ec2-credentials-list')
assert do_shell_mock.called
with mock.patch('keystoneclient.v2_0.shell.do_ec2_credentials_delete',
do_shell_mock):
shell('ec2-credentials-delete')
assert do_shell_mock.called
def test_timeout_parse_invalid_type(self):
for f in ['foobar', 'xyz']:
cmd = '--timeout %s endpoint-create' % (f)
self.assertRaises(exceptions.CommandError,
self._tolerant_shell, cmd)
def test_timeout_parse_invalid_number(self):
for f in [-1, 0]:
cmd = '--timeout %s endpoint-create' % (f)
self.assertRaises(exceptions.CommandError,
self._tolerant_shell, cmd)
def test_do_timeout(self):
response_mock = mock.MagicMock()
response_mock.status_code = 200
response_mock.text = json.dumps({
'endpoints': [],
})
request_mock = mock.MagicMock(return_value=response_mock)
with mock.patch.object(session.requests, 'request',
request_mock):
shell(('--timeout 2 --os-token=blah --os-endpoint=blah'
' --os-auth-url=blah.com endpoint-list'))
request_mock.assert_called_with(mock.ANY, mock.ANY,
timeout=2,
allow_redirects=False,
headers=mock.ANY,
verify=mock.ANY)
def test_do_endpoints(self):
do_shell_mock = mock.MagicMock()
# grab the decorators for do_endpoint_create
shell_func = getattr(shell_v2_0, 'do_endpoint_create')
do_shell_mock.arguments = getattr(shell_func, 'arguments', [])
with mock.patch('keystoneclient.v2_0.shell.do_endpoint_create',
do_shell_mock):
# Old_style options
# Test create args
shell('endpoint-create '
'--service_id=2 --publicurl=http://example.com:1234/go '
'--adminurl=http://example.com:9876/adm')
assert do_shell_mock.called
((a, b), c) = do_shell_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID,
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.service, b.publicurl, b.adminurl)
expect = ('2',
'http://example.com:1234/go',
'http://example.com:9876/adm')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# New-style options
# Test create args
shell('endpoint-create '
'--service-id=3 --publicurl=http://example.com:4321/go '
'--adminurl=http://example.com:9876/adm')
assert do_shell_mock.called
((a, b), c) = do_shell_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID,
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.service, b.publicurl, b.adminurl)
expect = ('3',
'http://example.com:4321/go',
'http://example.com:9876/adm')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
# New-style options
# Test create args
shell('endpoint-create '
'--service=3 --publicurl=http://example.com:4321/go '
'--adminurl=http://example.com:9876/adm')
assert do_shell_mock.called
((a, b), c) = do_shell_mock.call_args
actual = (b.os_auth_url, b.os_password, b.os_tenant_id,
b.os_tenant_name, b.os_username,
b.os_identity_api_version)
expect = (DEFAULT_AUTH_URL, DEFAULT_PASSWORD, DEFAULT_TENANT_ID,
DEFAULT_TENANT_NAME, DEFAULT_USERNAME, '')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
actual = (b.service, b.publicurl, b.adminurl)
expect = ('3',
'http://example.com:4321/go',
'http://example.com:9876/adm')
self.assertTrue(all([x == y for x, y in zip(actual, expect)]))
def test_shell_keyboard_interrupt(self):
shell_mock = mock.MagicMock()
with mock.patch('keystoneclient.shell.OpenStackIdentityShell.main',
shell_mock):
try:
shell_mock.side_effect = KeyboardInterrupt()
openstack_shell.main()
except SystemExit as ex:
self.assertEqual(130, ex.code)

View File

@@ -10,8 +10,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import sys
import six
import testresources
from testtools import matchers
@@ -103,39 +101,6 @@ class FakeObject(object):
self.name = name
class PrintTestCase(test_utils.TestCase):
def setUp(self):
super(PrintTestCase, self).setUp()
self.old_stdout = sys.stdout
self.stdout = six.moves.cStringIO()
self.addCleanup(setattr, self, 'stdout', None)
sys.stdout = self.stdout
self.addCleanup(setattr, sys, 'stdout', self.old_stdout)
def test_print_list_unicode(self):
name = six.u('\u540d\u5b57')
objs = [FakeObject(name)]
# NOTE(Jeffrey4l) If the text's encode is proper, this method will not
# raise UnicodeEncodeError exceptions
utils.print_list(objs, ['name'])
output = self.stdout.getvalue()
# In Python 2, output will be bytes, while in Python 3, it will not.
# Let's decode the value if needed.
if isinstance(output, six.binary_type):
output = output.decode('utf-8')
self.assertIn(name, output)
def test_print_dict_unicode(self):
name = six.u('\u540d\u5b57')
utils.print_dict({'name': name})
output = self.stdout.getvalue()
# In Python 2, output will be bytes, while in Python 3, it will not.
# Let's decode the value if needed.
if isinstance(output, six.binary_type):
output = output.decode('utf-8')
self.assertIn(name, output)
class HashSignedTokenTestCase(test_utils.TestCase,
testresources.ResourcedTestCase):
"""Unit tests for utils.hash_signed_token()."""

View File

@@ -1,460 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
import mock
from oslo_serialization import jsonutils
import six
from testtools import matchers
from keystoneclient import fixture
from keystoneclient.tests.unit.v2_0 import utils
DEFAULT_USERNAME = 'username'
DEFAULT_PASSWORD = 'password'
DEFAULT_TENANT_ID = 'tenant_id'
DEFAULT_TENANT_NAME = 'tenant_name'
DEFAULT_AUTH_URL = 'http://127.0.0.1:5000/v2.0/'
DEFAULT_ADMIN_URL = 'http://127.0.0.1:35357/v2.0/'
class ShellTests(utils.TestCase):
TEST_URL = DEFAULT_ADMIN_URL
def setUp(self):
"""Patch os.environ to avoid required auth info."""
super(ShellTests, self).setUp()
self.addCleanup(setattr, os, 'environ', os.environ.copy())
os.environ = {
'OS_USERNAME': DEFAULT_USERNAME,
'OS_PASSWORD': DEFAULT_PASSWORD,
'OS_TENANT_ID': DEFAULT_TENANT_ID,
'OS_TENANT_NAME': DEFAULT_TENANT_NAME,
'OS_AUTH_URL': DEFAULT_AUTH_URL,
}
import keystoneclient.shell
self.shell = keystoneclient.shell.OpenStackIdentityShell()
self.token = fixture.V2Token()
self.token.set_scope()
svc = self.token.add_service('identity')
svc.add_endpoint(public=DEFAULT_AUTH_URL,
admin=DEFAULT_ADMIN_URL)
self.stub_auth(json=self.token, base_url=DEFAULT_AUTH_URL)
def run_command(self, cmd):
orig = sys.stdout
try:
sys.stdout = six.StringIO()
if isinstance(cmd, list):
self.shell.main(cmd)
else:
self.shell.main(cmd.split())
except SystemExit:
exc_type, exc_value, exc_traceback = sys.exc_info()
self.assertEqual(exc_value.code, 0)
finally:
out = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
return out
def assert_called(self, method, path, base_url=TEST_URL):
self.assertEqual(method, self.requests_mock.last_request.method)
self.assertEqual(base_url + path.lstrip('/'),
self.requests_mock.last_request.url)
def test_user_list(self):
self.stub_url('GET', ['users'], json={'users': []})
self.run_command('user-list')
self.assert_called('GET', '/users')
def test_user_create(self):
self.stub_url('POST', ['users'], json={'user': {}})
self.run_command('user-create --name new-user')
self.assert_called('POST', '/users')
self.assertRequestBodyIs(json={'user': {'email': None,
'password': None,
'enabled': True,
'name': 'new-user',
'tenantId': None}})
@mock.patch('sys.stdin', autospec=True)
def test_user_create_password_prompt(self, mock_stdin):
self.stub_url('POST', ['users'], json={'user': {}})
with mock.patch('getpass.getpass') as mock_getpass:
del(os.environ['OS_PASSWORD'])
mock_stdin.isatty = lambda: True
mock_getpass.return_value = 'newpass'
self.run_command('user-create --name new-user --pass')
self.assert_called('POST', '/users')
self.assertRequestBodyIs(json={'user': {'email': None,
'password': 'newpass',
'enabled': True,
'name': 'new-user',
'tenantId': None}})
def test_user_get(self):
self.stub_url('GET', ['users', '1'],
json={'user': {'id': '1'}})
self.run_command('user-get 1')
self.assert_called('GET', '/users/1')
def test_user_delete(self):
self.stub_url('GET', ['users', '1'],
json={'user': {'id': '1'}})
self.stub_url('DELETE', ['users', '1'])
self.run_command('user-delete 1')
self.assert_called('DELETE', '/users/1')
def test_user_password_update(self):
self.stub_url('GET', ['users', '1'],
json={'user': {'id': '1'}})
self.stub_url('PUT', ['users', '1', 'OS-KSADM', 'password'])
self.run_command('user-password-update --pass newpass 1')
self.assert_called('PUT', '/users/1/OS-KSADM/password')
def test_user_update(self):
self.stub_url('PUT', ['users', '1'])
self.stub_url('GET', ['users', '1'],
json={"user": {"tenantId": "1",
"enabled": "true",
"id": "1",
"name": "username"}})
self.run_command('user-update --name new-user1'
' --email user@email.com --enabled true 1')
self.assert_called('PUT', '/users/1')
body = {'user': {'id': '1', 'email': 'user@email.com',
'enabled': True, 'name': 'new-user1'}}
self.assertRequestBodyIs(json=body)
required = 'User not updated, no arguments present.'
out = self.run_command('user-update 1')
self.assertThat(out, matchers.MatchesRegex(required))
self.run_command(['user-update', '--email', '', '1'])
self.assert_called('PUT', '/users/1')
self.assertRequestBodyIs(json={'user': {'id': '1', 'email': ''}})
def test_role_create(self):
self.stub_url('POST', ['OS-KSADM', 'roles'], json={'role': {}})
self.run_command('role-create --name new-role')
self.assert_called('POST', '/OS-KSADM/roles')
self.assertRequestBodyIs(json={"role": {"name": "new-role"}})
def test_role_get(self):
self.stub_url('GET', ['OS-KSADM', 'roles', '1'],
json={'role': {'id': '1'}})
self.run_command('role-get 1')
self.assert_called('GET', '/OS-KSADM/roles/1')
def test_role_list(self):
self.stub_url('GET', ['OS-KSADM', 'roles'], json={'roles': []})
self.run_command('role-list')
self.assert_called('GET', '/OS-KSADM/roles')
def test_role_delete(self):
self.stub_url('GET', ['OS-KSADM', 'roles', '1'],
json={'role': {'id': '1'}})
self.stub_url('DELETE', ['OS-KSADM', 'roles', '1'])
self.run_command('role-delete 1')
self.assert_called('DELETE', '/OS-KSADM/roles/1')
def test_user_role_add(self):
self.stub_url('GET', ['users', '1'],
json={'user': {'id': '1'}})
self.stub_url('GET', ['OS-KSADM', 'roles', '1'],
json={'role': {'id': '1'}})
self.stub_url('PUT', ['users', '1', 'roles', 'OS-KSADM', '1'])
self.run_command('user-role-add --user_id 1 --role_id 1')
self.assert_called('PUT', '/users/1/roles/OS-KSADM/1')
def test_user_role_list(self):
self.stub_url('GET', ['tenants', self.token.tenant_id],
json={'tenant': {'id': self.token.tenant_id}})
self.stub_url('GET', ['tenants', self.token.tenant_id,
'users', self.token.user_id, 'roles'],
json={'roles': []})
url = '/tenants/%s/users/%s/roles' % (self.token.tenant_id,
self.token.user_id)
self.run_command('user-role-list --user_id %s --tenant-id %s' %
(self.token.user_id, self.token.tenant_id))
self.assert_called('GET', url)
self.run_command('user-role-list --user_id %s' % self.token.user_id)
self.assert_called('GET', url)
self.run_command('user-role-list')
self.assert_called('GET', url)
def test_user_role_remove(self):
self.stub_url('GET', ['users', '1'],
json={'user': {'id': 1}})
self.stub_url('GET', ['OS-KSADM', 'roles', '1'],
json={'role': {'id': 1}})
self.stub_url('DELETE',
['users', '1', 'roles', 'OS-KSADM', '1'])
self.run_command('user-role-remove --user_id 1 --role_id 1')
self.assert_called('DELETE', '/users/1/roles/OS-KSADM/1')
def test_tenant_create(self):
self.stub_url('POST', ['tenants'], json={'tenant': {}})
self.run_command('tenant-create --name new-tenant')
self.assertRequestBodyIs(json={"tenant": {"enabled": True,
"name": "new-tenant",
"description": None}})
def test_tenant_get(self):
self.stub_url('GET', ['tenants', '2'], json={'tenant': {}})
self.run_command('tenant-get 2')
self.assert_called('GET', '/tenants/2')
def test_tenant_list(self):
self.stub_url('GET', ['tenants'], json={'tenants': []})
self.run_command('tenant-list')
self.assert_called('GET', '/tenants')
def test_tenant_update(self):
self.stub_url('GET', ['tenants', '1'],
json={'tenant': {'id': '1'}})
self.stub_url('GET', ['tenants', '2'],
json={'tenant': {'id': '2'}})
self.stub_url('POST', ['tenants', '2'],
json={'tenant': {'id': '2'}})
self.run_command('tenant-update'
' --name new-tenant1 --enabled false'
' --description desc 2')
self.assert_called('POST', '/tenants/2')
self.assertRequestBodyIs(json={"tenant": {"enabled": False,
"id": "2",
"description": "desc",
"name": "new-tenant1"}})
required = 'Tenant not updated, no arguments present.'
out = self.run_command('tenant-update 1')
self.assertThat(out, matchers.MatchesRegex(required))
def test_tenant_delete(self):
self.stub_url('GET', ['tenants', '2'],
json={'tenant': {'id': '2'}})
self.stub_url('DELETE', ['tenants', '2'])
self.run_command('tenant-delete 2')
self.assert_called('DELETE', '/tenants/2')
def test_service_create_with_required_arguments_only(self):
self.stub_url('POST', ['OS-KSADM', 'services'],
json={'OS-KSADM:service': {}})
self.run_command('service-create --type compute')
self.assert_called('POST', '/OS-KSADM/services')
json = {"OS-KSADM:service": {"type": "compute",
"name": None,
"description": None}}
self.assertRequestBodyIs(json=json)
def test_service_create_with_all_arguments(self):
self.stub_url('POST', ['OS-KSADM', 'services'],
json={'OS-KSADM:service': {}})
self.run_command('service-create --type compute '
'--name service1 --description desc1')
self.assert_called('POST', '/OS-KSADM/services')
json = {"OS-KSADM:service": {"type": "compute",
"name": "service1",
"description": "desc1"}}
self.assertRequestBodyIs(json=json)
def test_service_get(self):
self.stub_url('GET', ['OS-KSADM', 'services', '1'],
json={'OS-KSADM:service': {'id': '1'}})
self.run_command('service-get 1')
self.assert_called('GET', '/OS-KSADM/services/1')
def test_service_list(self):
self.stub_url('GET', ['OS-KSADM', 'services'],
json={'OS-KSADM:services': []})
self.run_command('service-list')
self.assert_called('GET', '/OS-KSADM/services')
def test_service_delete(self):
self.stub_url('GET', ['OS-KSADM', 'services', '1'],
json={'OS-KSADM:service': {'id': 1}})
self.stub_url('DELETE', ['OS-KSADM', 'services', '1'])
self.run_command('service-delete 1')
self.assert_called('DELETE', '/OS-KSADM/services/1')
def test_catalog(self):
self.run_command('catalog')
self.run_command('catalog --service compute')
def test_ec2_credentials_create(self):
self.stub_url('POST',
['users', self.token.user_id, 'credentials', 'OS-EC2'],
json={'credential': {}})
url = '/users/%s/credentials/OS-EC2' % self.token.user_id
self.run_command('ec2-credentials-create --tenant-id 1 '
'--user-id %s' % self.token.user_id)
self.assert_called('POST', url)
self.assertRequestBodyIs(json={'tenant_id': '1'})
self.run_command('ec2-credentials-create --tenant-id 1')
self.assert_called('POST', url)
self.assertRequestBodyIs(json={'tenant_id': '1'})
self.run_command('ec2-credentials-create')
self.assert_called('POST', url)
self.assertRequestBodyIs(json={'tenant_id': self.token.tenant_id})
def test_ec2_credentials_delete(self):
self.stub_url('DELETE',
['users', self.token.user_id,
'credentials', 'OS-EC2', '2'])
self.run_command('ec2-credentials-delete --access 2 --user-id %s' %
self.token.user_id)
url = '/users/%s/credentials/OS-EC2/2' % self.token.user_id
self.assert_called('DELETE', url)
self.run_command('ec2-credentials-delete --access 2')
self.assert_called('DELETE', url)
def test_ec2_credentials_list(self):
self.stub_url('GET',
['users', self.token.user_id, 'credentials', 'OS-EC2'],
json={'credentials': []})
self.run_command('ec2-credentials-list --user-id %s'
% self.token.user_id)
url = '/users/%s/credentials/OS-EC2' % self.token.user_id
self.assert_called('GET', url)
self.run_command('ec2-credentials-list')
self.assert_called('GET', url)
def test_ec2_credentials_get(self):
self.stub_url('GET',
['users', '1', 'credentials', 'OS-EC2', '2'],
json={'credential': {}})
self.run_command('ec2-credentials-get --access 2 --user-id 1')
self.assert_called('GET', '/users/1/credentials/OS-EC2/2')
def test_bootstrap(self):
user = {'user': {'id': '1'}}
role = {'role': {'id': '1'}}
tenant = {'tenant': {'id': '1'}}
token = fixture.V2Token(user_id=1, tenant_id=1)
token.add_role(id=1)
svc = token.add_service('identity')
svc.add_endpoint(public=DEFAULT_AUTH_URL,
admin=DEFAULT_ADMIN_URL)
self.stub_auth(json=token)
self.stub_url('POST', ['OS-KSADM', 'roles'], json=role)
self.stub_url('GET', ['OS-KSADM', 'roles', '1'], json=role)
self.stub_url('POST', ['tenants'], json=tenant)
self.stub_url('GET', ['tenants', '1'], json=tenant)
self.stub_url('POST', ['users'], json=user)
self.stub_url('GET', ['users', '1'], json=user)
self.stub_url('PUT',
['tenants', '1', 'users', '1', 'roles', 'OS-KSADM', '1'],
json=role)
self.run_command('bootstrap --user-name new-user'
' --pass 1 --role-name admin'
' --tenant-name new-tenant')
def called_anytime(method, path, json=None):
test_url = self.TEST_URL.strip('/')
for r in self.requests_mock.request_history:
if not r.method == method:
continue
if not r.url == test_url + path:
continue
if json:
json_body = jsonutils.loads(r.body)
if not json_body == json:
continue
return True
raise AssertionError('URL never called')
called_anytime('POST', '/users', {'user': {'email': None,
'password': '1',
'enabled': True,
'name': 'new-user',
'tenantId': None}})
called_anytime('POST', '/tenants', {"tenant": {"enabled": True,
"name": "new-tenant",
"description": None}})
called_anytime('POST', '/OS-KSADM/roles',
{"role": {"name": "admin"}})
called_anytime('PUT', '/tenants/1/users/1/roles/OS-KSADM/1')
def test_bash_completion(self):
self.run_command('bash-completion')
def test_help(self):
out = self.run_command('help')
required = 'usage: keystone'
self.assertThat(out, matchers.MatchesRegex(required))
def test_password_update(self):
self.stub_url('PATCH',
['OS-KSCRUD', 'users', self.token.user_id],
base_url=DEFAULT_AUTH_URL)
self.run_command('password-update --current-password oldpass'
' --new-password newpass')
self.assert_called('PATCH',
'/OS-KSCRUD/users/%s' % self.token.user_id,
base_url=DEFAULT_AUTH_URL)
self.assertRequestBodyIs(json={'user': {'original_password': 'oldpass',
'password': 'newpass'}})
def test_endpoint_create(self):
self.stub_url('GET', ['OS-KSADM', 'services', '1'],
json={'OS-KSADM:service': {'id': '1'}})
self.stub_url('POST', ['endpoints'], json={'endpoint': {}})
self.run_command('endpoint-create --service-id 1 '
'--publicurl=http://example.com:1234/go')
self.assert_called('POST', '/endpoints')
json = {'endpoint': {'adminurl': None,
'service_id': '1',
'region': 'regionOne',
'internalurl': None,
'publicurl': "http://example.com:1234/go"}}
self.assertRequestBodyIs(json=json)
def test_endpoint_list(self):
self.stub_url('GET', ['endpoints'], json={'endpoints': []})
self.run_command('endpoint-list')
self.assert_called('GET', '/endpoints')

View File

@@ -15,12 +15,10 @@ import hashlib
import logging
import sys
from oslo_utils import encodeutils
from oslo_utils import timeutils
# NOTE(stevemar): do not remove positional. We need this to stay for a while
# since versions of auth_token require it here.
from positional import positional # noqa
import prettytable
import six
from keystoneclient import exceptions
@@ -29,73 +27,6 @@ from keystoneclient import exceptions
logger = logging.getLogger(__name__)
# Decorator for cli-args
def arg(*args, **kwargs):
def _decorator(func):
# Because of the semantics of decorator composition if we just append
# to the options list positional options will appear to be backwards.
func.__dict__.setdefault('arguments', []).insert(0, (args, kwargs))
return func
return _decorator
def pretty_choice_list(l):
return ', '.join("'%s'" % i for i in l)
def print_list(objs, fields, formatters={}, order_by=None):
pt = prettytable.PrettyTable([f for f in fields],
caching=False, print_empty=False)
pt.aligns = ['l' for f in fields]
for o in objs:
row = []
for field in fields:
if field in formatters:
row.append(formatters[field](o))
else:
field_name = field.lower().replace(' ', '_')
data = getattr(o, field_name, '')
if data is None:
data = ''
row.append(data)
pt.add_row(row)
if order_by is None:
order_by = fields[0]
encoded = encodeutils.safe_encode(pt.get_string(sortby=order_by))
if six.PY3:
encoded = encoded.decode()
print(encoded)
def _word_wrap(string, max_length=0):
"""wrap long strings to be no longer than max_length."""
if max_length <= 0:
return string
return '\n'.join([string[i:i + max_length] for i in
range(0, len(string), max_length)])
def print_dict(d, wrap=0):
"""pretty table prints dictionaries.
Wrap values to max_length wrap if wrap>0
"""
pt = prettytable.PrettyTable(['Property', 'Value'],
caching=False, print_empty=False)
pt.aligns = ['l', 'l']
for (prop, value) in six.iteritems(d):
if value is None:
value = ''
value = _word_wrap(value, max_length=wrap)
pt.add_row([prop, value])
encoded = encodeutils.safe_encode(pt.get_string(sortby='Property'))
if six.PY3:
encoded = encoded.decode()
print(encoded)
def find_resource(manager, name_or_id):
"""Helper for the _find_* methods."""

View File

@@ -1,547 +0,0 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack Foundation
# Copyright 2011 Nebula, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
This module is deprecated as of the 1.7.0 release in favor of
python-openstackclient and may be removed in the 2.0.0 release.
Bug fixes are welcome, but new features should be exposed to the CLI by
python-openstackclient after being added to the python-keystoneclient library.
"""
import argparse
import getpass
import sys
from oslo_utils import strutils
import six
from keystoneclient.i18n import _
from keystoneclient import utils
from keystoneclient.v2_0 import client
CLIENT_CLASS = client.Client
ASK_FOR_PASSWORD = object()
def require_service_catalog(f):
msg = _('Configuration error: Client configured to run without a service '
'catalog. Run the client using --os-auth-url or OS_AUTH_URL, '
'instead of --os-endpoint or OS_SERVICE_ENDPOINT, for example.')
def wrapped(kc, args):
if not kc.has_service_catalog():
raise Exception(msg)
return f(kc, args)
# Change __doc__ attribute back to origin function's __doc__
wrapped.__doc__ = f.__doc__
return wrapped
@utils.arg('--tenant', '--tenant-id', metavar='<tenant>',
help='Tenant; lists all users if not specified.')
@utils.arg('--tenant_id', help=argparse.SUPPRESS)
def do_user_list(kc, args):
"""List users."""
if args.tenant:
tenant_id = utils.find_resource(kc.tenants, args.tenant).id
else:
tenant_id = None
users = kc.users.list(tenant_id=tenant_id)
utils.print_list(users, ['id', 'name', 'enabled', 'email'],
order_by='name')
@utils.arg('user', metavar='<user>', help='Name or ID of user to display.')
def do_user_get(kc, args):
"""Display user details."""
user = utils.find_resource(kc.users, args.user)
utils.print_dict(user._info)
@utils.arg('--name', metavar='<user-name>', required=True,
help='New user name (must be unique).')
@utils.arg('--tenant', '--tenant-id', metavar='<tenant>',
help='New user default tenant.')
@utils.arg('--tenant_id', help=argparse.SUPPRESS)
@utils.arg('--pass', metavar='<pass>', dest='passwd', nargs='?',
const=ASK_FOR_PASSWORD, help='New user password; '
'required for some auth backends.')
@utils.arg('--email', metavar='<email>',
help='New user email address.')
@utils.arg('--enabled', metavar='<true|false>', default=True,
help='Initial user enabled status. Default is true.')
def do_user_create(kc, args):
"""Create new user."""
if args.tenant:
tenant_id = utils.find_resource(kc.tenants, args.tenant).id
elif args.tenant_id:
tenant_id = args.tenant_id
else:
tenant_id = None
new_passwd = args.passwd
if args.passwd is ASK_FOR_PASSWORD:
new_passwd = utils.prompt_for_password()
user = kc.users.create(args.name, new_passwd, args.email,
tenant_id=tenant_id,
enabled=strutils.bool_from_string(args.enabled))
utils.print_dict(user._info)
@utils.arg('--name', metavar='<user-name>',
help='Desired new user name.')
@utils.arg('--email', metavar='<email>',
help='Desired new email address.')
@utils.arg('--enabled', metavar='<true|false>',
help='Enable or disable user.')
@utils.arg('user', metavar='<user>', help='Name or ID of user to update.')
def do_user_update(kc, args):
"""Update user's name, email, and enabled status."""
kwargs = {}
if args.name:
kwargs['name'] = args.name
if args.email is not None:
kwargs['email'] = args.email
if args.enabled:
kwargs['enabled'] = strutils.bool_from_string(args.enabled)
if not len(kwargs):
print(_("User not updated, no arguments present."))
return
user = utils.find_resource(kc.users, args.user)
try:
kc.users.update(user, **kwargs)
print(_('User has been updated.'))
except Exception as e:
print(_('Unable to update user: %s') % e)
@utils.arg('--pass', metavar='<password>', dest='passwd', required=False,
help='Desired new password.')
@utils.arg('user', metavar='<user>',
help='Name or ID of user to update password.')
def do_user_password_update(kc, args):
"""Update user password."""
user = utils.find_resource(kc.users, args.user)
new_passwd = args.passwd or utils.prompt_for_password()
if new_passwd is None:
msg = (_("\nPlease specify password using the --pass option "
"or using the prompt"))
sys.exit(msg)
kc.users.update_password(user, new_passwd)
@utils.arg('--current-password', metavar='<current-password>',
dest='currentpasswd', required=False, help='Current password, '
'Defaults to the password as set by --os-password or '
'env[OS_PASSWORD].')
@utils.arg('--new-password ', metavar='<new-password>', dest='newpasswd',
required=False, help='Desired new password.')
def do_password_update(kc, args):
"""Update own password."""
# we are prompting for these passwords if they are not passed in
# this gives users the option not to have their password
# appear in bash history etc..
currentpasswd = args.os_password
if args.currentpasswd is not None:
currentpasswd = args.currentpasswd
if currentpasswd is None:
currentpasswd = getpass.getpass(_('Current Password: '))
newpasswd = args.newpasswd
while newpasswd is None:
passwd1 = getpass.getpass(_('New Password: '))
passwd2 = getpass.getpass(_('Repeat New Password: '))
if passwd1 == passwd2:
newpasswd = passwd1
kc.users.update_own_password(currentpasswd, newpasswd)
if args.os_password != newpasswd:
print(_("You should update the password you are using to authenticate "
"to match your new password"))
@utils.arg('user', metavar='<user>', help='Name or ID of user to delete.')
def do_user_delete(kc, args):
"""Delete user."""
user = utils.find_resource(kc.users, args.user)
kc.users.delete(user)
def do_tenant_list(kc, args):
"""List all tenants."""
tenants = kc.tenants.list()
utils.print_list(tenants, ['id', 'name', 'enabled'], order_by='name')
@utils.arg('tenant', metavar='<tenant>',
help='Name or ID of tenant to display.')
def do_tenant_get(kc, args):
"""Display tenant details."""
tenant = utils.find_resource(kc.tenants, args.tenant)
utils.print_dict(tenant._info)
@utils.arg('--name', metavar='<tenant-name>', required=True,
help='New tenant name (must be unique).')
@utils.arg('--description', metavar='<tenant-description>', default=None,
help='Description of new tenant. Default is none.')
@utils.arg('--enabled', metavar='<true|false>', default=True,
help='Initial tenant enabled status. Default is true.')
def do_tenant_create(kc, args):
"""Create new tenant."""
tenant = kc.tenants.create(args.name,
description=args.description,
enabled=strutils.bool_from_string(args.enabled))
utils.print_dict(tenant._info)
@utils.arg('--name', metavar='<tenant_name>',
help='Desired new name of tenant.')
@utils.arg('--description', metavar='<tenant-description>', default=None,
help='Desired new description of tenant.')
@utils.arg('--enabled', metavar='<true|false>',
help='Enable or disable tenant.')
@utils.arg('tenant', metavar='<tenant>',
help='Name or ID of tenant to update.')
def do_tenant_update(kc, args):
"""Update tenant name, description, enabled status."""
tenant = utils.find_resource(kc.tenants, args.tenant)
kwargs = {}
if args.name:
kwargs.update({'name': args.name})
if args.description is not None:
kwargs.update({'description': args.description})
if args.enabled:
kwargs.update({'enabled': strutils.bool_from_string(args.enabled)})
if kwargs == {}:
print(_("Tenant not updated, no arguments present."))
return
tenant.update(**kwargs)
@utils.arg('tenant', metavar='<tenant>',
help='Name or ID of tenant to delete.')
def do_tenant_delete(kc, args):
"""Delete tenant."""
tenant = utils.find_resource(kc.tenants, args.tenant)
kc.tenants.delete(tenant)
@utils.arg('--type', metavar='<type>', required=True,
help='Service type (one of: identity, compute, network, '
'image, object-store, or other service identifier string).')
@utils.arg('--name', metavar='<name>',
help='Name of new service (must be unique).')
@utils.arg('--description', metavar='<service-description>',
help='Description of service.')
def do_service_create(kc, args):
"""Add service to Service Catalog."""
service = kc.services.create(args.name,
args.type,
args.description)
utils.print_dict(service._info)
def do_service_list(kc, args):
"""List all services in Service Catalog."""
services = kc.services.list()
utils.print_list(services, ['id', 'name', 'type', 'description'],
order_by='name')
@utils.arg('service', metavar='<service>',
help='Name or ID of service to display.')
def do_service_get(kc, args):
"""Display service from Service Catalog."""
service = utils.find_resource(kc.services, args.service)
utils.print_dict(service._info)
@utils.arg('service', metavar='<service>',
help='Name or ID of service to delete.')
def do_service_delete(kc, args):
"""Delete service from Service Catalog."""
service = utils.find_resource(kc.services, args.service)
kc.services.delete(service.id)
def do_role_list(kc, args):
"""List all roles."""
roles = kc.roles.list()
utils.print_list(roles, ['id', 'name'], order_by='name')
@utils.arg('role', metavar='<role>', help='Name or ID of role to display.')
def do_role_get(kc, args):
"""Display role details."""
role = utils.find_resource(kc.roles, args.role)
utils.print_dict(role._info)
@utils.arg('--name', metavar='<role-name>', required=True,
help='Name of new role.')
def do_role_create(kc, args):
"""Create new role."""
role = kc.roles.create(args.name)
utils.print_dict(role._info)
@utils.arg('role', metavar='<role>', help='Name or ID of role to delete.')
def do_role_delete(kc, args):
"""Delete role."""
role = utils.find_resource(kc.roles, args.role)
kc.roles.delete(role)
@utils.arg('--user', '--user-id', '--user_id', metavar='<user>',
required=True, help='Name or ID of user.')
@utils.arg('--role', '--role-id', '--role_id', metavar='<role>',
required=True, help='Name or ID of role.')
@utils.arg('--tenant', '--tenant-id', metavar='<tenant>',
help='Name or ID of tenant.')
@utils.arg('--tenant_id', help=argparse.SUPPRESS)
def do_user_role_add(kc, args):
"""Add role to user."""
user = utils.find_resource(kc.users, args.user)
role = utils.find_resource(kc.roles, args.role)
if args.tenant:
tenant = utils.find_resource(kc.tenants, args.tenant)
elif args.tenant_id:
tenant = args.tenant_id
else:
tenant = None
kc.roles.add_user_role(user, role, tenant)
@utils.arg('--user', '--user-id', '--user_id', metavar='<user>',
required=True, help='Name or ID of user.')
@utils.arg('--role', '--role-id', '--role_id', metavar='<role>',
required=True, help='Name or ID of role.')
@utils.arg('--tenant', '--tenant-id', metavar='<tenant>',
help='Name or ID of tenant.')
@utils.arg('--tenant_id', help=argparse.SUPPRESS)
def do_user_role_remove(kc, args):
"""Remove role from user."""
user = utils.find_resource(kc.users, args.user)
role = utils.find_resource(kc.roles, args.role)
if args.tenant:
tenant = utils.find_resource(kc.tenants, args.tenant)
elif args.tenant_id:
tenant = args.tenant_id
else:
tenant = None
kc.roles.remove_user_role(user, role, tenant)
@utils.arg('--user', '--user-id', metavar='<user>',
help='List roles granted to specified user.')
@utils.arg('--user_id', help=argparse.SUPPRESS)
@utils.arg('--tenant', '--tenant-id', metavar='<tenant>',
help='List only roles granted on specified tenant.')
@utils.arg('--tenant_id', help=argparse.SUPPRESS)
def do_user_role_list(kc, args):
"""List roles granted to a user."""
if args.tenant:
tenant_id = utils.find_resource(kc.tenants, args.tenant).id
elif args.tenant_id:
tenant_id = args.tenant_id
else:
# use the authenticated tenant id as a default
tenant_id = kc.auth_tenant_id
if args.user:
user_id = utils.find_resource(kc.users, args.user).id
elif args.user_id:
user_id = args.user_id
else:
# use the authenticated user id as a default
user_id = kc.auth_user_id
roles = kc.roles.roles_for_user(user=user_id, tenant=tenant_id)
# this makes the command output a bit more intuitive
for role in roles:
role.user_id = user_id
role.tenant_id = tenant_id
utils.print_list(roles, ['id', 'name', 'user_id', 'tenant_id'],
order_by='name')
@utils.arg('--user-id', metavar='<user-id>',
help='User ID for which to create credentials. If not specified, '
'the authenticated user will be used.')
@utils.arg('--user_id', help=argparse.SUPPRESS)
@utils.arg('--tenant-id', metavar='<tenant-id>',
help='Tenant ID for which to create credentials. If not '
'specified, the authenticated tenant ID will be used.')
@utils.arg('--tenant_id', help=argparse.SUPPRESS)
def do_ec2_credentials_create(kc, args):
"""Create EC2-compatible credentials for user per tenant."""
if not args.tenant_id:
# use the authenticated tenant id as a default
args.tenant_id = kc.auth_tenant_id
if not args.user_id:
# use the authenticated user id as a default
args.user_id = kc.auth_user_id
credentials = kc.ec2.create(args.user_id, args.tenant_id)
utils.print_dict(credentials._info)
@utils.arg('--user-id', metavar='<user-id>', help='User ID.')
@utils.arg('--user_id', help=argparse.SUPPRESS)
@utils.arg('--access', metavar='<access-key>', required=True,
help='Access Key.')
def do_ec2_credentials_get(kc, args):
"""Display EC2-compatible credentials."""
if not args.user_id:
# use the authenticated user id as a default
args.user_id = kc.auth_user_id
cred = kc.ec2.get(args.user_id, args.access)
if cred:
utils.print_dict(cred._info)
@utils.arg('--user-id', metavar='<user-id>', help='User ID.')
@utils.arg('--user_id', help=argparse.SUPPRESS)
def do_ec2_credentials_list(kc, args):
"""List EC2-compatible credentials for a user."""
if not args.user_id:
# use the authenticated user id as a default
args.user_id = kc.auth_user_id
credentials = kc.ec2.list(args.user_id)
for cred in credentials:
try:
cred.tenant = getattr(kc.tenants.get(cred.tenant_id), 'name')
except Exception:
# FIXME(dtroyer): Retrieving the tenant name fails for normal
# users; stuff in the tenant_id instead.
cred.tenant = cred.tenant_id
utils.print_list(credentials, ['tenant', 'access', 'secret'])
@utils.arg('--user-id', metavar='<user-id>', help='User ID.')
@utils.arg('--user_id', help=argparse.SUPPRESS)
@utils.arg('--access', metavar='<access-key>', required=True,
help='Access Key.')
def do_ec2_credentials_delete(kc, args):
"""Delete EC2-compatible credentials."""
if not args.user_id:
# use the authenticated user id as a default
args.user_id = kc.auth_user_id
try:
kc.ec2.delete(args.user_id, args.access)
print(_('Credential has been deleted.'))
except Exception as e:
print(_('Unable to delete credential: %s') % e)
@utils.arg('--service', metavar='<service-type>', default=None,
help='Service type to return.')
@require_service_catalog
def do_catalog(kc, args):
"""List service catalog, possibly filtered by service."""
endpoints = kc.service_catalog.get_endpoints(service_type=args.service)
for (service, service_endpoints) in six.iteritems(endpoints):
if len(service_endpoints) > 0:
print(_("Service: %s") % service)
for ep in service_endpoints:
utils.print_dict(ep)
@utils.arg('--service', metavar='<service-type>', required=True,
help='Service type to select.')
@utils.arg('--endpoint-type', metavar='<endpoint-type>', default='publicURL',
help='Endpoint type to select.')
@utils.arg('--endpoint_type', default='publicURL',
help=argparse.SUPPRESS)
@utils.arg('--attr', metavar='<service-attribute>',
help='Service attribute to match for selection.')
@utils.arg('--value', metavar='<value>',
help='Value of attribute to match.')
@require_service_catalog
def do_endpoint_get(kc, args):
"""Find endpoint filtered by a specific attribute or service type."""
kwargs = {
'service_type': args.service,
'endpoint_type': args.endpoint_type,
}
if args.attr and args.value:
kwargs.update({'attr': args.attr, 'filter_value': args.value})
elif args.attr or args.value:
print(_('Both --attr and --value required.'))
return
url = kc.service_catalog.url_for(**kwargs)
utils.print_dict({'%s.%s' % (args.service, args.endpoint_type): url})
def do_endpoint_list(kc, args):
"""List configured service endpoints."""
endpoints = kc.endpoints.list()
utils.print_list(endpoints,
['id', 'region', 'publicurl',
'internalurl', 'adminurl', 'service_id'])
@utils.arg('--region', metavar='<endpoint-region>',
help='Endpoint region.', default='regionOne')
@utils.arg('--service', '--service-id', '--service_id',
metavar='<service>', required=True,
help='Name or ID of service associated with endpoint.')
@utils.arg('--publicurl', metavar='<public-url>', required=True,
help='Public URL endpoint.')
@utils.arg('--adminurl', metavar='<admin-url>',
help='Admin URL endpoint.')
@utils.arg('--internalurl', metavar='<internal-url>',
help='Internal URL endpoint.')
def do_endpoint_create(kc, args):
"""Create a new endpoint associated with a service."""
service_id = utils.find_resource(kc.services, args.service).id
endpoint = kc.endpoints.create(args.region,
service_id,
args.publicurl,
args.adminurl,
args.internalurl)
utils.print_dict(endpoint._info)
@utils.arg('id', metavar='<endpoint-id>', help='ID of endpoint to delete.')
def do_endpoint_delete(kc, args):
"""Delete a service endpoint."""
try:
kc.endpoints.delete(args.id)
print(_('Endpoint has been deleted.'))
except Exception:
print(_('Unable to delete endpoint.'))
@utils.arg('--wrap', metavar='<integer>', default=0,
help='Wrap PKI tokens to a specified length, or 0 to disable.')
@require_service_catalog
def do_token_get(kc, args):
"""Display the current user token."""
utils.print_dict(kc.service_catalog.get_token(),
wrap=int(args.wrap))

View File

@@ -0,0 +1,7 @@
---
prelude: >
The ``keystone`` CLI has been removed.
other:
- The ``keystone`` CLI has been removed, using the ``openstack``
CLI is recommended. This feature has been deprecated since the
Liberty release of Keystone.

View File

@@ -12,7 +12,6 @@ oslo.i18n>=2.1.0 # Apache-2.0
oslo.serialization>=1.10.0 # Apache-2.0
oslo.utils>=3.5.0 # Apache-2.0
positional>=1.0.1 # Apache-2.0
PrettyTable<0.8,>=0.7 # BSD
requests!=2.9.0,>=2.8.1 # Apache-2.0
six>=1.9.0 # MIT
stevedore>=1.5.0 # Apache-2.0

View File

@@ -23,9 +23,6 @@ packages =
keystoneclient
[entry_points]
console_scripts =
keystone = keystoneclient.shell:main
keystoneclient.auth.plugin =
password = keystoneclient.auth.identity.generic:Password
token = keystoneclient.auth.identity.generic:Token