Blacken code

Another library down.

Change-Id: Id29f29331ba994a1f09376763702fcca82ec6f1c
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Stephen Finucane 2023-08-30 19:27:40 +01:00
parent 6304b384bb
commit ab7cdb4c25
37 changed files with 1294 additions and 901 deletions
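The changes below are purely mechanical reformatting by Black: statements that fit within the configured line length are joined onto one line, and calls or literals that do not fit are split with one element per line and a trailing comma. As a minimal illustration of the kind of rewrite involved (hypothetical names, assuming Black's default behaviour; not taken from this diff):

    # Before: a call that overflows the configured line length
    result = client.create_resource(name, description=description,
                                    enabled=True, metadata=metadata)

    # After running black: one argument per line, trailing comma added
    result = client.create_resource(
        name,
        description=description,
        enabled=True,
        metadata=metadata,
    )

Both forms are equivalent at runtime; only the layout changes.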

View File

@@ -31,12 +31,19 @@ def run_apidoc(app):
return
run_already = True
package_dir = path.abspath(path.join(app.srcdir, '..', '..',
'osc_lib'))
package_dir = path.abspath(path.join(app.srcdir, '..', '..', 'osc_lib'))
source_dir = path.join(app.srcdir, 'api')
apidoc.main(['apidoc', package_dir, '-f',
'-H', 'osc-lib Modules',
'-o', source_dir])
apidoc.main(
[
'apidoc',
package_dir,
'-f',
'-H',
'osc-lib Modules',
'-o',
source_dir,
]
)
def setup(app):

View File

@@ -17,21 +17,24 @@ import sys
# NOTE(blk-u): Path for our Sphinx extension, remove when
# https://launchpad.net/bugs/1260495 is fixed.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
sys.path.insert(
0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
)
# -- General configuration ----------------------------------------------------
# If your documentation needs a minimal Sphinx version, state it here.
#needs_sphinx = '1.0'
# needs_sphinx = '1.0'
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc',
'sphinx.ext.doctest',
'sphinx.ext.todo',
'openstackdocstheme',
'sphinxcontrib.apidoc',
]
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.doctest',
'sphinx.ext.todo',
'openstackdocstheme',
'sphinxcontrib.apidoc',
]
# openstackdocstheme options
openstackdocs_repo_name = 'openstack/osc-lib'
@@ -39,13 +42,13 @@ openstackdocs_auto_name = False
openstackdocs_use_storyboard = True
# Add any paths that contain templates here, relative to this directory.
#templates_path = ['_templates']
# templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8-sig'
# source_encoding = 'utf-8-sig'
# The master toctree document.
master_doc = 'index'
@@ -55,13 +58,13 @@ project = 'OpenStackClient CLI Base'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# today_fmt = '%B %d, %Y'
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
@@ -69,18 +72,18 @@ exclude_patterns = []
# The reST default role (used for this markup: `text`) to use for all
# documents.
#default_role = None
# default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
#add_function_parentheses = True
# add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'native'
@@ -93,76 +96,76 @@ modindex_common_prefix = ['osc_lib.']
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#html_theme_path = ["."]
#html_theme = '_theme'
# html_theme_path = ["."]
# html_theme = '_theme'
html_theme = 'openstackdocs'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# html_theme_options = {}
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
#html_static_path = ['_static']
# html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# html_additional_pages = {}
# If false, no module index is generated.
#html_domain_indices = True
# html_domain_indices = True
# If false, no index is generated.
#html_use_index = True
# html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# html_show_sourcelink = True
# If true, "Created using Sphinx" is shown in the HTML footer. Default is True.
#html_show_sphinx = True
# html_show_sphinx = True
# If true, "(C) Copyright ..." is shown in the HTML footer. Default is True.
#html_show_copyright = True
# html_show_copyright = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# html_use_opensearch = ''
# This is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = None
# html_file_suffix = None
# Output file base name for HTML help builder.
htmlhelp_basename = 'OpenStackCommandLineClientdoc'
@@ -171,54 +174,55 @@ htmlhelp_basename = 'OpenStackCommandLineClientdoc'
# -- Options for LaTeX output -------------------------------------------------
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#'preamble': '',
# The paper size ('letterpaper' or 'a4paper').
#'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
#'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
#'preamble': '',
}
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual])
# .
latex_documents = [
('index', 'OpenStackCommandLineClient.tex',
'OpenStack Command Line Client Documentation',
'OpenStack'),
(
'index',
'OpenStackCommandLineClient.tex',
'OpenStack Command Line Client Documentation',
'OpenStack',
),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# latex_use_parts = False
# If true, show page references after internal links.
#latex_show_pagerefs = False
# latex_show_pagerefs = False
# If true, show URL addresses after external links.
#latex_show_urls = False
# latex_show_urls = False
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# latex_appendices = []
# If false, no module index is generated.
#latex_domain_indices = True
# latex_domain_indices = True
# -- Options for manual page output -------------------------------------------
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
#man_pages = []
# man_pages = []
# If true, show URL addresses after external links.
#man_show_urls = False
# man_show_urls = False
# -- Options for Texinfo output -----------------------------------------------
@@ -227,21 +231,25 @@ latex_documents = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
('index', 'OpenStackCommandLineClient',
'OpenStack Command Line Client Documentation',
'OpenStack', 'OpenStackCommandLineClient',
'One line description of project.',
'Miscellaneous'),
(
'index',
'OpenStackCommandLineClient',
'OpenStack Command Line Client Documentation',
'OpenStack',
'OpenStackCommandLineClient',
'One line description of project.',
'Miscellaneous',
),
]
# Documents to append as an appendix to all manuals.
#texinfo_appendices = []
# texinfo_appendices = []
# If false, no module index is generated.
#texinfo_domain_indices = True
# texinfo_domain_indices = True
# How to display URL addresses: 'footnote', 'no', or 'inline'.
#texinfo_show_urls = 'footnote'
# texinfo_show_urls = 'footnote'
# -- Options for sphinxcontrib.apidoc ----------------------------------------

View File

@@ -42,11 +42,7 @@ class BaseAPI(object):
HEADER_NAME = "OpenStack-API-Version"
def __init__(
self,
session=None,
service_type=None,
endpoint=None,
**kwargs
self, session=None, service_type=None, endpoint=None, **kwargs
):
"""Base object that contains some common API objects and methods
@@ -140,13 +136,7 @@ class BaseAPI(object):
# The basic action methods all take a Session and return dict/lists
def create(
self,
url,
session=None,
method=None,
**params
):
def create(self, url, session=None, method=None, **params):
"""Create a new resource
:param string url:
@@ -166,12 +156,7 @@ class BaseAPI(object):
except json.JSONDecodeError:
return ret
def delete(
self,
url,
session=None,
**params
):
def delete(self, url, session=None, **params):
"""Delete a resource
:param string url:
@@ -292,9 +277,7 @@ class BaseAPI(object):
if len(data) > 1:
msg = _("Multiple %(resource)s exist with %(attr)s='%(value)s'")
raise exceptions.CommandError(
msg % {'resource': resource,
'attr': attr,
'value': value}
msg % {'resource': resource, 'attr': attr, 'value': value}
)
# Search by id
@@ -304,17 +287,10 @@ class BaseAPI(object):
return data[0]
msg = _("No %(resource)s with a %(attr)s or ID of '%(value)s' found")
raise exceptions.CommandError(
msg % {'resource': resource,
'attr': attr,
'value': value}
msg % {'resource': resource, 'attr': attr, 'value': value}
)
def find_bulk(
self,
path,
headers=None,
**kwargs
):
def find_bulk(self, path, headers=None, **kwargs):
"""Bulk load and filter locally
:param string path:
@@ -342,11 +318,7 @@ class BaseAPI(object):
return ret
def find_one(
self,
path,
**kwargs
):
def find_one(self, path, **kwargs):
"""Find a resource by name or ID
:param string path:
@@ -390,7 +362,8 @@ class BaseAPI(object):
try:
ret = self._request(
'GET', "/%s/%s" % (path, value),
'GET',
"/%s/%s" % (path, value),
headers=headers,
).json()
if isinstance(ret, dict):
@@ -404,11 +377,7 @@ class BaseAPI(object):
if attr:
kwargs = {attr: value}
try:
ret = self.find_one(
path,
headers=headers,
**kwargs
)
ret = self.find_one(path, headers=headers, **kwargs)
except (
exceptions.NotFound,
ksa_exceptions.NotFound,

View File

@@ -53,7 +53,8 @@ def get_options_list():
os_name = o.name.lower().replace('_', '-')
os_env_name = 'OS_' + os_name.upper().replace('-', '_')
OPTIONS_LIST.setdefault(
os_name, {'env': os_env_name, 'help': ''},
os_name,
{'env': os_env_name, 'help': ''},
)
# TODO(mhu) simplistic approach, would be better to only add
# help texts if they vary from one auth plugin to another
@@ -67,19 +68,23 @@ def get_options_list():
def check_valid_authorization_options(options, auth_plugin_name):
"""Validate authorization options, and provide helpful error messages."""
if (options.auth.get('project_id') and not
options.auth.get('domain_id') and not
options.auth.get('domain_name') and not
options.auth.get('project_name') and not
options.auth.get('tenant_id') and not
options.auth.get('tenant_name')):
raise exc.CommandError(_(
'Missing parameter(s): '
'Set either a project or a domain scope, but not both. Set a '
'project scope with --os-project-name, OS_PROJECT_NAME, or '
'auth.project_name. Alternatively, set a domain scope with '
'--os-domain-name, OS_DOMAIN_NAME or auth.domain_name.'
))
if (
options.auth.get('project_id')
and not options.auth.get('domain_id')
and not options.auth.get('domain_name')
and not options.auth.get('project_name')
and not options.auth.get('tenant_id')
and not options.auth.get('tenant_name')
):
raise exc.CommandError(
_(
'Missing parameter(s): '
'Set either a project or a domain scope, but not both. Set a '
'project scope with --os-project-name, OS_PROJECT_NAME, or '
'auth.project_name. Alternatively, set a domain scope with '
'--os-domain-name, OS_DOMAIN_NAME or auth.domain_name.'
)
)
def check_valid_authentication_options(options, auth_plugin_name):
@@ -102,31 +107,34 @@ def check_valid_authentication_options(options, auth_plugin_name):
# when no auth params are passed in, user advised to use os-cloud
if not options.auth and auth_plugin_name != 'none':
msgs.append(_(
'Set a cloud-name with --os-cloud or OS_CLOUD'
))
msgs.append(_('Set a cloud-name with --os-cloud or OS_CLOUD'))
else:
if ('password' in plugin_opts and not
(options.auth.get('username') or options.auth.get('user_id'))):
msgs.append(_(
'Set a username with --os-username, OS_USERNAME,'
' or auth.username'
' or set a user-id with --os-user-id, OS_USER_ID,'
' or auth.user_id'
))
if 'password' in plugin_opts and not (
options.auth.get('username') or options.auth.get('user_id')
):
msgs.append(
_(
'Set a username with --os-username, OS_USERNAME,'
' or auth.username'
' or set a user-id with --os-user-id, OS_USER_ID,'
' or auth.user_id'
)
)
if 'auth_url' in plugin_opts and not options.auth.get('auth_url'):
msgs.append(_(
'Set an authentication URL, with --os-auth-url,'
' OS_AUTH_URL or auth.auth_url'
))
msgs.append(
_(
'Set an authentication URL, with --os-auth-url,'
' OS_AUTH_URL or auth.auth_url'
)
)
if 'url' in plugin_opts and not options.auth.get('url'):
msgs.append(_(
'Set a service URL, with --os-url, OS_URL or auth.url'
))
msgs.append(
_('Set a service URL, with --os-url, OS_URL or auth.url')
)
if 'token' in plugin_opts and not options.auth.get('token'):
msgs.append(_(
'Set a token with --os-token, OS_TOKEN or auth.token'
))
msgs.append(
_('Set a token with --os-token, OS_TOKEN or auth.token')
)
if msgs:
raise exc.CommandError(
@@ -147,20 +155,21 @@ def build_auth_plugins_option_parser(parser):
metavar='<auth-type>',
dest='auth_type',
default=utils.env('OS_AUTH_TYPE'),
help=_('Select an authentication type. Available types: %s.'
' Default: selected based on --os-username/--os-token'
' (Env: OS_AUTH_TYPE)') % ', '.join(available_plugins),
choices=available_plugins
help=_(
'Select an authentication type. Available types: %s.'
' Default: selected based on --os-username/--os-token'
' (Env: OS_AUTH_TYPE)'
)
% ', '.join(available_plugins),
choices=available_plugins,
)
# Maintain compatibility with old tenant env vars
envs = {
'OS_PROJECT_NAME': utils.env(
'OS_PROJECT_NAME',
default=utils.env('OS_TENANT_NAME')
'OS_PROJECT_NAME', default=utils.env('OS_TENANT_NAME')
),
'OS_PROJECT_ID': utils.env(
'OS_PROJECT_ID',
default=utils.env('OS_TENANT_ID')
'OS_PROJECT_ID', default=utils.env('OS_TENANT_ID')
),
}
for o in get_options_list():
@@ -174,7 +183,8 @@ def build_auth_plugins_option_parser(parser):
OPTIONS_LIST[o]['env'],
utils.env(OPTIONS_LIST[o]['env']),
),
help=_('%(help)s\n(Env: %(env)s)') % {
help=_('%(help)s\n(Env: %(env)s)')
% {
'help': OPTIONS_LIST[o]['help'],
'env': OPTIONS_LIST[o]['env'],
},
@@ -196,10 +206,14 @@ def build_auth_plugins_option_parser(parser):
return parser
def get_keystone2keystone_auth(local_auth, service_provider,
project_id=None, project_name=None,
project_domain_id=None,
project_domain_name=None):
def get_keystone2keystone_auth(
local_auth,
service_provider,
project_id=None,
project_name=None,
project_domain_id=None,
project_domain_name=None,
):
"""Return Keystone 2 Keystone authentication for service provider.
:param local_auth: authentication to use with the local Keystone
@@ -210,9 +224,11 @@ def get_keystone2keystone_auth(local_auth, service_provider,
:param project_domain_name: name of domain to in the service provider
:return: Keystone2Keystone auth object for service provider
"""
return k2k.Keystone2Keystone(local_auth,
service_provider,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name)
return k2k.Keystone2Keystone(
local_auth,
service_provider,
project_id=project_id,
project_name=project_name,
project_domain_id=project_domain_id,
project_domain_name=project_domain_name,
)

View File

@@ -60,8 +60,11 @@ def simple_filter(
if attr in d:
# Searching data fields
search_value = d[attr]
elif (property_field and property_field in d and
isinstance(d[property_field], dict)):
elif (
property_field
and property_field in d
and isinstance(d[property_field], dict)
):
# Searching a properties field - do this separately because
# we don't want to fail over to checking the fields if a
# property name is given.

View File

@@ -26,7 +26,6 @@ LOG = logging.getLogger(__name__)
# Sublcass OpenStackConfig in order to munge config values
# before auth plugins are loaded
class OSC_Config(config.OpenStackConfig):
def _auth_select_default_plugin(self, config):
"""Select a default plugin based on supplied arguments
@@ -66,7 +65,7 @@ class OSC_Config(config.OpenStackConfig):
Migrated from auth.build_auth_params()
"""
if ('auth_type' in config and config['auth_type'].startswith("v2")):
if 'auth_type' in config and config['auth_type'].startswith("v2"):
if 'project_id' in config['auth']:
config['auth']['tenant_id'] = config['auth']['project_id']
if 'project_name' in config['auth']:
@@ -82,8 +81,9 @@ class OSC_Config(config.OpenStackConfig):
# NOTE(hieulq): If USER_DOMAIN_NAME, USER_DOMAIN_ID, PROJECT_DOMAIN_ID
# or PROJECT_DOMAIN_NAME is present and API_VERSION is 2.0, then
# ignore all domain related configs.
if (str(config.get('identity_api_version', '')).startswith('2') and
config.get('auth_type').endswith('password')):
if str(config.get('identity_api_version', '')).startswith(
'2'
) and config.get('auth_type').endswith('password'):
domain_props = [
'project_domain_id',
'project_domain_name',
@@ -95,12 +95,14 @@ class OSC_Config(config.OpenStackConfig):
if config.get('cloud'):
LOG.warning(
"Ignoring domain related config %s for %s"
"because identity API version is 2.0" % (
prop, config['cloud']))
"because identity API version is 2.0"
% (prop, config['cloud'])
)
else:
LOG.warning(
"Ignoring domain related config %s because"
" identity API version is 2.0" % prop)
" identity API version is 2.0" % prop
)
return config
def _auth_default_domain(self, config):
@@ -115,17 +117,18 @@ class OSC_Config(config.OpenStackConfig):
# TODO(mordred): This is a usability improvement that's broadly useful
# We should port it back up into os-client-config.
default_domain = config.get('default_domain', None)
if (identity_version == '3' and
not auth_type.startswith('v2') and
default_domain):
if (
identity_version == '3'
and not auth_type.startswith('v2')
and default_domain
):
# NOTE(stevemar): If PROJECT_DOMAIN_ID or PROJECT_DOMAIN_NAME is
# present, then do not change the behaviour. Otherwise, set the
# PROJECT_DOMAIN_ID to 'OS_DEFAULT_DOMAIN' for better usability.
if (
auth_type in ("password", "v3password", "v3totp") and
not config['auth'].get('project_domain_id') and
not config['auth'].get('project_domain_name')
auth_type in ("password", "v3password", "v3totp")
and not config['auth'].get('project_domain_id')
and not config['auth'].get('project_domain_name')
):
config['auth']['project_domain_id'] = default_domain
@@ -136,9 +139,9 @@ class OSC_Config(config.OpenStackConfig):
# TODO(dtroyer): Move this to os-client-config after the plugin has
# been loaded so we can check directly if the options are accepted.
if (
auth_type in ("password", "v3password", "v3totp") and
not config['auth'].get('user_domain_id') and
not config['auth'].get('user_domain_name')
auth_type in ("password", "v3password", "v3totp")
and not config['auth'].get('user_domain_id')
and not config['auth'].get('user_domain_name')
):
config['auth']['user_domain_id'] = default_domain
return config
@@ -156,8 +159,9 @@ class OSC_Config(config.OpenStackConfig):
config = self._auth_default_domain(config)
if LOG.isEnabledFor(logging.DEBUG):
LOG.debug("auth_config_hook(): %s",
strutils.mask_password(str(config)))
LOG.debug(
"auth_config_hook(): %s", strutils.mask_password(str(config))
)
return config
def _validate_auth(self, config, loader, fixed_argparse=None):
@@ -176,7 +180,8 @@ class OSC_Config(config.OpenStackConfig):
winning_value = self._find_winning_auth_value(p_opt, config)
if not winning_value:
winning_value = self._find_winning_auth_value(
p_opt, config['auth'])
p_opt, config['auth']
)
# if the plugin tells us that this value is required
# then error if it's doesn't exist now
@@ -184,7 +189,8 @@ class OSC_Config(config.OpenStackConfig):
msgs.append(
'Missing value {auth_key}'
' required for auth plugin {plugin}'.format(
auth_key=p_opt.name, plugin=config.get('auth_type'),
auth_key=p_opt.name,
plugin=config.get('auth_type'),
)
)
@@ -198,17 +204,18 @@ class OSC_Config(config.OpenStackConfig):
# Prefer the plugin configuration dest value if the value's key
# is marked as depreciated.
if p_opt.dest is None:
config['auth'][p_opt.name.replace('-', '_')] = (
winning_value)
config['auth'][
p_opt.name.replace('-', '_')
] = winning_value
else:
config['auth'][p_opt.dest] = winning_value
# See if this needs a prompting
if (
'prompt' in vars(p_opt) and
p_opt.prompt is not None and
p_opt.dest not in config['auth'] and
self._pw_callback is not None
'prompt' in vars(p_opt)
and p_opt.prompt is not None
and p_opt.dest not in config['auth']
and self._pw_callback is not None
):
# Defer these until we know all required opts are present
prompt_options.append(p_opt)

View File

@@ -25,14 +25,16 @@ def add_project_owner_option_to_parser(parser):
parser.add_argument(
'--project',
metavar='<project>',
help=_("Owner's project (name or ID)")
help=_("Owner's project (name or ID)"),
)
parser.add_argument(
'--project-domain',
metavar='<project-domain>',
help=_('Domain the project belongs to (name or ID). '
'This can be used in case collisions between project names '
'exist.'),
help=_(
'Domain the project belongs to (name or ID). '
'This can be used in case collisions between project names '
'exist.'
),
)
@@ -55,14 +57,15 @@ def find_project(sdk_connection, name_or_id, domain_name_or_id=None):
"""
try:
if domain_name_or_id:
domain = sdk_connection.identity.find_domain(domain_name_or_id,
ignore_missing=False)
domain = sdk_connection.identity.find_domain(
domain_name_or_id, ignore_missing=False
)
domain_id = domain.id
else:
domain_id = None
return sdk_connection.identity.find_project(name_or_id,
ignore_missing=False,
domain_id=domain_id)
return sdk_connection.identity.find_project(
name_or_id, ignore_missing=False, domain_id=domain_id
)
# NOTE: OpenStack SDK raises HttpException for 403 response code.
# There is no specific exception class at now, so we need to catch
# HttpException and check the status code.

View File

@@ -84,8 +84,15 @@ class MultiKeyValueAction(argparse.Action):
And comma(',') and equal('=') may not be used in the key or value.
"""
def __init__(self, option_strings, dest, nargs=None,
required_keys=None, optional_keys=None, **kwargs):
def __init__(
self,
option_strings,
dest,
nargs=None,
required_keys=None,
optional_keys=None,
**kwargs
):
"""Initialize the action object, and parse customized options
Required keys and optional keys can be specified when initializing
@@ -99,8 +106,9 @@ class MultiKeyValueAction(argparse.Action):
msg = _("Parameter 'nargs' is not allowed, but got %s")
raise ValueError(msg % nargs)
super(MultiKeyValueAction, self).__init__(option_strings,
dest, **kwargs)
super(MultiKeyValueAction, self).__init__(
option_strings, dest, **kwargs
)
# required_keys: A list of keys that is required. None by default.
if required_keys and not isinstance(required_keys, list):
@@ -128,10 +136,13 @@ class MultiKeyValueAction(argparse.Action):
"Invalid keys %(invalid_keys)s specified.\n"
"Valid keys are: %(valid_keys)s"
)
raise argparse.ArgumentTypeError(msg % {
'invalid_keys': ', '.join(invalid_keys),
'valid_keys': ', '.join(valid_keys),
})
raise argparse.ArgumentTypeError(
msg
% {
'invalid_keys': ', '.join(invalid_keys),
'valid_keys': ', '.join(valid_keys),
}
)
if self.required_keys:
missing_keys = [k for k in self.required_keys if k not in keys]
@@ -140,10 +151,13 @@ class MultiKeyValueAction(argparse.Action):
"Missing required keys %(missing_keys)s.\n"
"Required keys are: %(required_keys)s"
)
raise argparse.ArgumentTypeError(msg % {
'missing_keys': ', '.join(missing_keys),
'required_keys': ', '.join(self.required_keys),
})
raise argparse.ArgumentTypeError(
msg
% {
'missing_keys': ', '.join(missing_keys),
'required_keys': ', '.join(self.required_keys),
}
)
def __call__(self, parser, namespace, values, metavar=None):
# Make sure we have an empty list rather than None
@@ -245,10 +259,14 @@ class RangeAction(argparse.Action):
setattr(namespace, self.dest, (int(range[0]), int(range[1])))
else:
msg = _("Invalid range, %(min)s is not less than %(max)s")
raise argparse.ArgumentError(self, msg % {
'min': range[0],
'max': range[1],
})
raise argparse.ArgumentError(
self,
msg
% {
'min': range[0],
'max': range[1],
},
)
else:
# Too many values
msg = _("Invalid range, too many values")

View File

@@ -18,7 +18,7 @@
import copy
import logging
from openstack.config import loader as config # noqa
from openstack.config import loader as config # noqa
from openstack import connection
from oslo_utils import strutils
@@ -145,13 +145,16 @@ class ClientManager(object):
# Horrible hack alert...must handle prompt for null password if
# password auth is requested.
if (self.auth_plugin_name.endswith('password') and
not self._cli_options.auth.get('password')):
if self.auth_plugin_name.endswith(
'password'
) and not self._cli_options.auth.get('password'):
self._cli_options.auth['password'] = self._pw_callback()
LOG.info('Using auth plugin: %s', self.auth_plugin_name)
LOG.debug('Using parameters %s',
strutils.mask_password(self._cli_options.auth))
LOG.debug(
'Using parameters %s',
strutils.mask_password(self._cli_options.auth),
)
self.auth = self._cli_options.get_auth()
if self._cli_options.service_provider:
@@ -161,7 +164,7 @@ class ClientManager(object):
self._cli_options.remote_project_id,
self._cli_options.remote_project_name,
self._cli_options.remote_project_domain_id,
self._cli_options.remote_project_domain_name
self._cli_options.remote_project_domain_name,
)
self.session = self._cli_options.get_session()
@@ -193,8 +196,10 @@ class ClientManager(object):
@property
def auth_ref(self):
"""Dereference will trigger an auth if it hasn't already"""
if (not self._auth_required or
self._cli_options.config['auth_type'] == 'none'):
if (
not self._auth_required
or self._cli_options.config['auth_type'] == 'none'
):
# Forcibly skip auth if we know we do not need it
return None
if not self._auth_ref:
@@ -230,8 +235,9 @@ class ClientManager(object):
LOG.debug("No service catalog")
return service_available
def get_endpoint_for_service_type(self, service_type, region_name=None,
interface='public'):
def get_endpoint_for_service_type(
self, service_type, region_name=None, interface='public'
):
"""Return the endpoint URL for the service type."""
# Overrides take priority unconditionally
override = self._override_for(service_type)

View File

@@ -24,25 +24,26 @@ from osc_lib.i18n import _
class CommandMeta(abc.ABCMeta):
def __new__(mcs, name, bases, cls_dict):
if 'log' not in cls_dict:
cls_dict['log'] = logging.getLogger(
cls_dict['__module__'] + '.' + name)
cls_dict['__module__'] + '.' + name
)
return super(CommandMeta, mcs).__new__(mcs, name, bases, cls_dict)
class Command(command.Command, metaclass=CommandMeta):
def run(self, parsed_args):
self.log.debug('run(%s)', parsed_args)
return super(Command, self).run(parsed_args)
def validate_os_beta_command_enabled(self):
if not self.app.options.os_beta_command:
msg = _('Caution: This is a beta command and subject to '
'change. Use global option --os-beta-command '
'to enable this command.')
msg = _(
'Caution: This is a beta command and subject to '
'change. Use global option --os-beta-command '
'to enable this command.'
)
raise exceptions.CommandError(msg)
def deprecated_option_warning(self, old_option, new_option):

View File

@@ -18,4 +18,5 @@ import cliff.commandmanager
class CommandManager(cliff.commandmanager.CommandManager):
"""Noop subclass for transition purposes."""
pass

View File

@@ -26,6 +26,7 @@ class AuthorizationFailure(Exception):
class PluginAttributeError(Exception):
"""A plugin threw an AttributeError while being lazily loaded."""
# This *must not* inherit from AttributeError;
# that would defeat the whole purpose.
pass
@@ -33,21 +34,25 @@ class PluginAttributeError(Exception):
class NoTokenLookupException(Exception):
"""This does not support looking up endpoints from an existing token."""
pass
class EndpointNotFound(Exception):
"""Could not find Service or Region in Service Catalog."""
pass
class UnsupportedVersion(Exception):
"""The user is trying to use an unsupported version of the API"""
pass
class InvalidValue(Exception):
"""An argument value is not valid: wrong type, out of range, etc"""
message = "Supplied value is not valid"
@@ -68,36 +73,42 @@ class ClientException(Exception):
class BadRequest(ClientException):
"""HTTP 400 - Bad request: you sent some malformed data."""
http_status = 400
message = "Bad request"
class Unauthorized(ClientException):
"""HTTP 401 - Unauthorized: bad credentials."""
http_status = 401
message = "Unauthorized"
class Forbidden(ClientException):
"""HTTP 403 - Forbidden: not authorized to access to this resource."""
http_status = 403
message = "Forbidden"
class NotFound(ClientException):
"""HTTP 404 - Not found"""
http_status = 404
message = "Not found"
class Conflict(ClientException):
"""HTTP 409 - Conflict"""
http_status = 409
message = "Conflict"
class OverLimit(ClientException):
"""HTTP 413 - Over limit: reached the API limits for this time period."""
http_status = 413
message = "Over limit"
@@ -105,6 +116,7 @@ class OverLimit(ClientException):
# NotImplemented is a python keyword.
class HTTPNotImplemented(ClientException):
"""HTTP 501 - Not Implemented: server does not support this operation."""
http_status = 501
message = "Not Implemented"
@@ -115,11 +127,14 @@ class HTTPNotImplemented(ClientException):
# for c in ClientException.__subclasses__())
#
# Instead, we have to hardcode it:
_code_map = dict((c.http_status, c) for c in [
BadRequest,
Unauthorized,
Forbidden,
NotFound,
OverLimit,
HTTPNotImplemented
])
_code_map = dict(
(c.http_status, c)
for c in [
BadRequest,
Unauthorized,
Forbidden,
NotFound,
OverLimit,
HTTPNotImplemented,
]
)

View File

@@ -81,8 +81,10 @@ def set_warning_filter(log_level):
class _FileFormatter(logging.Formatter):
"""Customize the logging format for logging handler"""
_LOG_MESSAGE_BEGIN = (
'%(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s ')
'%(asctime)s.%(msecs)03d %(process)d %(levelname)s %(name)s '
)
_LOG_MESSAGE_CONTEXT = '[%(cloud)s %(username)s %(project)s] '
_LOG_MESSAGE_END = '%(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
@@ -102,16 +104,17 @@ class _FileFormatter(logging.Formatter):
'username': config.auth.get('username', ''),
}
if context:
self.fmt = (self._LOG_MESSAGE_BEGIN +
(self._LOG_MESSAGE_CONTEXT % context) +
self._LOG_MESSAGE_END)
self.fmt = (
self._LOG_MESSAGE_BEGIN
+ (self._LOG_MESSAGE_CONTEXT % context)
+ self._LOG_MESSAGE_END
)
else:
self.fmt = self._LOG_MESSAGE_BEGIN + self._LOG_MESSAGE_END
logging.Formatter.__init__(self, self.fmt, self._LOG_DATE_FORMAT)
class LogConfigurator(object):
_CONSOLE_MESSAGE_FORMAT = '%(message)s'
def __init__(self, options):
@@ -183,7 +186,7 @@ class LogConfigurator(object):
for k in logconfig.keys():
level = log_level_from_string(logconfig[k])
logging.getLogger(k).setLevel(level)
if (highest_level < level):
if highest_level < level:
highest_level = level
self.console_logger.setLevel(highest_level)
if self.file_logger:

View File

@@ -63,29 +63,32 @@ def prompt_for_password(prompt=None):
pass
# No password because we did't have a tty or nothing was entered
if not pw:
raise exc.CommandError(_("No password entered, or found via"
" --os-password or OS_PASSWORD"),)
raise exc.CommandError(
_(
"No password entered, or found via"
" --os-password or OS_PASSWORD"
),
)
return pw
class OpenStackShell(app.App):
CONSOLE_MESSAGE_FORMAT = '%(levelname)s: %(name)s %(message)s'
log = logging.getLogger(__name__)
timing_data = []
def __init__(
self,
description=None,
version=None,
command_manager=None,
stdin=None,
stdout=None,
stderr=None,
interactive_app_factory=None,
deferred_help=False,
self,
description=None,
version=None,
command_manager=None,
stdin=None,
stdout=None,
stderr=None,
interactive_app_factory=None,
deferred_help=False,
):
# Patch command.Command to add a default auth_required = True
command.Command.auth_required = True
@@ -163,10 +166,14 @@ class OpenStackShell(app.App):
# bigger than most big default one (CRITICAL) or something like
# that (PROFILE = 60 for instance), but not sure we need it here.
self.log.warning("Trace ID: %s" % trace_id)
self.log.warning("Short trace ID "
"for OpenTracing-based drivers: %s" % short_id)
self.log.warning("Display trace data with command:\n"
"osprofiler trace show --html %s " % trace_id)
self.log.warning(
"Short trace ID "
"for OpenTracing-based drivers: %s" % short_id
)
self.log.warning(
"Display trace data with command:\n"
"osprofiler trace show --html %s " % trace_id
)
def run_subcommand(self, argv):
self.init_profile()
@@ -186,8 +193,8 @@ class OpenStackShell(app.App):
def build_option_parser(self, description, version):
parser = super(OpenStackShell, self).build_option_parser(
description,
version)
description, version
)
# service token auth argument
parser.add_argument(
@@ -243,11 +250,11 @@ class OpenStackShell(app.App):
'--os-default-domain',
metavar='<auth-domain>',
dest='default_domain',
default=utils.env(
'OS_DEFAULT_DOMAIN',
default=DEFAULT_DOMAIN),
help=_('Default domain ID, default=%s. '
'(Env: OS_DEFAULT_DOMAIN)') % DEFAULT_DOMAIN,
default=utils.env('OS_DEFAULT_DOMAIN', default=DEFAULT_DOMAIN),
help=_(
'Default domain ID, default=%s. ' '(Env: OS_DEFAULT_DOMAIN)'
)
% DEFAULT_DOMAIN,
)
parser.add_argument(
'--os-interface',
@@ -258,18 +265,23 @@ class OpenStackShell(app.App):
# config key 'interface' will be ignored. Use OSC_Config's ctor
# option 'override_defaults' below instead.
default=utils.env('OS_INTERFACE'),
help=_('Select an interface type.'
' Valid interface types: [admin, public, internal].'
' default=%s, (Env: OS_INTERFACE)') % DEFAULT_INTERFACE,
help=_(
'Select an interface type.'
' Valid interface types: [admin, public, internal].'
' default=%s, (Env: OS_INTERFACE)'
)
% DEFAULT_INTERFACE,
)
parser.add_argument(
'--os-service-provider',
metavar='<service_provider>',
dest='service_provider',
default=utils.env('OS_SERVICE_PROVIDER'),
help=_('Authenticate with and perform the command on a service'
' provider using Keystone-to-keystone federation. Must'
' also specify the remote project option.')
help=_(
'Authenticate with and perform the command on a service'
' provider using Keystone-to-keystone federation. Must'
' also specify the remote project option.'
),
)
remote_project_group = parser.add_mutually_exclusive_group()
remote_project_group.add_argument(
@@ -277,16 +289,20 @@ class OpenStackShell(app.App):
metavar='<remote_project_name>',
dest='remote_project_name',
default=utils.env('OS_REMOTE_PROJECT_NAME'),
help=_('Project name when authenticating to a service provider'
' if using Keystone-to-Keystone federation.')
help=_(
'Project name when authenticating to a service provider'
' if using Keystone-to-Keystone federation.'
),
)
remote_project_group.add_argument(
'--os-remote-project-id',
metavar='<remote_project_id>',
dest='remote_project_id',
default=utils.env('OS_REMOTE_PROJECT_ID'),
help=_('Project ID when authenticating to a service provider'
' if using Keystone-to-Keystone federation.')
help=_(
'Project ID when authenticating to a service provider'
' if using Keystone-to-Keystone federation.'
),
)
remote_project_domain_group = parser.add_mutually_exclusive_group()
remote_project_domain_group.add_argument(
@@ -294,18 +310,22 @@ class OpenStackShell(app.App):
metavar='<remote_project_domain_name>',
dest='remote_project_domain_name',
default=utils.env('OS_REMOTE_PROJECT_DOMAIN_NAME'),
help=_('Domain name of the project when authenticating to a'
' service provider if using Keystone-to-Keystone'
' federation.')
help=_(
'Domain name of the project when authenticating to a'
' service provider if using Keystone-to-Keystone'
' federation.'
),
)
remote_project_domain_group.add_argument(
'--os-remote-project-domain-id',
metavar='<remote_project_domain_id>',
dest='remote_project_domain_id',
default=utils.env('OS_REMOTE_PROJECT_DOMAIN_ID'),
help=_('Domain ID of the project when authenticating to a'
' service provider if using Keystone-to-Keystone'
' federation.')
help=_(
'Domain ID of the project when authenticating to a'
' service provider if using Keystone-to-Keystone'
' federation.'
),
)
parser.add_argument(
'--timing',
@@ -345,6 +365,7 @@ class OpenStackShell(app.App):
* ClientManager
"""
def _final_defaults(self):
# Set the default plugin to None
# NOTE(dtroyer): This is here to set up for setting it to a default
@@ -396,10 +417,11 @@ class OpenStackShell(app.App):
# Parent __init__ parses argv into self.options
super(OpenStackShell, self).initialize_app(argv)
self.log.info("START with options: %s",
strutils.mask_password(" ".join(self.command_options)))
self.log.debug("options: %s",
strutils.mask_password(self.options))
self.log.info(
"START with options: %s",
strutils.mask_password(" ".join(self.command_options)),
)
self.log.debug("options: %s", strutils.mask_password(self.options))
# Callout for stuff between superclass init and o-c-c
self._final_defaults()
@@ -434,8 +456,9 @@ class OpenStackShell(app.App):
self.log_configurator.configure(self.cloud)
self.dump_stack_trace = self.log_configurator.dump_trace
self.log.debug("defaults: %s", self.cloud_config.defaults)
self.log.debug("cloud cfg: %s",
strutils.mask_password(self.cloud.config))
self.log.debug(
"cloud cfg: %s", strutils.mask_password(self.cloud.config)
)
# Callout for stuff between o-c-c and ClientManager
# self._initialize_app_2(self.options)
@@ -489,8 +512,9 @@ class OpenStackShell(app.App):
# let the command decide whether we need a scoped token
self.client_manager.validate_scope()
# Trigger the Identity client to initialize
self.client_manager.session.auth.auth_ref = \
self.client_manager.session.auth.auth_ref = (
self.client_manager.auth_ref
)
return
def clean_up(self, cmd, result, err):
@@ -518,8 +542,10 @@ class OpenStackShell(app.App):
# If anything other than prettytable is specified, force csv
format = 'table'
# Check the formatter used in the actual command
if hasattr(cmd, 'formatter') \
and cmd.formatter != cmd._formatter_plugins['table'].obj:
if (
hasattr(cmd, 'formatter')
and cmd.formatter != cmd._formatter_plugins['table'].obj
):
format = 'csv'
sys.stdout.write('\n')

View File

@@ -47,7 +47,6 @@ LIST_BODY = {
class TestSession(utils.TestCase):
BASE_URL = 'https://api.example.com:1234/test'
def setUp(self):

View File

@@ -22,7 +22,6 @@ from osc_lib.tests.api import fakes as api_fakes
class TestBaseAPIDefault(api_fakes.TestSession):
def setUp(self):
super(TestBaseAPIDefault, self).setUp()
self.api = api.BaseAPI()
@@ -89,7 +88,6 @@ class TestBaseAPIDefault(api_fakes.TestSession):
class TestBaseAPIEndpointArg(api_fakes.TestSession):
def test_baseapi_endpoint_no_endpoint(self):
x_api = api.BaseAPI(
session=self.sess,
@@ -187,7 +185,6 @@ class TestBaseAPIEndpointArg(api_fakes.TestSession):
class TestBaseAPIArgs(api_fakes.TestSession):
def setUp(self):
super(TestBaseAPIArgs, self).setUp()
self.api = api.BaseAPI(
@@ -222,7 +219,6 @@ class TestBaseAPIArgs(api_fakes.TestSession):
class TestBaseAPICreate(api_fakes.TestSession):
def setUp(self):
super(TestBaseAPICreate, self).setUp()
self.api = api.BaseAPI(
@@ -261,7 +257,6 @@ class TestBaseAPICreate(api_fakes.TestSession):
class TestBaseAPIFind(api_fakes.TestSession):
def setUp(self):
super(TestBaseAPIFind, self).setUp()
self.api = api.BaseAPI(
@@ -283,14 +278,9 @@ class TestBaseAPIFind(api_fakes.TestSession):
self.BASE_URL + '/qaz/1',
status_code=404,
)
self.assertRaises(
exceptions.NotFound,
self.api.find,
'qaz',
'1')
self.assertRaises(exceptions.NotFound, self.api.find, 'qaz', '1')
def test_baseapi_find_attr_by_id(self):
# All first requests (by name) will fail in this test
self.requests_mock.register_uri(
'GET',
@@ -382,7 +372,6 @@ class TestBaseAPIFind(api_fakes.TestSession):
self.assertEqual(api_fakes.RESP_ITEM_1, ret)
def test_baseapi_find_attr_path_resource(self):
# Test resource different than path
self.requests_mock.register_uri(
'GET',
@@ -462,7 +451,6 @@ class TestBaseAPIFind(api_fakes.TestSession):
class TestBaseAPIList(api_fakes.TestSession):
def setUp(self):
super(TestBaseAPIList, self).setUp()
self.api = api.BaseAPI(

View File

@@ -37,8 +37,7 @@ class TestBaseAPIFilter(api_fakes.TestSession):
]
def test_simple_filter_none(self):
output = api_utils.simple_filter(
)
output = api_utils.simple_filter()
self.assertIsNone(output)
def test_simple_filter_no_attr(self):

View File

@@ -16,7 +16,6 @@ from osc_lib.tests import utils
class TestOSCConfig(utils.TestCase):
def setUp(self):
super(TestOSCConfig, self).setUp()
@@ -209,7 +208,7 @@ class TestOSCConfig(utils.TestCase):
'username': 'fred',
'project_id': 'id',
'project_domain_id': 'proj',
'user_domain_id': 'use'
'user_domain_id': 'use',
},
}
ret_config = self.cloud._auth_default_domain(config)

View File

@@ -20,7 +20,6 @@ from osc_lib.tests import utils
class TestDictColumn(utils.TestCase):
def test_dict_column(self):
data = {
'key1': 'value1',
@@ -42,7 +41,6 @@ class TestDictColumn(utils.TestCase):
class TestDictListColumn(utils.TestCase):
def test_dict_list_column(self):
data = {
'public': ['2001:db8::8', '172.24.4.6'],
@@ -67,7 +65,6 @@ class TestDictListColumn(utils.TestCase):
class TestListColumn(utils.TestCase):
def test_list_column(self):
data = [
'key1',
@@ -87,7 +84,6 @@ class TestListColumn(utils.TestCase):
class TestListDictColumn(utils.TestCase):
def test_list_dict_column(self):
data = [
{'key1': 'value1'},
@@ -112,7 +108,6 @@ class TestListDictColumn(utils.TestCase):
class TestSizeColumn(utils.TestCase):
def test_size_column(self):
content = 1576395005
col = format_columns.SizeColumn(content)

View File

@@ -22,12 +22,12 @@ from osc_lib.tests import utils as test_utils
class IdentityUtilsTestCase(test_utils.TestCase):
def test_add_project_owner_option_to_parser(self):
parser = argparse.ArgumentParser()
cli_identity.add_project_owner_option_to_parser(parser)
parsed_args = parser.parse_args(['--project', 'project1',
'--project-domain', 'domain1'])
parsed_args = parser.parse_args(
['--project', 'project1', '--project-domain', 'domain1']
)
self.assertEqual('project1', parsed_args.project)
self.assertEqual('domain1', parsed_args.project_domain)
@ -39,7 +39,8 @@ class IdentityUtilsTestCase(test_utils.TestCase):
ret = cli_identity.find_project(sdk_connection, 'project1')
self.assertEqual(mock.sentinel.project1, ret)
sdk_find_project.assert_called_once_with(
'project1', ignore_missing=False, domain_id=None)
'project1', ignore_missing=False, domain_id=None
)
def test_find_project_with_domain(self):
domain1 = mock.Mock()
@@ -54,9 +55,11 @@ class IdentityUtilsTestCase(test_utils.TestCase):
ret = cli_identity.find_project(sdk_connection, 'project1', 'domain1')
self.assertEqual(mock.sentinel.project1, ret)
sdk_find_domain.assert_called_once_with(
'domain1', ignore_missing=False)
'domain1', ignore_missing=False
)
sdk_find_project.assert_called_once_with(
'project1', ignore_missing=False, domain_id='id-domain1')
'project1', ignore_missing=False, domain_id='id-domain1'
)
def test_find_project_with_forbidden_exception(self):
sdk_connection = mock.Mock()

View File

@@ -20,7 +20,6 @@ from osc_lib.tests import utils
class TestKeyValueAction(utils.TestCase):
def setUp(self):
super(TestKeyValueAction, self).setUp()
@@ -33,15 +32,20 @@ class TestKeyValueAction(utils.TestCase):
action=parseractions.KeyValueAction,
default={'green': '20%', 'format': '#rgb'},
help='Property to store for this volume '
'(repeat option to set multiple properties)',
'(repeat option to set multiple properties)',
)
def test_good_values(self):
results = self.parser.parse_args([
'--property', 'red=',
'--property', 'green=100%',
'--property', 'blue=50%',
])
results = self.parser.parse_args(
[
'--property',
'red=',
'--property',
'green=100%',
'--property',
'blue=50%',
]
)
actual = getattr(results, 'property', {})
# All should pass through unmolested
@@ -50,17 +54,26 @@ class TestKeyValueAction(utils.TestCase):
def test_error_values(self):
data_list = [
['--property', 'red', ],
['--property', '=', ],
['--property', '=red', ]
[
'--property',
'red',
],
[
'--property',
'=',
],
[
'--property',
'=red',
],
]
for data in data_list:
self.assertRaises(argparse.ArgumentTypeError,
self.parser.parse_args, data)
self.assertRaises(
argparse.ArgumentTypeError, self.parser.parse_args, data
)
class TestKeyValueAppendAction(utils.TestCase):
def setUp(self):
super(TestKeyValueAppendAction, self).setUp()
@@ -72,16 +85,21 @@ class TestKeyValueAppendAction(utils.TestCase):
metavar='<key=value>',
action=parseractions.KeyValueAppendAction,
help='Arbitrary key/value pairs to be sent to the scheduler for '
'custom use',
'custom use',
)
def test_good_values(self):
print(self.parser._get_optional_actions())
results = self.parser.parse_args([
'--hint', 'same_host=a0cf03a5-d921-4877-bb5c-86d26cf818e1',
'--hint', 'same_host=8c19174f-4220-44f0-824a-cd1eeef10287',
'--hint', 'query=[>=,$free_ram_mb,1024]',
])
results = self.parser.parse_args(
[
'--hint',
'same_host=a0cf03a5-d921-4877-bb5c-86d26cf818e1',
'--hint',
'same_host=8c19174f-4220-44f0-824a-cd1eeef10287',
'--hint',
'query=[>=,$free_ram_mb,1024]',
]
)
actual = getattr(results, 'hint', {})
expect = {
@@ -97,17 +115,26 @@ class TestKeyValueAppendAction(utils.TestCase):
def test_error_values(self):
data_list = [
['--hint', 'red', ],
['--hint', '=', ],
['--hint', '=red', ]
[
'--hint',
'red',
],
[
'--hint',
'=',
],
[
'--hint',
'=red',
],
]
for data in data_list:
self.assertRaises(argparse.ArgumentTypeError,
self.parser.parse_args, data)
self.assertRaises(
argparse.ArgumentTypeError, self.parser.parse_args, data
)
class TestMultiKeyValueAction(utils.TestCase):
def setUp(self):
super(TestMultiKeyValueAction, self).setUp()
@@ -122,14 +149,18 @@ class TestMultiKeyValueAction(utils.TestCase):
default=None,
required_keys=['req1', 'req2'],
optional_keys=['opt1', 'opt2'],
help='Test'
help='Test',
)
def test_good_values(self):
results = self.parser.parse_args([
'--test', 'req1=aaa,req2=bbb',
'--test', 'req1=,req2=',
])
results = self.parser.parse_args(
[
'--test',
'req1=aaa,req2=bbb',
'--test',
'req1=,req2=',
]
)
actual = getattr(results, 'test', [])
expect = [
@@ -147,13 +178,17 @@ class TestMultiKeyValueAction(utils.TestCase):
default=None,
required_keys=[],
optional_keys=[],
help='Test'
help='Test',
)
results = self.parser.parse_args([
'--test-empty', 'req1=aaa,req2=bbb',
'--test-empty', 'req1=,req2=',
])
results = self.parser.parse_args(
[
'--test-empty',
'req1=aaa,req2=bbb',
'--test-empty',
'req1=,req2=',
]
)
actual = getattr(results, 'test_empty', [])
expect = [
@ -164,21 +199,32 @@ class TestMultiKeyValueAction(utils.TestCase):
def test_error_values_with_comma(self):
data_list = [
['--test', 'mmm,nnn=zzz', ],
['--test', 'nnn=zzz,=', ],
['--test', 'nnn=zzz,=zzz', ]
[
'--test',
'mmm,nnn=zzz',
],
[
'--test',
'nnn=zzz,=',
],
[
'--test',
'nnn=zzz,=zzz',
],
]
for data in data_list:
self.assertRaises(argparse.ArgumentTypeError,
self.parser.parse_args, data)
self.assertRaises(
argparse.ArgumentTypeError, self.parser.parse_args, data
)
def test_error_values_without_comma(self):
self.assertRaises(
argparse.ArgumentTypeError,
self.parser.parse_args,
[
'--test', 'mmmnnn',
]
'--test',
'mmmnnn',
],
)
def test_missing_key(self):
@@ -186,8 +232,9 @@ class TestMultiKeyValueAction(utils.TestCase):
argparse.ArgumentTypeError,
self.parser.parse_args,
[
'--test', 'req2=ddd',
]
'--test',
'req2=ddd',
],
)
def test_invalid_key(self):
@@ -195,8 +242,9 @@ class TestMultiKeyValueAction(utils.TestCase):
argparse.ArgumentTypeError,
self.parser.parse_args,
[
'--test', 'req1=aaa,req2=bbb,aaa=req1',
]
'--test',
'req1=aaa,req2=bbb,aaa=req1',
],
)
def test_required_keys_not_list(self):
@@ -210,7 +258,7 @@ class TestMultiKeyValueAction(utils.TestCase):
default=None,
required_keys={'aaa': 'bbb'},
optional_keys=['opt1', 'opt2'],
help='Test'
help='Test',
)
def test_optional_keys_not_list(self):
@@ -224,12 +272,11 @@ class TestMultiKeyValueAction(utils.TestCase):
default=None,
required_keys=['req1', 'req2'],
optional_keys={'aaa': 'bbb'},
help='Test'
help='Test',
)
class TestMultiKeyValueCommaAction(utils.TestCase):
def setUp(self):
super(TestMultiKeyValueCommaAction, self).setUp()
self.parser = argparse.ArgumentParser()
@@ -247,28 +294,38 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
)
def test_mkvca_required(self):
results = self.parser.parse_args([
'--test', 'req1=aaa,bbb',
])
results = self.parser.parse_args(
[
'--test',
'req1=aaa,bbb',
]
)
actual = getattr(results, 'test', [])
expect = [
{'req1': 'aaa,bbb'},
]
self.assertCountEqual(expect, actual)
results = self.parser.parse_args([
'--test', 'req1=',
])
results = self.parser.parse_args(
[
'--test',
'req1=',
]
)
actual = getattr(results, 'test', [])
expect = [
{'req1': ''},
]
self.assertCountEqual(expect, actual)
results = self.parser.parse_args([
'--test', 'req1=aaa,bbb',
'--test', 'req1=',
])
results = self.parser.parse_args(
[
'--test',
'req1=aaa,bbb',
'--test',
'req1=',
]
)
actual = getattr(results, 'test', [])
expect = [
{'req1': 'aaa,bbb'},
@@ -277,19 +334,26 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
self.assertCountEqual(expect, actual)
def test_mkvca_optional(self):
results = self.parser.parse_args([
'--test', 'req1=aaa,bbb',
])
results = self.parser.parse_args(
[
'--test',
'req1=aaa,bbb',
]
)
actual = getattr(results, 'test', [])
expect = [
{'req1': 'aaa,bbb'},
]
self.assertCountEqual(expect, actual)
results = self.parser.parse_args([
'--test', 'req1=aaa,bbb',
'--test', 'req1=,opt2=ccc',
])
results = self.parser.parse_args(
[
'--test',
'req1=aaa,bbb',
'--test',
'req1=,opt2=ccc',
]
)
actual = getattr(results, 'test', [])
expect = [
{'req1': 'aaa,bbb'},
@@ -298,10 +362,14 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
self.assertCountEqual(expect, actual)
try:
results = self.parser.parse_args([
'--test', 'req1=aaa,bbb',
'--test', 'opt2=ccc',
])
results = self.parser.parse_args(
[
'--test',
'req1=aaa,bbb',
'--test',
'opt2=ccc',
]
)
self.fail('ArgumentTypeError should be raised')
except argparse.ArgumentTypeError as e:
self.assertEqual(
@@ -310,14 +378,19 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
)
def test_mkvca_multiples(self):
results = self.parser.parse_args([
'--test', 'req1=aaa,bbb,opt2=ccc',
])
results = self.parser.parse_args(
[
'--test',
'req1=aaa,bbb,opt2=ccc',
]
)
actual = getattr(results, 'test', [])
expect = [{
'req1': 'aaa,bbb',
'opt2': 'ccc',
}]
expect = [
{
'req1': 'aaa,bbb',
'opt2': 'ccc',
}
]
self.assertCountEqual(expect, actual)
def test_mkvca_no_required_optional(self):
@@ -332,18 +405,24 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
help='Test',
)
results = self.parser.parse_args([
'--test-empty', 'req1=aaa,bbb',
])
results = self.parser.parse_args(
[
'--test-empty',
'req1=aaa,bbb',
]
)
actual = getattr(results, 'test_empty', [])
expect = [
{'req1': 'aaa,bbb'},
]
self.assertCountEqual(expect, actual)
results = self.parser.parse_args([
'--test-empty', 'xyz=aaa,bbb',
])
results = self.parser.parse_args(
[
'--test-empty',
'xyz=aaa,bbb',
]
)
actual = getattr(results, 'test_empty', [])
expect = [
@@ -353,9 +432,12 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
def test_mkvca_invalid_key(self):
try:
self.parser.parse_args([
'--test', 'req1=aaa,bbb=',
])
self.parser.parse_args(
[
'--test',
'req1=aaa,bbb=',
]
)
self.fail('ArgumentTypeError should be raised')
except argparse.ArgumentTypeError as e:
self.assertIn(
@@ -364,9 +446,12 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
)
try:
self.parser.parse_args([
'--test', 'nnn=aaa',
])
self.parser.parse_args(
[
'--test',
'nnn=aaa',
]
)
self.fail('ArgumentTypeError should be raised')
except argparse.ArgumentTypeError as e:
self.assertIn(
@@ -376,9 +461,12 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
def test_mkvca_value_no_key(self):
try:
self.parser.parse_args([
'--test', 'req1=aaa,=bbb',
])
self.parser.parse_args(
[
'--test',
'req1=aaa,=bbb',
]
)
self.fail('ArgumentTypeError should be raised')
except argparse.ArgumentTypeError as e:
self.assertEqual(
@@ -386,9 +474,12 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
str(e),
)
try:
self.parser.parse_args([
'--test', '=nnn',
])
self.parser.parse_args(
[
'--test',
'=nnn',
]
)
self.fail('ArgumentTypeError should be raised')
except argparse.ArgumentTypeError as e:
self.assertEqual(
@@ -397,9 +488,12 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
)
try:
self.parser.parse_args([
'--test', 'nnn',
])
self.parser.parse_args(
[
'--test',
'nnn',
]
)
self.fail('ArgumentTypeError should be raised')
except argparse.ArgumentTypeError as e:
self.assertIn(
@@ -437,7 +531,6 @@ class TestMultiKeyValueCommaAction(utils.TestCase):
class TestNonNegativeAction(utils.TestCase):
def setUp(self):
super(TestNonNegativeAction, self).setUp()
@@ -455,21 +548,17 @@ class TestNonNegativeAction(utils.TestCase):
self.assertRaises(
argparse.ArgumentTypeError,
self.parser.parse_args,
"--foo -1".split()
"--foo -1".split(),
)
def test_zero_values(self):
results = self.parser.parse_args(
'--foo 0'.split()
)
results = self.parser.parse_args('--foo 0'.split())
actual = getattr(results, 'foo', None)
self.assertEqual(actual, 0)
def test_positive_values(self):
results = self.parser.parse_args(
'--foo 1'.split()
)
results = self.parser.parse_args('--foo 1'.split())
actual = getattr(results, 'foo', None)
self.assertEqual(actual, 1)

View File

@@ -21,13 +21,11 @@ from osc_lib.tests import utils as test_utils
class FakeCommand(command.Command):
def take_action(self, parsed_args):
pass
class TestCommand(test_utils.TestCase):
def test_command_has_logger(self):
cmd = FakeCommand(mock.Mock(), mock.Mock())
self.assertTrue(hasattr(cmd, 'log'))

View File

@@ -28,14 +28,12 @@ timing_elapsed = 0.872809
class FakeGenericClient(object):
def __init__(self, **kwargs):
self.auth_token = kwargs['token']
self.management_url = kwargs['endpoint']
class TestTiming(utils.TestCommand):
columns = (
'URL',
'Seconds',
@@ -71,16 +69,23 @@ class TestTiming(utils.TestCommand):
self.assertEqual(self.columns, columns)
datalist = [
('Total', 0.0,)
(
'Total',
0.0,
)
]
self.assertEqual(datalist, data)
def test_timing_list(self):
self.app.timing_data = [session.RequestTiming(
method=timing_method,
url=timing_url,
elapsed=datetime.timedelta(microseconds=timing_elapsed * 1000000),
)]
self.app.timing_data = [
session.RequestTiming(
method=timing_method,
url=timing_url,
elapsed=datetime.timedelta(
microseconds=timing_elapsed * 1000000
),
)
]
arglist = []
verifylist = []

View File

@@ -32,8 +32,7 @@ VERSION = "3"
SERVICE_PROVIDER_ID = "bob"
TEST_RESPONSE_DICT = fixture.V2Token(token_id=AUTH_TOKEN,
user_name=USERNAME)
TEST_RESPONSE_DICT = fixture.V2Token(token_id=AUTH_TOKEN, user_name=USERNAME)
_s = TEST_RESPONSE_DICT.add_service('identity', name='keystone')
_s.add_endpoint(AUTH_URL + ':5000/v2.0')
_s = TEST_RESPONSE_DICT.add_service('network', name='neutron')
@@ -54,8 +53,10 @@ TEST_VERSIONS = fixture.DiscoveryList(href=AUTH_URL)
def to_unicode_dict(catalog_dict):
"""Converts dict to unicode dict"""
if isinstance(catalog_dict, dict):
return {to_unicode_dict(key): to_unicode_dict(value)
for key, value in catalog_dict.items()}
return {
to_unicode_dict(key): to_unicode_dict(value)
for key, value in catalog_dict.items()
}
elif isinstance(catalog_dict, list):
return [to_unicode_dict(element) for element in catalog_dict]
elif isinstance(catalog_dict, str):
@ -65,7 +66,6 @@ def to_unicode_dict(catalog_dict):
class FakeStdout(object):
def __init__(self):
self.content = []
@ -80,7 +80,6 @@ class FakeStdout(object):
class FakeLog(object):
def __init__(self):
self.messages = {}
@ -101,7 +100,6 @@ class FakeLog(object):
class FakeApp(object):
def __init__(self, _stdout, _log):
self.stdout = _stdout
self.client_manager = None
@ -118,7 +116,6 @@ class FakeOptions(object):
class FakeClientManager(object):
def __init__(self):
self.compute = None
self.identity = None
@ -143,7 +140,6 @@ class FakeClientManager(object):
class FakeModule(object):
def __init__(self, name, version):
self.name = name
self.__version__ = version
@ -153,7 +149,6 @@ class FakeModule(object):
class FakeResource(object):
def __init__(self, manager=None, info=None, loaded=False, methods=None):
"""Set attributes and methods for a resource.
@ -177,7 +172,7 @@ class FakeResource(object):
self._loaded = loaded
def _add_details(self, info):
for (k, v) in info.items():
for k, v in info.items():
setattr(self, k, v)
def _add_methods(self, methods):
@ -188,13 +183,14 @@ class FakeResource(object):
@value. When users access the attribute with (), @value will be
returned, which looks like a function call.
"""
for (name, ret) in methods.items():
for name, ret in methods.items():
method = mock.MagicMock(return_value=ret)
setattr(self, name, method)
def __repr__(self):
reprkeys = sorted(k for k in self.__dict__.keys() if k[0] != '_' and
k != 'manager')
reprkeys = sorted(
k for k in self.__dict__.keys() if k[0] != '_' and k != 'manager'
)
info = ", ".join("%s=%s" % (k, getattr(self, k)) for k in reprkeys)
return "<%s %s>" % (self.__class__.__name__, info)

View File

@ -41,7 +41,7 @@ AUTH_DICT = {
'auth_url': fakes.AUTH_URL,
'username': fakes.USERNAME,
'password': fakes.PASSWORD,
'project_name': fakes.PROJECT_NAME
'project_name': fakes.PROJECT_NAME,
}
@ -58,7 +58,6 @@ class Container(object):
class TestClientCache(utils.TestCase):
def test_singleton(self):
# NOTE(dtroyer): Verify that the ClientCache descriptor only invokes
# the factory one time and always returns the same value after that.
@ -67,14 +66,14 @@ class TestClientCache(utils.TestCase):
def test_attribute_error_propagates(self):
c = Container()
err = self.assertRaises(exc.PluginAttributeError,
getattr, c, 'buggy_attr')
err = self.assertRaises(
exc.PluginAttributeError, getattr, c, 'buggy_attr'
)
self.assertNotIsInstance(err, AttributeError)
self.assertEqual("'Container' object has no attribute 'foo'", str(err))
class TestClientManager(utils.TestClientManager):
def test_client_manager_none(self):
none_auth = {
'endpoint': fakes.AUTH_URL,
@ -268,10 +267,12 @@ class TestClientManager(utils.TestClientManager):
)
auth_args = copy.deepcopy(self.default_password_auth)
auth_args.update({
'user_domain_name': 'default',
'project_domain_name': 'default',
})
auth_args.update(
{
'user_domain_name': 'default',
'project_domain_name': 'default',
}
)
self._make_clientmanager(
auth_args=auth_args,
identity_api_version='3',
@ -292,10 +293,12 @@ class TestClientManager(utils.TestClientManager):
# Use v3 auth args
auth_args = copy.deepcopy(self.default_password_auth)
auth_args.update({
'user_domain_name': 'default',
'project_domain_name': 'default',
})
auth_args.update(
{
'user_domain_name': 'default',
'project_domain_name': 'default',
}
)
self._make_clientmanager(
auth_args=auth_args,
identity_api_version='3',
@ -303,9 +306,11 @@ class TestClientManager(utils.TestClientManager):
auth_args = copy.deepcopy(self.default_password_auth)
auth_args.pop('username')
auth_args.update({
'user_id': fakes.USER_ID,
})
auth_args.update(
{
'user_id': fakes.USER_ID,
}
)
self._make_clientmanager(
auth_args=auth_args,
identity_api_version='3',
@ -342,12 +347,14 @@ class TestClientManager(utils.TestClientManager):
loader = loading.get_plugin_loader('password')
auth_plugin = loader.load_from_options(**AUTH_DICT)
cli_options = defaults.get_defaults()
cli_options.update({
'auth_type': 'password',
'auth': AUTH_DICT,
'interface': fakes.INTERFACE,
'region_name': fakes.REGION_NAME,
})
cli_options.update(
{
'auth_type': 'password',
'auth': AUTH_DICT,
'interface': fakes.INTERFACE,
'region_name': fakes.REGION_NAME,
}
)
client_manager = self._clientmanager_class()(
cli_options=cloud_config.CloudConfig(
name='t1',
@ -372,10 +379,12 @@ class TestClientManager(utils.TestClientManager):
def test_client_manager_endpoint_disabled(self):
auth_args = copy.deepcopy(self.default_password_auth)
auth_args.update({
'user_domain_name': 'default',
'project_domain_name': 'default',
})
auth_args.update(
{
'user_domain_name': 'default',
'project_domain_name': 'default',
}
)
# v3 fake doesn't have network endpoint
client_manager = self._make_clientmanager(
auth_args=auth_args,
@ -389,14 +398,16 @@ class TestClientManager(utils.TestClientManager):
loader = loading.get_plugin_loader('password')
auth_plugin = loader.load_from_options(**AUTH_DICT)
cli_options = defaults.get_defaults()
cli_options.update({
'auth_type': 'password',
'auth': AUTH_DICT,
'interface': fakes.INTERFACE,
'region_name': fakes.REGION_NAME,
'service_provider': fakes.SERVICE_PROVIDER_ID,
'remote_project_id': fakes.PROJECT_ID
})
cli_options.update(
{
'auth_type': 'password',
'auth': AUTH_DICT,
'interface': fakes.INTERFACE,
'region_name': fakes.REGION_NAME,
'service_provider': fakes.SERVICE_PROVIDER_ID,
'remote_project_id': fakes.PROJECT_ID,
}
)
client_manager = self._clientmanager_class()(
cli_options=cloud_config.CloudConfig(
name='t1',
@ -425,27 +436,31 @@ class TestClientManager(utils.TestClientManager):
auth_plugin_name='none',
)
self.assertIsNone(
client_manager.get_endpoint_for_service_type('compute'))
client_manager.get_endpoint_for_service_type('compute')
)
def test_client_manager_endpoint_override(self):
# test token auth
client_manager = self._make_clientmanager(
auth_args={},
config_args={'compute_endpoint_override': 'http://example.com',
'foo_bar_endpoint_override': 'http://example2.com'},
config_args={
'compute_endpoint_override': 'http://example.com',
'foo_bar_endpoint_override': 'http://example2.com',
},
auth_plugin_name='none',
)
self.assertEqual(
'http://example.com',
client_manager.get_endpoint_for_service_type('compute'))
client_manager.get_endpoint_for_service_type('compute'),
)
self.assertEqual(
'http://example2.com',
client_manager.get_endpoint_for_service_type('foo-bar'))
client_manager.get_endpoint_for_service_type('foo-bar'),
)
self.assertTrue(client_manager.is_service_available('compute'))
class TestClientManagerSDK(utils.TestClientManager):
def test_client_manager_connection(self):
client_manager = self._make_clientmanager(
auth_required=True,

View File

@ -19,7 +19,6 @@ from osc_lib.tests import utils
class TestContext(utils.TestCase):
def test_log_level_from_options(self):
opts = mock.Mock()
opts.verbose_level = 0
@ -66,35 +65,47 @@ class TestContext(utils.TestCase):
class TestFileFormatter(utils.TestCase):
def test_nothing(self):
formatter = logs._FileFormatter()
self.assertEqual(('%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s %(message)s'), formatter.fmt)
self.assertEqual(
(
'%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s %(message)s'
),
formatter.fmt,
)
def test_options(self):
class Opts(object):
cloud = 'cloudy'
os_project_name = 'projecty'
username = 'usernamey'
options = Opts()
formatter = logs._FileFormatter(options=options)
self.assertEqual(('%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [cloudy usernamey projecty] %(message)s'),
formatter.fmt)
self.assertEqual(
(
'%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [cloudy usernamey projecty] %(message)s'
),
formatter.fmt,
)
def test_config(self):
config = mock.Mock()
config.config = {'cloud': 'cloudy'}
config.auth = {'project_name': 'projecty', 'username': 'usernamey'}
formatter = logs._FileFormatter(config=config)
self.assertEqual(('%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [cloudy usernamey projecty] %(message)s'),
formatter.fmt)
self.assertEqual(
(
'%(asctime)s.%(msecs)03d %(process)d %(levelname)s '
'%(name)s [cloudy usernamey projecty] %(message)s'
),
formatter.fmt,
)
class TestLogConfigurator(utils.TestCase):
def setUp(self):
super(TestLogConfigurator, self).setUp()
self.options = mock.Mock()
@ -117,7 +128,8 @@ class TestLogConfigurator(utils.TestCase):
self.requests_log,
self.cliff_log,
self.stevedore_log,
self.iso8601_log]
self.iso8601_log,
]
@mock.patch('logging.StreamHandler')
@mock.patch('logging.getLogger')
@ -186,7 +198,8 @@ class TestLogConfigurator(utils.TestCase):
cloud_config.config = {
'log_file': config_log,
'verbose_level': 1,
'log_level': 'info'}
'log_level': 'info',
}
file_logger = mock.Mock()
file_logger.setFormatter = mock.Mock()
file_logger.setLevel = mock.Mock()

View File

@ -113,7 +113,7 @@ global_options = {
'--os-default-domain': (DEFAULT_DOMAIN_NAME, True, True),
'--os-cacert': ('/dev/null', True, True),
'--timing': (True, True, False),
'--os-interface': (DEFAULT_INTERFACE, True, True)
'--os-interface': (DEFAULT_INTERFACE, True, True),
}
if shell.osprofiler_profiler:
global_options['--os-profile'] = ('SECRET_KEY', True, True)
@ -138,8 +138,8 @@ class TestShellArgV(utils.TestShell):
"""
with mock.patch(
"osc_lib.shell.OpenStackShell.run",
self.app,
"osc_lib.shell.OpenStackShell.run",
self.app,
):
# Ensure type gets through unmolested through shell.main()
argv = sys.argv
@ -220,8 +220,8 @@ class TestShellCli(utils.TestShell):
def test_shell_args_no_options(self):
_shell = utils.make_shell()
with mock.patch(
"osc_lib.shell.OpenStackShell.initialize_app",
self.app,
"osc_lib.shell.OpenStackShell.initialize_app",
self.app,
):
utils.fake_execute(_shell, "list user")
self.app.assert_called_with(["list", "user"])
@ -306,8 +306,7 @@ class TestShellCli(utils.TestShell):
# --os-cert and --os-key
utils.fake_execute(
_shell,
"--os-cert mycert --os-key mickey module list"
_shell, "--os-cert mycert --os-key mickey module list"
)
self.assertEqual('mycert', _shell.options.cert)
self.assertEqual('mickey', _shell.options.key)

View File

@ -86,17 +86,20 @@ class ParserException(Exception):
class TestCase(testtools.TestCase):
def setUp(self):
testtools.TestCase.setUp(self)
if (os.environ.get("OS_STDOUT_CAPTURE") == "True" or
os.environ.get("OS_STDOUT_CAPTURE") == "1"):
if (
os.environ.get("OS_STDOUT_CAPTURE") == "True"
or os.environ.get("OS_STDOUT_CAPTURE") == "1"
):
stdout = self.useFixture(fixtures.StringStream("stdout")).stream
self.useFixture(fixtures.MonkeyPatch("sys.stdout", stdout))
if (os.environ.get("OS_STDERR_CAPTURE") == "True" or
os.environ.get("OS_STDERR_CAPTURE") == "1"):
if (
os.environ.get("OS_STDERR_CAPTURE") == "True"
or os.environ.get("OS_STDERR_CAPTURE") == "1"
):
stderr = self.useFixture(fixtures.StringStream("stderr")).stream
self.useFixture(fixtures.MonkeyPatch("sys.stderr", stderr))
@ -162,10 +165,13 @@ class TestCommand(TestCase):
for col_expected, col_actual in zip(expected, actual):
if isinstance(col_expected, cliff_columns.FormattableColumn):
self.assertIsInstance(col_actual, col_expected.__class__)
self.assertEqual(col_expected.human_readable(),
col_actual.human_readable())
self.assertEqual(col_expected.machine_readable(),
col_actual.machine_readable())
self.assertEqual(
col_expected.human_readable(), col_actual.human_readable()
)
self.assertEqual(
col_expected.machine_readable(),
col_actual.machine_readable(),
)
else:
self.assertEqual(col_expected, col_actual)
@ -201,18 +207,24 @@ class TestClientManager(TestCase):
# fake v2password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT)
# fake token and token_endpoint retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT,
url='/'.join([fakes.AUTH_URL, 'v2.0/tokens']))
self.stub_auth(
json=fakes.TEST_RESPONSE_DICT,
url='/'.join([fakes.AUTH_URL, 'v2.0/tokens']),
)
# fake v3password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens']))
self.stub_auth(
json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'v3/auth/tokens']),
)
# fake password token retrieval
self.stub_auth(json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'auth/tokens']))
self.stub_auth(
json=fakes.TEST_RESPONSE_DICT_V3,
url='/'.join([fakes.AUTH_URL, 'auth/tokens']),
)
# fake password version endpoint discovery
self.stub_auth(json=fakes.TEST_VERSIONS,
url=fakes.AUTH_URL,
verb='GET')
self.stub_auth(
json=fakes.TEST_VERSIONS, url=fakes.AUTH_URL, verb='GET'
)
# Mock the auth plugin
self.auth_mock = mock.Mock()
@ -250,7 +262,6 @@ class TestClientManager(TestCase):
auth_plugin_name=None,
auth_required=None,
):
if identity_api_version is None:
identity_api_version = '2.0'
if auth_plugin_name is None:
@ -267,13 +278,15 @@ class TestClientManager(TestCase):
auth_dict = auth_args
cli_options = defaults.get_defaults()
cli_options.update({
'auth_type': auth_plugin_name,
'auth': auth_dict,
'interface': fakes.INTERFACE,
'region_name': fakes.REGION_NAME,
# 'workflow_api_version': '2',
})
cli_options.update(
{
'auth_type': auth_plugin_name,
'auth': auth_dict,
'interface': fakes.INTERFACE,
'region_name': fakes.REGION_NAME,
# 'workflow_api_version': '2',
}
)
if config_args is not None:
cli_options.update(config_args)
@ -302,7 +315,6 @@ class TestClientManager(TestCase):
class TestShell(TestCase):
# Full name of the OpenStackShell class to test (cliff.app.App subclass)
shell_class_name = "osc_lib.shell.OpenStackShell"
@ -325,8 +337,8 @@ class TestShell(TestCase):
"""
with mock.patch(
self.shell_class_name + ".initialize_app",
self.app,
self.shell_class_name + ".initialize_app",
self.app,
):
_shell = make_shell(shell_class=self.shell_class)
_cmd = cmd_options + " module list"
@ -352,8 +364,8 @@ class TestShell(TestCase):
cloud.config = {}
self.occ_get_one = mock.Mock(return_value=cloud)
with mock.patch(
"openstack.config.loader.OpenStackConfig.get_one",
self.occ_get_one,
"openstack.config.loader.OpenStackConfig.get_one",
self.occ_get_one,
):
_shell = make_shell(shell_class=self.shell_class)
_cmd = cmd_options + " module list"

View File

@ -16,7 +16,6 @@ from osc_lib.utils import columns as column_utils
class TestColumnUtils(test_utils.TestCase):
def test_get_column_definitions(self):
attr_map = (
('id', 'ID', column_utils.LIST_BOTH),
@ -25,7 +24,8 @@ class TestColumnUtils(test_utils.TestCase):
('summary', 'Summary', column_utils.LIST_SHORT_ONLY),
)
headers, columns = column_utils.get_column_definitions(
attr_map, long_listing=False)
attr_map, long_listing=False
)
self.assertEqual(['id', 'name', 'summary'], columns)
self.assertEqual(['ID', 'Name', 'Summary'], headers)
@ -37,7 +37,8 @@ class TestColumnUtils(test_utils.TestCase):
('summary', 'Summary', column_utils.LIST_SHORT_ONLY),
)
headers, columns = column_utils.get_column_definitions(
attr_map, long_listing=True)
attr_map, long_listing=True
)
self.assertEqual(['id', 'tenant_id', 'name'], columns)
self.assertEqual(['ID', 'Project', 'Name'], headers)

View File

@ -26,15 +26,22 @@ def help_enhancer(_h):
class TestTags(test_utils.TestCase):
def test_add_tag_filtering_option_to_parser(self):
parser = argparse.ArgumentParser()
tags.add_tag_filtering_option_to_parser(parser, 'test')
parsed_args = parser.parse_args(['--tags', 'tag1,tag2',
'--any-tags', 'tag4',
'--not-tags', 'tag5',
'--not-any-tags', 'tag6'])
parsed_args = parser.parse_args(
[
'--tags',
'tag1,tag2',
'--any-tags',
'tag4',
'--not-tags',
'tag5',
'--not-any-tags',
'tag6',
]
)
actual = getattr(parsed_args, 'tags', [])
expected = ['tag1', 'tag2']
@ -56,12 +63,24 @@ class TestTags(test_utils.TestCase):
parser = argparse.ArgumentParser()
tags.add_tag_filtering_option_to_parser(parser, 'test')
parsed_args = parser.parse_args(['--tags', 'tag1,tag2',
'--any-tags', 'tag4',
'--not-tags', 'tag5',
'--not-any-tags', 'tag6'])
expected = {'tags': 'tag1,tag2', 'any_tags': 'tag4',
'not_tags': 'tag5', 'not_any_tags': 'tag6'}
parsed_args = parser.parse_args(
[
'--tags',
'tag1,tag2',
'--any-tags',
'tag4',
'--not-tags',
'tag5',
'--not-any-tags',
'tag6',
]
)
expected = {
'tags': 'tag1,tag2',
'any_tags': 'tag4',
'not_tags': 'tag5',
'not_any_tags': 'tag6',
}
args = {}
tags.get_tag_filtering_args(parsed_args, args)
self.assertEqual(expected, args)
@ -71,8 +90,9 @@ class TestTags(test_utils.TestCase):
tags.add_tag_option_to_parser_for_create(parser, 'test')
# Test that --tag and --no-tag are mutually exclusive
self.assertRaises(SystemExit, parser.parse_args,
['--tag', 'tag1', '--no-tag'])
self.assertRaises(
SystemExit, parser.parse_args, ['--tag', 'tag1', '--no-tag']
)
parsed_args = parser.parse_args(['--tag', 'tag1'])
actual = getattr(parsed_args, 'tags', [])
@ -103,8 +123,9 @@ class TestTags(test_utils.TestCase):
tags.add_tag_option_to_parser_for_unset(parser, 'test')
# Test that --tag and --all-tag are mutually exclusive
self.assertRaises(SystemExit, parser.parse_args,
['--tag', 'tag1', '--all-tag'])
self.assertRaises(
SystemExit, parser.parse_args, ['--tag', 'tag1', '--all-tag']
)
parsed_args = parser.parse_args(['--tag', 'tag1'])
actual = getattr(parsed_args, 'tags', [])
@ -127,7 +148,8 @@ class TestTags(test_utils.TestCase):
mock_obj.tags = None
tags.update_tags_for_set(mock_client, mock_obj, mock_parsed_args)
mock_client.set_tags.assert_called_once_with(
mock_obj, list(mock_parsed_args.tags))
mock_obj, list(mock_parsed_args.tags)
)
# no-tag False path
mock_client.set_tags.reset_mock()
@ -136,8 +158,7 @@ class TestTags(test_utils.TestCase):
mock_obj.tags = ['tag2']
expected_list = ['tag1', 'tag2']
tags.update_tags_for_set(mock_client, mock_obj, mock_parsed_args)
mock_client.set_tags.assert_called_once_with(
mock_obj, expected_list)
mock_client.set_tags.assert_called_once_with(mock_obj, expected_list)
# no new tags path
mock_client.set_tags.reset_mock()
@ -164,8 +185,7 @@ class TestTags(test_utils.TestCase):
mock_parsed_args.all_tag = True
mock_parsed_args.tags = None
tags.update_tags_for_unset(mock_client, mock_obj, mock_parsed_args)
mock_client.set_tags.assert_called_once_with(
mock_obj, [])
mock_client.set_tags.assert_called_once_with(mock_obj, [])
# Remove one tag
mock_client.set_tags.reset_mock()
@ -173,12 +193,10 @@ class TestTags(test_utils.TestCase):
mock_parsed_args.all_tag = False
mock_parsed_args.tags = ['tag2']
tags.update_tags_for_unset(mock_client, mock_obj, mock_parsed_args)
mock_client.set_tags.assert_called_once_with(
mock_obj, ['tag1'])
mock_client.set_tags.assert_called_once_with(mock_obj, ['tag1'])
class TestTagHelps(test_utils.TestCase):
def _test_tag_method_help(self, meth, exp_normal, exp_enhanced):
"""Vet the help text of the options added by the tag filtering helpers.
@ -241,7 +259,8 @@ usage: run.py [-h] [--tags <tag>[,<tag>,...]] [--any-tags <tag>[,<tag>,...]]
--not-any-tags <tag>[,<tag>,...]
)sgat fo tsil detarapes-ammoC( )s(gat nevig yna evah
hcihw tset edulcxE
""")
""",
)
def test_add_tag_option_to_parser_for_create(self):
self._test_tag_method_help(
@ -263,7 +282,8 @@ usage: run.py [-h] [--tag <tag> | --no-tag]
--tag <tag> )sgat elpitlum tes ot noitpo taeper( tset eht ot dedda eb ot
gaT
--no-tag tset eht htiw detaicossa sgat oN
""")
""",
)
def test_add_tag_option_to_parser_for_set(self):
self._test_tag_method_help(
@ -287,7 +307,8 @@ usage: run.py [-h] [--tag <tag>] [--no-tag]
gaT
--no-tag sgat tnerruc etirwrevo ot gat-on-- dna gat-- htob yficepS .tset
eht htiw detaicossa sgat raelC
""")
""",
)
def test_add_tag_option_to_parser_for_unset(self):
self._test_tag_method_help(
@ -309,4 +330,5 @@ usage: run.py [-h] [--tag <tag> | --all-tag]
--tag <tag> )sgat elpitlum evomer ot noitpo taeper( tset eht morf devomer
eb ot gaT
--all-tag tset eht htiw detaicossa sgat lla raelC
""")
""",
)

View File

@ -31,7 +31,6 @@ DROWSSAP = "dr0w$$aP"
class FakeOddballResource(fakes.FakeResource):
def get(self, attr):
"""get() is needed for utils.find_resource()"""
if attr == 'id':
@ -43,7 +42,6 @@ class FakeOddballResource(fakes.FakeResource):
class TestUtils(test_utils.TestCase):
def _get_test_items(self):
item1 = {'a': 1, 'b': 2}
item2 = {'a': 1, 'b': 3}
@ -130,18 +128,18 @@ class TestUtils(test_utils.TestCase):
mock_stdin = mock.Mock()
mock_stdin.isatty = mock.Mock()
mock_stdin.isatty.return_value = False
self.assertRaises(exceptions.CommandError,
utils.get_password,
mock_stdin)
self.assertRaises(
exceptions.CommandError, utils.get_password, mock_stdin
)
def test_get_password_cntrl_d(self):
with mock.patch("getpass.getpass", side_effect=EOFError()):
mock_stdin = mock.Mock()
mock_stdin.isatty = mock.Mock()
mock_stdin.isatty.return_value = True
self.assertRaises(exceptions.CommandError,
utils.get_password,
mock_stdin)
self.assertRaises(
exceptions.CommandError, utils.get_password, mock_stdin
)
def test_sort_items_with_one_key(self):
items = self._get_test_items()
@ -187,16 +185,16 @@ class TestUtils(test_utils.TestCase):
def test_sort_items_with_invalid_key(self):
items = self._get_test_items()
sort_str = 'c'
self.assertRaises(exceptions.CommandError,
utils.sort_items,
items, sort_str)
self.assertRaises(
exceptions.CommandError, utils.sort_items, items, sort_str
)
def test_sort_items_with_invalid_direction(self):
items = self._get_test_items()
sort_str = 'a:bad_dir'
self.assertRaises(exceptions.CommandError,
utils.sort_items,
items, sort_str)
self.assertRaises(
exceptions.CommandError, utils.sort_items, items, sort_str
)
def test_sort_items_with_different_type_exception(self):
item1 = {'a': 2}
@ -216,8 +214,9 @@ class TestUtils(test_utils.TestCase):
sort_str = 'a'
sort_type = int
expect_items = [item3, item4, item1, item2]
self.assertEqual(expect_items, utils.sort_items(items, sort_str,
sort_type))
self.assertEqual(
expect_items, utils.sort_items(items, sort_str, sort_type)
)
def test_sort_items_with_different_type_str(self):
item1 = {'a': 'a'}
@ -228,21 +227,22 @@ class TestUtils(test_utils.TestCase):
sort_str = 'a'
sort_type = str
expect_items = [item3, item2, item1, item4]
self.assertEqual(expect_items, utils.sort_items(items, sort_str,
sort_type))
self.assertEqual(
expect_items, utils.sort_items(items, sort_str, sort_type)
)
@mock.patch.object(time, 'sleep')
def test_wait_for_delete_ok(self, mock_sleep):
# Tests the normal flow that the resource is deleted with a 404 coming
# back on the 2nd iteration of the wait loop.
resource = mock.MagicMock(status='ACTIVE', progress=None)
mock_get = mock.Mock(side_effect=[resource,
exceptions.NotFound(404)])
mock_get = mock.Mock(side_effect=[resource, exceptions.NotFound(404)])
manager = mock.MagicMock(get=mock_get)
res_id = str(uuid.uuid4())
callback = mock.Mock()
self.assertTrue(utils.wait_for_delete(manager, res_id,
callback=callback))
self.assertTrue(
utils.wait_for_delete(manager, res_id, callback=callback)
)
mock_sleep.assert_called_once_with(5)
callback.assert_called_once_with(0)
@ -253,8 +253,9 @@ class TestUtils(test_utils.TestCase):
mock_get = mock.Mock(return_value=resource)
manager = mock.MagicMock(get=mock_get)
res_id = str(uuid.uuid4())
self.assertFalse(utils.wait_for_delete(manager, res_id, sleep_time=1,
timeout=1))
self.assertFalse(
utils.wait_for_delete(manager, res_id, sleep_time=1, timeout=1)
)
mock_sleep.assert_called_once_with(1)
@mock.patch.object(time, 'sleep')
@ -274,9 +275,14 @@ class TestUtils(test_utils.TestCase):
mock_get = mock.Mock(return_value=resource)
manager = mock.MagicMock(get=mock_get)
res_id = str(uuid.uuid4())
self.assertFalse(utils.wait_for_delete(manager, res_id,
status_field='my_status',
error_status=['failed']))
self.assertFalse(
utils.wait_for_delete(
manager,
res_id,
status_field='my_status',
error_status=['failed'],
)
)
mock_sleep.assert_not_called()
@mock.patch.object(time, 'sleep')
@ -285,8 +291,11 @@ class TestUtils(test_utils.TestCase):
mock_get = mock.Mock(side_effect=Exception)
manager = mock.MagicMock(get=mock_get)
res_id = str(uuid.uuid4())
self.assertTrue(utils.wait_for_delete(manager, res_id,
exception_name=['Exception']))
self.assertTrue(
utils.wait_for_delete(
manager, res_id, exception_name=['Exception']
)
)
mock_sleep.assert_not_called()
@mock.patch.object(time, 'sleep')
@ -295,7 +304,12 @@ class TestUtils(test_utils.TestCase):
resource = mock.MagicMock(status='ACTIVE')
status_f = mock.Mock(return_value=resource)
res_id = str(uuid.uuid4())
self.assertTrue(utils.wait_for_status(status_f, res_id,))
self.assertTrue(
utils.wait_for_status(
status_f,
res_id,
)
)
mock_sleep.assert_not_called()
@mock.patch.object(time, 'sleep')
@ -304,9 +318,14 @@ class TestUtils(test_utils.TestCase):
resource = mock.MagicMock(my_status='COMPLETE')
status_f = mock.Mock(return_value=resource)
res_id = str(uuid.uuid4())
self.assertTrue(utils.wait_for_status(status_f, res_id,
status_field='my_status',
success_status=['complete']))
self.assertTrue(
utils.wait_for_status(
status_f,
res_id,
status_field='my_status',
success_status=['complete'],
)
)
mock_sleep.assert_not_called()
@mock.patch.object(time, 'sleep')
@ -324,14 +343,20 @@ class TestUtils(test_utils.TestCase):
resource = mock.MagicMock(my_status='FAILED')
status_f = mock.Mock(return_value=resource)
res_id = str(uuid.uuid4())
self.assertFalse(utils.wait_for_status(status_f, res_id,
status_field='my_status',
error_status=['failed']))
self.assertFalse(
utils.wait_for_status(
status_f,
res_id,
status_field='my_status',
error_status=['failed'],
)
)
mock_sleep.assert_not_called()
def test_build_kwargs_dict_value_set(self):
self.assertEqual({'arg_bla': 'bla'},
utils.build_kwargs_dict('arg_bla', 'bla'))
self.assertEqual(
{'arg_bla': 'bla'}, utils.build_kwargs_dict('arg_bla', 'bla')
)
def test_build_kwargs_dict_value_None(self):
self.assertEqual({}, utils.build_kwargs_dict('arg_bla', None))
@ -349,21 +374,17 @@ class TestUtils(test_utils.TestCase):
self.assertEqual("999", utils.format_size(999))
self.assertEqual("100K", utils.format_size(100000))
self.assertEqual("2M", utils.format_size(2000000))
self.assertEqual(
"16.4M", utils.format_size(16361280)
)
self.assertEqual(
"1.6G", utils.format_size(1576395005)
)
self.assertEqual("16.4M", utils.format_size(16361280))
self.assertEqual("1.6G", utils.format_size(1576395005))
self.assertEqual("0", utils.format_size(None))
def test_backward_compat_col_lister(self):
fake_col_headers = ['ID', 'Name', 'Size']
columns = ['Display Name']
column_map = {'Display Name': 'Name'}
results = utils.backward_compat_col_lister(fake_col_headers,
columns,
column_map)
results = utils.backward_compat_col_lister(
fake_col_headers, columns, column_map
)
self.assertIsInstance(results, list)
self.assertIn('Display Name', results)
self.assertNotIn('Name', results)
@ -374,9 +395,9 @@ class TestUtils(test_utils.TestCase):
fake_col_headers = ['ID', 'Name', 'Size']
columns = []
column_map = {'Display Name': 'Name'}
results = utils.backward_compat_col_lister(fake_col_headers,
columns,
column_map)
results = utils.backward_compat_col_lister(
fake_col_headers, columns, column_map
)
self.assertIsInstance(results, list)
self.assertNotIn('Display Name', results)
self.assertIn('Name', results)
@ -387,9 +408,9 @@ class TestUtils(test_utils.TestCase):
fake_col_headers = ('ID', 'Name', 'Size')
columns = ['Display Name']
column_map = {'Display Name': 'Name'}
results = utils.backward_compat_col_lister(fake_col_headers,
columns,
column_map)
results = utils.backward_compat_col_lister(
fake_col_headers, columns, column_map
)
self.assertIsInstance(results, list)
self.assertIn('Display Name', results)
self.assertNotIn('Name', results)
@ -397,14 +418,16 @@ class TestUtils(test_utils.TestCase):
self.assertIn('Size', results)
def test_backward_compat_col_showone(self):
fake_object = {'id': 'fake-id',
'name': 'fake-name',
'size': 'fake-size'}
fake_object = {
'id': 'fake-id',
'name': 'fake-name',
'size': 'fake-size',
}
columns = ['display_name']
column_map = {'display_name': 'name'}
results = utils.backward_compat_col_showone(fake_object,
columns,
column_map)
results = utils.backward_compat_col_showone(
fake_object, columns, column_map
)
self.assertIsInstance(results, dict)
self.assertIn('display_name', results)
self.assertIn('id', results)
@ -412,14 +435,16 @@ class TestUtils(test_utils.TestCase):
self.assertIn('size', results)
def test_backward_compat_col_showone_no_specify_column(self):
fake_object = {'id': 'fake-id',
'name': 'fake-name',
'size': 'fake-size'}
fake_object = {
'id': 'fake-id',
'name': 'fake-name',
'size': 'fake-size',
}
columns = []
column_map = {'display_name': 'name'}
results = utils.backward_compat_col_showone(fake_object,
columns,
column_map)
results = utils.backward_compat_col_showone(
fake_object, columns, column_map
)
self.assertIsInstance(results, dict)
self.assertNotIn('display_name', results)
self.assertIn('id', results)
@ -429,8 +454,9 @@ class TestUtils(test_utils.TestCase):
def _test_get_item_properties_with_formatter(self, formatters):
names = ('id', 'attr')
item = fakes.FakeResource(info={'id': 'fake-id', 'attr': ['a', 'b']})
res_id, res_attr = utils.get_item_properties(item, names,
formatters=formatters)
res_id, res_attr = utils.get_item_properties(
item, names, formatters=formatters
)
self.assertEqual('fake-id', res_id)
return res_attr
@ -447,8 +473,9 @@ class TestUtils(test_utils.TestCase):
def _test_get_dict_properties_with_formatter(self, formatters):
names = ('id', 'attr')
item = {'id': 'fake-id', 'attr': ['a', 'b']}
res_id, res_attr = utils.get_dict_properties(item, names,
formatters=formatters)
res_id, res_attr = utils.get_dict_properties(
item, names, formatters=formatters
)
self.assertEqual('fake-id', res_id)
return res_attr
@ -462,14 +489,16 @@ class TestUtils(test_utils.TestCase):
res_attr = self._test_get_dict_properties_with_formatter(formatters)
self.assertIsInstance(res_attr, format_columns.ListColumn)
def _test_calculate_header_and_attrs(self, parsed_args_columns,
expected_headers, expected_attrs):
def _test_calculate_header_and_attrs(
self, parsed_args_columns, expected_headers, expected_attrs
):
column_headers = ('ID', 'Name', 'Fixed IP Addresses')
columns = ('id', 'name', 'fixed_ips')
parsed_args = mock.Mock()
parsed_args.columns = parsed_args_columns
ret_headers, ret_attrs = utils.calculate_header_and_attrs(
column_headers, columns, parsed_args)
column_headers, columns, parsed_args
)
self.assertEqual(expected_headers, ret_headers)
self.assertEqual(expected_attrs, ret_attrs)
if parsed_args_columns:
@ -481,25 +510,27 @@ class TestUtils(test_utils.TestCase):
self._test_calculate_header_and_attrs(
[],
('ID', 'Name', 'Fixed IP Addresses'),
('id', 'name', 'fixed_ips'))
('id', 'name', 'fixed_ips'),
)
def test_calculate_header_and_attrs_with_known_columns(self):
self._test_calculate_header_and_attrs(
['Name', 'ID'],
['Name', 'ID'],
['name', 'id'])
['Name', 'ID'], ['Name', 'ID'], ['name', 'id']
)
def test_calculate_header_and_attrs_with_unknown_columns(self):
self._test_calculate_header_and_attrs(
['Name', 'ID', 'device_id'],
['Name', 'ID', 'device_id'],
['name', 'id', 'device_id'])
['name', 'id', 'device_id'],
)
def test_calculate_header_and_attrs_with_attrname_columns(self):
self._test_calculate_header_and_attrs(
['name', 'id', 'device_id'],
['Name', 'ID', 'device_id'],
['name', 'id', 'device_id'])
['name', 'id', 'device_id'],
)
def test_subtest(self):
for i in range(3):
@ -512,7 +543,6 @@ class NoUniqueMatch(Exception):
class TestFindResource(test_utils.TestCase):
def setUp(self):
super(TestFindResource, self).setUp()
self.name = 'legos'
@ -572,36 +602,39 @@ class TestFindResource(test_utils.TestCase):
self.manager.find = mock.Mock(
side_effect=exceptions.NotFound(404, "2")
)
result = self.assertRaises(exceptions.CommandError,
utils.find_resource,
self.manager,
self.name)
self.assertEqual("No lego with a name or ID of 'legos' exists.",
str(result))
result = self.assertRaises(
exceptions.CommandError,
utils.find_resource,
self.manager,
self.name,
)
self.assertEqual(
"No lego with a name or ID of 'legos' exists.", str(result)
)
self.manager.get.assert_called_with(self.name)
self.manager.find.assert_called_with(name=self.name)
def test_find_resource_list_forbidden(self):
self.manager.get = mock.Mock(side_effect=Exception('Boom!'))
self.manager.find = mock.Mock(side_effect=Exception('Boom!'))
self.manager.list = mock.Mock(
side_effect=exceptions.Forbidden(403)
self.manager.list = mock.Mock(side_effect=exceptions.Forbidden(403))
self.assertRaises(
exceptions.Forbidden, utils.find_resource, self.manager, self.name
)
self.assertRaises(exceptions.Forbidden,
utils.find_resource,
self.manager,
self.name)
self.manager.list.assert_called_with()
def test_find_resource_find_no_unique(self):
self.manager.get = mock.Mock(side_effect=Exception('Boom!'))
self.manager.find = mock.Mock(side_effect=NoUniqueMatch())
result = self.assertRaises(exceptions.CommandError,
utils.find_resource,
self.manager,
self.name)
self.assertEqual("More than one lego exists with the name 'legos'.",
str(result))
result = self.assertRaises(
exceptions.CommandError,
utils.find_resource,
self.manager,
self.name,
)
self.assertEqual(
"More than one lego exists with the name 'legos'.", str(result)
)
self.manager.get.assert_called_with(self.name)
self.manager.find.assert_called_with(name=self.name)
@ -620,7 +653,9 @@ class TestFindResource(test_utils.TestCase):
loaded=True,
)
self.manager.list = mock.Mock(
return_value=[silly_resource, ],
return_value=[
silly_resource,
],
)
result = utils.find_resource(self.manager, self.name)
self.assertEqual(silly_resource, result)
@ -637,12 +672,13 @@ class TestFindResource(test_utils.TestCase):
)
)
self.manager.list = mock.Mock(return_value=[])
result = self.assertRaises(exceptions.CommandError,
utils.find_resource,
self.manager,
self.name)
self.assertEqual("Could not find resource legos",
str(result))
result = self.assertRaises(
exceptions.CommandError,
utils.find_resource,
self.manager,
self.name,
)
self.assertEqual("Could not find resource legos", str(result))
self.manager.get.assert_called_with(self.name)
self.manager.find.assert_called_with(name=self.name)
@ -665,34 +701,39 @@ class TestFindResource(test_utils.TestCase):
{'id': 'abcde', 'name': self.name},
loaded=True,
)
self.manager.list = mock.Mock(return_value=[silly_resource,
silly_resource_same])
result = self.assertRaises(exceptions.CommandError,
utils.find_resource,
self.manager,
self.name)
self.assertEqual("More than one resource exists "
"with the name or ID 'legos'.", str(result))
self.manager.list = mock.Mock(
return_value=[silly_resource, silly_resource_same]
)
result = self.assertRaises(
exceptions.CommandError,
utils.find_resource,
self.manager,
self.name,
)
self.assertEqual(
"More than one resource exists " "with the name or ID 'legos'.",
str(result),
)
self.manager.get.assert_called_with(self.name)
self.manager.find.assert_called_with(name=self.name)
def test_format_dict(self):
expected = "a='b', c='d', e='f'"
self.assertEqual(expected,
utils.format_dict({'a': 'b', 'c': 'd', 'e': 'f'}))
self.assertEqual(expected,
utils.format_dict({'e': 'f', 'c': 'd', 'a': 'b'}))
self.assertEqual(
expected, utils.format_dict({'a': 'b', 'c': 'd', 'e': 'f'})
)
self.assertEqual(
expected, utils.format_dict({'e': 'f', 'c': 'd', 'a': 'b'})
)
self.assertIsNone(utils.format_dict(None))
def test_format_dict_recursive(self):
expected = "a='b', c.1='d', c.2=''"
self.assertEqual(
expected,
utils.format_dict({'a': 'b', 'c': {'1': 'd', '2': ''}})
expected, utils.format_dict({'a': 'b', 'c': {'1': 'd', '2': ''}})
)
self.assertEqual(
expected,
utils.format_dict({'c': {'1': 'd', '2': ''}, 'a': 'b'})
expected, utils.format_dict({'c': {'1': 'd', '2': ''}, 'a': 'b'})
)
self.assertIsNone(utils.format_dict(None))
@ -710,7 +751,7 @@ class TestFindResource(test_utils.TestCase):
'b2': 'D',
},
}
)
),
)
self.assertEqual(
expected,
@ -725,45 +766,64 @@ class TestFindResource(test_utils.TestCase):
},
'a1': 'A',
}
)
),
)
def test_format_dict_of_list(self):
expected = "a=a1, a2; b=b1, b2; c=c1, c2; e="
self.assertEqual(expected,
utils.format_dict_of_list({'a': ['a2', 'a1'],
'b': ['b2', 'b1'],
'c': ['c1', 'c2'],
'd': None,
'e': []})
)
self.assertEqual(expected,
utils.format_dict_of_list({'c': ['c1', 'c2'],
'a': ['a2', 'a1'],
'b': ['b2', 'b1'],
'e': []})
)
self.assertEqual(
expected,
utils.format_dict_of_list(
{
'a': ['a2', 'a1'],
'b': ['b2', 'b1'],
'c': ['c1', 'c2'],
'd': None,
'e': [],
}
),
)
self.assertEqual(
expected,
utils.format_dict_of_list(
{
'c': ['c1', 'c2'],
'a': ['a2', 'a1'],
'b': ['b2', 'b1'],
'e': [],
}
),
)
self.assertIsNone(utils.format_dict_of_list(None))
def test_format_dict_of_list_with_separator(self):
expected = "a=a1, a2\nb=b1, b2\nc=c1, c2\ne="
self.assertEqual(expected,
utils.format_dict_of_list({'a': ['a2', 'a1'],
'b': ['b2', 'b1'],
'c': ['c1', 'c2'],
'd': None,
'e': []},
separator='\n')
)
self.assertEqual(expected,
utils.format_dict_of_list({'c': ['c1', 'c2'],
'a': ['a2', 'a1'],
'b': ['b2', 'b1'],
'e': []},
separator='\n')
)
self.assertIsNone(utils.format_dict_of_list(None,
separator='\n'))
self.assertEqual(
expected,
utils.format_dict_of_list(
{
'a': ['a2', 'a1'],
'b': ['b2', 'b1'],
'c': ['c1', 'c2'],
'd': None,
'e': [],
},
separator='\n',
),
)
self.assertEqual(
expected,
utils.format_dict_of_list(
{
'c': ['c1', 'c2'],
'a': ['a2', 'a1'],
'b': ['b2', 'b1'],
'e': [],
},
separator='\n',
),
)
self.assertIsNone(utils.format_dict_of_list(None, separator='\n'))
def test_format_list(self):
expected = 'a, b, c'
@ -790,35 +850,43 @@ class TestFindResource(test_utils.TestCase):
class TestAssertItemEqual(test_utils.TestCommand):
def test_assert_normal_item(self):
expected = ['a', 'b', 'c']
actual = ['a', 'b', 'c']
self.assertItemEqual(expected, actual)
def test_assert_item_with_formattable_columns(self):
expected = [format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z'])]
actual = [format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z'])]
expected = [
format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z']),
]
actual = [
format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z']),
]
self.assertItemEqual(expected, actual)
def test_assert_item_different_length(self):
expected = ['a', 'b', 'c']
actual = ['a', 'b']
self.assertRaises(AssertionError,
self.assertItemEqual, expected, actual)
self.assertRaises(
AssertionError, self.assertItemEqual, expected, actual
)
def test_assert_item_formattable_columns_vs_legacy_formatter(self):
expected = [format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z'])]
actual = [utils.format_dict({'a': 1, 'b': 2}),
utils.format_list(['x', 'y', 'z'])]
self.assertRaises(AssertionError,
self.assertItemEqual, expected, actual)
expected = [
format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z']),
]
actual = [
utils.format_dict({'a': 1, 'b': 2}),
utils.format_list(['x', 'y', 'z']),
]
self.assertRaises(
AssertionError, self.assertItemEqual, expected, actual
)
def test_assert_item_different_formattable_columns(self):
class ExceptionColumn(cliff_columns.FormattableColumn):
def human_readable(self):
raise Exception('always fail')
@ -828,63 +896,80 @@ class TestAssertItemEqual(test_utils.TestCommand):
# AssertionError is a subclass of Exception
# so raising AssertionError ensures ExceptionColumn.human_readable()
# is not called.
self.assertRaises(AssertionError,
self.assertItemEqual, expected, actual)
self.assertRaises(
AssertionError, self.assertItemEqual, expected, actual
)
def test_assert_list_item(self):
expected = [
['a', 'b', 'c'],
[format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z'])]
[
format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z']),
],
]
actual = [
['a', 'b', 'c'],
[format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z'])]
[
format_columns.DictColumn({'a': 1, 'b': 2}),
format_columns.ListColumn(['x', 'y', 'z']),
],
]
self.assertListItemEqual(expected, actual)
class TestSDKUtils(test_utils.TestCase):
def setUp(self):
super(TestSDKUtils, self).setUp()
def _test_get_osc_show_columns_for_sdk_resource(
self, sdk_resource, column_map,
expected_display_columns, expected_attr_columns):
display_columns, attr_columns = \
utils.get_osc_show_columns_for_sdk_resource(
sdk_resource, column_map)
self,
sdk_resource,
column_map,
expected_display_columns,
expected_attr_columns,
):
(
display_columns,
attr_columns,
) = utils.get_osc_show_columns_for_sdk_resource(
sdk_resource, column_map
)
self.assertEqual(expected_display_columns, display_columns)
self.assertEqual(expected_attr_columns, attr_columns)
def test_get_osc_show_columns_for_sdk_resource_empty(self):
self._test_get_osc_show_columns_for_sdk_resource(
{}, {}, tuple(), tuple())
{}, {}, tuple(), tuple()
)
def test_get_osc_show_columns_for_sdk_resource_empty_map(self):
self._test_get_osc_show_columns_for_sdk_resource(
{'foo': 'foo1'}, {},
('foo',), ('foo',))
{'foo': 'foo1'}, {}, ('foo',), ('foo',)
)
def test_get_osc_show_columns_for_sdk_resource_empty_data(self):
self._test_get_osc_show_columns_for_sdk_resource(
{}, {'foo': 'foo_map'},
('foo_map',), ('foo_map',))
{}, {'foo': 'foo_map'}, ('foo_map',), ('foo_map',)
)
def test_get_osc_show_columns_for_sdk_resource_map(self):
self._test_get_osc_show_columns_for_sdk_resource(
{'foo': 'foo1'}, {'foo': 'foo_map'},
('foo_map',), ('foo',))
{'foo': 'foo1'}, {'foo': 'foo_map'}, ('foo_map',), ('foo',)
)
def test_get_osc_show_columns_for_sdk_resource_map_dup(self):
self._test_get_osc_show_columns_for_sdk_resource(
{'foo': 'foo1', 'foo_map': 'foo1'}, {'foo': 'foo_map'},
('foo_map',), ('foo',))
{'foo': 'foo1', 'foo_map': 'foo1'},
{'foo': 'foo_map'},
('foo_map',),
('foo',),
)
def test_get_osc_show_columns_for_sdk_resource_map_full(self):
self._test_get_osc_show_columns_for_sdk_resource(
{'foo': 'foo1', 'bar': 'bar1'},
{'foo': 'foo_map', 'new': 'bar'},
('bar', 'foo_map'), ('bar', 'foo'))
('bar', 'foo_map'),
('bar', 'foo'),
)

View File

@ -53,11 +53,13 @@ def backward_compat_col_lister(column_headers, columns, column_map):
column_headers = list(column_headers)
for old_col, new_col in column_map.items():
if old_col in columns:
LOG.warning(_('The column "%(old_column)s" was deprecated, '
'please use "%(new_column)s" replace.') % {
'old_column': old_col,
'new_column': new_col}
)
LOG.warning(
_(
'The column "%(old_column)s" was deprecated, '
'please use "%(new_column)s" replace.'
)
% {'old_column': old_col, 'new_column': new_col}
)
if new_col in column_headers:
column_headers[column_headers.index(new_col)] = old_col
return column_headers
@ -81,11 +83,13 @@ def backward_compat_col_showone(show_object, columns, column_map):
show_object = copy.deepcopy(show_object)
for old_col, new_col in column_map.items():
if old_col in columns:
LOG.warning(_('The column "%(old_column)s" was deprecated, '
'please use "%(new_column)s" replace.') % {
'old_column': old_col,
'new_column': new_col}
)
LOG.warning(
_(
'The column "%(old_column)s" was deprecated, '
'please use "%(new_column)s" replace.'
)
% {'old_column': old_col, 'new_column': new_col}
)
if new_col in show_object:
show_object.update({old_col: show_object.pop(new_col)})
return show_object
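A quick usage sketch of the list-column helper above, mirroring test_backward_compat_col_lister earlier in this diff:

from osc_lib import utils

# The caller asked for the deprecated 'Display Name' column; the helper
# logs a deprecation warning and swaps the new 'Name' header back to it.
headers = utils.backward_compat_col_lister(
    ['ID', 'Name', 'Size'],    # current column headers
    ['Display Name'],          # columns requested by the user
    {'Display Name': 'Name'},  # old -> new column map
)
# headers == ['ID', 'Display Name', 'Size']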
@ -118,11 +122,13 @@ def calculate_header_and_attrs(column_headers, attrs, parsed_args):
"""
if parsed_args.columns:
header_attr_map = dict(zip(column_headers, attrs))
expected_attrs = [header_attr_map.get(c, c)
for c in parsed_args.columns]
expected_attrs = [
header_attr_map.get(c, c) for c in parsed_args.columns
]
attr_header_map = dict(zip(attrs, column_headers))
expected_headers = [attr_header_map.get(c, c)
for c in parsed_args.columns]
expected_headers = [
attr_header_map.get(c, c) for c in parsed_args.columns
]
# If attribute name is used in parsed_args.columns
# convert it into display names because cliff expects
# name in parsed_args.columns and name in column_headers matches.
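Likewise for calculate_header_and_attrs(), as exercised by the tests earlier in this diff; a plain argparse.Namespace stands in for the parsed arguments here:

import argparse

from osc_lib import utils

parsed_args = argparse.Namespace(columns=['Name', 'ID'])
headers, attrs = utils.calculate_header_and_attrs(
    ('ID', 'Name', 'Fixed IP Addresses'),
    ('id', 'name', 'fixed_ips'),
    parsed_args,
)
# headers == ['Name', 'ID'], attrs == ['name', 'id'], and, per the comment
# above, parsed_args.columns is rewritten to the display names.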
@ -215,9 +221,11 @@ def find_resource(manager, name_or_id, **kwargs):
# Eventually this should be pulled from a common set
# of client exceptions.
except Exception as ex:
if (type(ex).__name__ == 'NotFound' or
type(ex).__name__ == 'HTTPNotFound' or
type(ex).__name__ == 'TypeError'):
if (
type(ex).__name__ == 'NotFound'
or type(ex).__name__ == 'HTTPNotFound'
or type(ex).__name__ == 'TypeError'
):
pass
else:
raise
@ -246,21 +254,25 @@ def find_resource(manager, name_or_id, **kwargs):
# of client exceptions.
except Exception as ex:
if type(ex).__name__ == 'NotFound':
msg = _(
"No %(resource)s with a name or ID of '%(id)s' exists."
msg = _("No %(resource)s with a name or ID of '%(id)s' exists.")
raise exceptions.CommandError(
msg
% {
'resource': manager.resource_class.__name__.lower(),
'id': name_or_id,
}
)
raise exceptions.CommandError(msg % {
'resource': manager.resource_class.__name__.lower(),
'id': name_or_id,
})
if type(ex).__name__ == 'NoUniqueMatch':
msg = _(
"More than one %(resource)s exists with the name '%(id)s'."
)
raise exceptions.CommandError(msg % {
'resource': manager.resource_class.__name__.lower(),
'id': name_or_id,
})
raise exceptions.CommandError(
msg
% {
'resource': manager.resource_class.__name__.lower(),
'id': name_or_id,
}
)
else:
pass
@ -268,8 +280,10 @@ def find_resource(manager, name_or_id, **kwargs):
# to find a matching name or ID.
count = 0
for resource in manager.list():
if (resource.get('id') == name_or_id or
resource.get('name') == name_or_id):
if (
resource.get('id') == name_or_id
or resource.get('name') == name_or_id
):
count += 1
_resource = resource
if count == 0:
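Pulling the pieces of find_resource() together: it tries manager.get(), then manager.find(name=...), and finally falls back to scanning manager.list() for a matching 'id' or 'name'. A sketch of that fallback, in the same mock-based spirit as the TestFindResource cases earlier in this diff:

from unittest import mock

from osc_lib import utils

manager = mock.MagicMock()
manager.get = mock.Mock(side_effect=Exception('Boom!'))
manager.find = mock.Mock(side_effect=Exception('Boom!'))
manager.list = mock.Mock(return_value=[{'id': 'abcde', 'name': 'legos'}])

# get() and find() fail with generic errors, so the list() scan matches
# on 'name' and returns the single hit.
print(utils.find_resource(manager, 'legos'))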
@ -403,17 +417,21 @@ def get_client_class(api_name, version, version_map):
try:
client_path = version_map[str(version)]
except (KeyError, ValueError):
sorted_versions = sorted(version_map.keys(),
key=lambda s: list(map(int, s.split('.'))))
sorted_versions = sorted(
version_map.keys(), key=lambda s: list(map(int, s.split('.')))
)
msg = _(
"Invalid %(api_name)s client version '%(version)s'. "
"must be one of: %(version_map)s"
)
raise exceptions.UnsupportedVersion(msg % {
'api_name': api_name,
'version': version,
'version_map': ', '.join(sorted_versions),
})
raise exceptions.UnsupportedVersion(
msg
% {
'api_name': api_name,
'version': version,
'version_map': ', '.join(sorted_versions),
}
)
return importutils.import_class(client_path)
@ -443,16 +461,15 @@ def get_dict_properties(item, fields, mixed_case_fields=None, formatters=None):
if field in formatters:
formatter = formatters[field]
# columns must be either a subclass of FormattableColumn
if (
isinstance(formatter, type) and
issubclass(formatter, cliff_columns.FormattableColumn)
if isinstance(formatter, type) and issubclass(
formatter, cliff_columns.FormattableColumn
):
data = formatter(data)
# or a partial wrapping one (to allow us to pass extra parameters)
elif (
isinstance(formatter, functools.partial) and
isinstance(formatter.func, type) and
issubclass(formatter.func, cliff_columns.FormattableColumn)
isinstance(formatter, functools.partial)
and isinstance(formatter.func, type)
and issubclass(formatter.func, cliff_columns.FormattableColumn)
):
data = formatter(data)
# otherwise it's probably a legacy-style function
@ -461,7 +478,8 @@ def get_dict_properties(item, fields, mixed_case_fields=None, formatters=None):
'The usage of formatter functions is now discouraged. '
'Consider using cliff.columns.FormattableColumn instead. '
'See reviews linked with bug 1687955 for more detail.',
category=DeprecationWarning)
category=DeprecationWarning,
)
if data is not None:
data = formatter(data)
else:
@ -520,15 +538,17 @@ def get_item_properties(item, fields, mixed_case_fields=None, formatters=None):
data = getattr(item, field_name, '')
if field in formatters:
formatter = formatters[field]
if (isinstance(formatter, type) and issubclass(
formatter, cliff_columns.FormattableColumn)):
if isinstance(formatter, type) and issubclass(
formatter, cliff_columns.FormattableColumn
):
data = formatter(data)
elif callable(formatter):
warnings.warn(
'The usage of formatter functions is now discouraged. '
'Consider using cliff.columns.FormattableColumn instead. '
'See reviews linked with bug 1687955 for more detail.',
category=DeprecationWarning)
category=DeprecationWarning,
)
if data is not None:
data = formatter(data)
else:
@ -561,8 +581,11 @@ def get_password(stdin, prompt=None, confirm=True):
def is_ascii(string):
try:
(string.decode('ascii') if isinstance(string, bytes)
else string.encode('ascii'))
(
string.decode('ascii')
if isinstance(string, bytes)
else string.encode('ascii')
)
return True
except (UnicodeEncodeError, UnicodeDecodeError):
return False
@ -607,10 +630,13 @@ def sort_items(items, sort_str, sort_type=None):
"'%(direction)s' is not a valid sort direction for "
"sort key %(sort_key)s, use 'asc' or 'desc' instead"
)
raise exceptions.CommandError(msg % {
'direction': direction,
'sort_key': sort_key,
})
raise exceptions.CommandError(
msg
% {
'direction': direction,
'sort_key': sort_key,
}
)
if direction == 'desc':
reverse = True
@ -632,14 +658,16 @@ def sort_items(items, sort_str, sort_type=None):
return items
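In short, sort_str is a 'key' or 'key:direction' spec where only 'asc' and 'desc' are accepted directions. A small sketch in line with the sorting tests earlier in this diff:

from osc_lib import utils

items = [{'a': '2'}, {'a': '4'}, {'a': '1'}]

utils.sort_items(items, 'a')                 # ascending by key 'a'
utils.sort_items(items, 'a:desc')            # descending by key 'a'
utils.sort_items(items, 'a', sort_type=int)  # coerce values before comparing
# 'a:bad_dir' raises exceptions.CommandError, as tested earlier in this diff.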
def wait_for_delete(manager,
res_id,
status_field='status',
error_status=['error'],
exception_name=['NotFound'],
sleep_time=5,
timeout=300,
callback=None):
def wait_for_delete(
manager,
res_id,
status_field='status',
error_status=['error'],
exception_name=['NotFound'],
sleep_time=5,
timeout=300,
callback=None,
):
"""Wait for resource deletion
:param manager: the manager from which we can get the resource
@ -683,13 +711,15 @@ def wait_for_delete(manager,
return False
def wait_for_status(status_f,
res_id,
status_field='status',
success_status=['active'],
error_status=['error'],
sleep_time=5,
callback=None):
def wait_for_status(
status_f,
res_id,
status_field='status',
success_status=['active'],
error_status=['error'],
sleep_time=5,
callback=None,
):
"""Wait for status change on a resource during a long-running operation
:param status_f: a status function that takes a single id argument
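Both wait helpers poll until a terminal condition is reached; a sketch driven with mocks, matching the wait tests earlier in this diff (time.sleep is patched out so the example returns immediately):

import time
from unittest import mock

from osc_lib import exceptions
from osc_lib import utils

# wait_for_delete(): polls manager.get(res_id) until it raises NotFound.
manager = mock.MagicMock()
manager.get = mock.Mock(
    side_effect=[mock.Mock(status='ACTIVE'), exceptions.NotFound(404)]
)
with mock.patch.object(time, 'sleep'):
    assert utils.wait_for_delete(manager, 'res-id', sleep_time=1, timeout=60)

# wait_for_status(): polls status_f(res_id) until the status field matches.
status_f = mock.Mock(return_value=mock.Mock(status='ACTIVE'))
assert utils.wait_for_status(status_f, 'res-id', success_status=['active'])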
@ -718,9 +748,7 @@ def wait_for_status(status_f,
def get_osc_show_columns_for_sdk_resource(
sdk_resource,
osc_column_map,
invisible_columns=None
sdk_resource, osc_column_map, invisible_columns=None
):
"""Get and filter the display and attribute columns for an SDK resource.
@ -740,7 +768,8 @@ def get_osc_show_columns_for_sdk_resource(
# 100% sdk compatible. Unless we introduce SDK test/fake resources we
# should check presence of the specific method
resource_dict = sdk_resource.to_dict(
body=True, headers=False, ignore_none=False)
body=True, headers=False, ignore_none=False
)
else:
# We might land here with not a real SDK Resource (during the
# transition period).
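The net effect of the column helper above, mirroring the 'map_full' case in TestSDKUtils earlier in this diff:

from osc_lib import utils

display_columns, attr_columns = utils.get_osc_show_columns_for_sdk_resource(
    {'foo': 'foo1', 'bar': 'bar1'},    # resource data (a plain dict works)
    {'foo': 'foo_map', 'new': 'bar'},  # osc_column_map
)
# display_columns == ('bar', 'foo_map'); attr_columns == ('bar', 'foo')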

View File

@ -44,15 +44,29 @@ def get_column_definitions(attr_map, long_listing):
"""
if long_listing:
headers = [hdr for col, hdr, listing_mode in attr_map
if listing_mode in (LIST_BOTH, LIST_LONG_ONLY)]
columns = [col for col, hdr, listing_mode in attr_map
if listing_mode in (LIST_BOTH, LIST_LONG_ONLY)]
headers = [
hdr
for col, hdr, listing_mode in attr_map
if listing_mode in (LIST_BOTH, LIST_LONG_ONLY)
]
columns = [
col
for col, hdr, listing_mode in attr_map
if listing_mode in (LIST_BOTH, LIST_LONG_ONLY)
]
else:
headers = [hdr for col, hdr, listing_mode in attr_map if listing_mode
if listing_mode in (LIST_BOTH, LIST_SHORT_ONLY)]
columns = [col for col, hdr, listing_mode in attr_map if listing_mode
if listing_mode in (LIST_BOTH, LIST_SHORT_ONLY)]
headers = [
hdr
for col, hdr, listing_mode in attr_map
if listing_mode
if listing_mode in (LIST_BOTH, LIST_SHORT_ONLY)
]
columns = [
col
for col, hdr, listing_mode in attr_map
if listing_mode
if listing_mode in (LIST_BOTH, LIST_SHORT_ONLY)
]
return headers, columns
@ -92,8 +106,8 @@ def get_columns(item, attr_map=None):
attr_map = attr_map or tuple([])
_attr_map_dict = dict((col, hdr) for col, hdr, listing_mode in attr_map)
columns = [(column, _attr_map_dict.get(column, column))
for column in item.keys()]
columns = [
(column, _attr_map_dict.get(column, column)) for column in item.keys()
]
columns = sorted(columns, key=operator.itemgetter(1))
return (tuple(col[0] for col in columns),
tuple(col[1] for col in columns))
return (tuple(col[0] for col in columns), tuple(col[1] for col in columns))
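A usage sketch matching the TestColumnUtils cases earlier in this diff:

from osc_lib.utils import columns as column_utils

attr_map = (
    ('id', 'ID', column_utils.LIST_BOTH),
    ('tenant_id', 'Project', column_utils.LIST_LONG_ONLY),
    ('name', 'Name', column_utils.LIST_BOTH),
    ('summary', 'Summary', column_utils.LIST_SHORT_ONLY),
)

headers, columns = column_utils.get_column_definitions(
    attr_map, long_listing=False
)
# headers == ['ID', 'Name', 'Summary'], columns == ['id', 'name', 'summary']

headers, columns = column_utils.get_column_definitions(
    attr_map, long_listing=True
)
# headers == ['ID', 'Project', 'Name'], columns == ['id', 'tenant_id', 'name']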

View File

@ -17,13 +17,13 @@ from osc_lib.i18n import _
class _CommaListAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, values.split(','))
def add_tag_filtering_option_to_parser(
parser, resource_name, enhance_help=lambda _h: _h):
parser, resource_name, enhance_help=lambda _h: _h
):
"""Add tag filtering options to a parser.
:param parser: argparse.Argument parser object.
@ -39,32 +39,48 @@ def add_tag_filtering_option_to_parser(
metavar='<tag>[,<tag>,...]',
action=_CommaListAction,
help=enhance_help(
_('List %s which have all given tag(s) '
'(Comma-separated list of tags)') % resource_name)
_(
'List %s which have all given tag(s) '
'(Comma-separated list of tags)'
)
% resource_name
),
)
parser.add_argument(
'--any-tags',
metavar='<tag>[,<tag>,...]',
action=_CommaListAction,
help=enhance_help(
_('List %s which have any given tag(s) '
'(Comma-separated list of tags)') % resource_name)
_(
'List %s which have any given tag(s) '
'(Comma-separated list of tags)'
)
% resource_name
),
)
parser.add_argument(
'--not-tags',
metavar='<tag>[,<tag>,...]',
action=_CommaListAction,
help=enhance_help(
_('Exclude %s which have all given tag(s) '
'(Comma-separated list of tags)') % resource_name)
_(
'Exclude %s which have all given tag(s) '
'(Comma-separated list of tags)'
)
% resource_name
),
)
parser.add_argument(
'--not-any-tags',
metavar='<tag>[,<tag>,...]',
action=_CommaListAction,
help=enhance_help(
_('Exclude %s which have any given tag(s) '
'(Comma-separated list of tags)') % resource_name)
_(
'Exclude %s which have any given tag(s) '
'(Comma-separated list of tags)'
)
% resource_name
),
)
@ -88,7 +104,8 @@ def get_tag_filtering_args(parsed_args, args):
def add_tag_option_to_parser_for_create(
parser, resource_name, enhance_help=lambda _h: _h):
parser, resource_name, enhance_help=lambda _h: _h
):
"""Add tag options to a parser for create commands.
:param parser: argparse.Argument parser object.
@ -106,18 +123,23 @@ def add_tag_option_to_parser_for_create(
dest='tags',
metavar='<tag>',
help=enhance_help(
_("Tag to be added to the %s "
"(repeat option to set multiple tags)") % resource_name)
_(
"Tag to be added to the %s "
"(repeat option to set multiple tags)"
)
% resource_name
),
)
tag_group.add_argument(
'--no-tag',
action='store_true',
help=enhance_help(_("No tags associated with the %s") % resource_name)
help=enhance_help(_("No tags associated with the %s") % resource_name),
)
def add_tag_option_to_parser_for_set(
parser, resource_name, enhance_help=lambda _h: _h):
parser, resource_name, enhance_help=lambda _h: _h
):
"""Add tag options to a parser for set commands.
:param parser: argparse.Argument parser object.
@ -134,20 +156,29 @@ def add_tag_option_to_parser_for_set(
dest='tags',
metavar='<tag>',
help=enhance_help(
_("Tag to be added to the %s "
"(repeat option to set multiple tags)") % resource_name)
_(
"Tag to be added to the %s "
"(repeat option to set multiple tags)"
)
% resource_name
),
)
parser.add_argument(
'--no-tag',
action='store_true',
help=enhance_help(
_("Clear tags associated with the %s. Specify both "
"--tag and --no-tag to overwrite current tags") % resource_name)
_(
"Clear tags associated with the %s. Specify both "
"--tag and --no-tag to overwrite current tags"
)
% resource_name
),
)
def add_tag_option_to_parser_for_unset(
parser, resource_name, enhance_help=lambda _h: _h):
parser, resource_name, enhance_help=lambda _h: _h
):
"""Add tag options to a parser for set commands.
:param parser: argparse.Argument parser object.
@ -165,13 +196,20 @@ def add_tag_option_to_parser_for_unset(
dest='tags',
metavar='<tag>',
help=enhance_help(
_("Tag to be removed from the %s "
"(repeat option to remove multiple tags)") % resource_name))
_(
"Tag to be removed from the %s "
"(repeat option to remove multiple tags)"
)
% resource_name
),
)
tag_group.add_argument(
'--all-tag',
action='store_true',
help=enhance_help(
_("Clear all tags associated with the %s") % resource_name))
_("Clear all tags associated with the %s") % resource_name
),
)
def update_tags_for_set(client, obj, parsed_args):
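Putting this module's helpers together, roughly as the TestTags cases above do. The 'server' resource name is illustrative, and the import path assumes the module lives at osc_lib/utils/tags.py in this tree:

import argparse

from osc_lib.utils import tags

parser = argparse.ArgumentParser()
tags.add_tag_filtering_option_to_parser(parser, 'server')

parsed_args = parser.parse_args(
    ['--tags', 'tag1,tag2', '--any-tags', 'tag4',
     '--not-tags', 'tag5', '--not-any-tags', 'tag6']
)

filters = {}
tags.get_tag_filtering_args(parsed_args, filters)
# filters == {'tags': 'tag1,tag2', 'any_tags': 'tag4',
#             'not_tags': 'tag5', 'not_any_tags': 'tag6'}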

View File

@ -32,6 +32,7 @@
# -- General configuration ------------------------------------------------
from sphinx.util import logging
# According to the discussion in
# https://github.com/sphinx-doc/sphinx/issues/10112 this may be applied as a
# dirty hack until the issue with replacing extlinks is resolved
@ -248,10 +249,8 @@ htmlhelp_basename = 'OSC_LIBReleaseNotesdoc'
latex_elements = {
# The paper size ('letterpaper' or 'a4paper').
# 'papersize': 'letterpaper',
# The font size ('10pt', '11pt' or '12pt').
# 'pointsize': '10pt',
# Additional stuff for the LaTeX preamble.
# 'preamble': '',
}
@ -259,12 +258,14 @@ latex_elements = {
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [(
'index',
'OSC_LIBReleaseNotes.tex',
'osc-lib Release Notes Documentation',
'osc-lib Developers',
'manual'),
latex_documents = [
(
'index',
'OSC_LIBReleaseNotes.tex',
'osc-lib Release Notes Documentation',
'osc-lib Developers',
'manual',
),
]
# The name of an image file (relative to this directory) to place at the top of
@ -298,13 +299,15 @@ latex_documents = [(
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [(
'index',
'osc_libreleasenotes',
'osc-lib Release Notes Documentation',
['osc-lib Developers'],
1,
)]
man_pages = [
(
'index',
'osc_libreleasenotes',
'osc-lib Release Notes Documentation',
['osc-lib Developers'],
1,
)
]
# If true, show URL addresses after external links.
#
@ -316,15 +319,17 @@ man_pages = [(
# Grouping the document tree into Texinfo files. List of tuples
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [(
'index',
'OSC_LIBReleaseNotes',
'osc-lib Release Notes Documentation',
'osc-lib Developers',
'OSC_LIBReleaseNotes',
'Common base library for OpenStackClient plugins.',
'Miscellaneous',
)]
texinfo_documents = [
(
'index',
'OSC_LIBReleaseNotes',
'osc-lib Release Notes Documentation',
'osc-lib Developers',
'OSC_LIBReleaseNotes',
'Common base library for OpenStackClient plugins.',
'Miscellaneous',
)
]
# Documents to append as an appendix to all manuals.
#

View File

@ -15,6 +15,4 @@
import setuptools
setuptools.setup(
setup_requires=['pbr>=2.0.0'],
pbr=True)
setuptools.setup(setup_requires=['pbr>=2.0.0'], pbr=True)

tox.ini (16 changed lines)
View File

@ -62,9 +62,17 @@ commands =
[flake8]
show-source = True
exclude = .venv,.git,.tox,dist,doc,*lib/python*,*egg,build,tools
# If 'ignore' is not set there are default errors and warnings that are set
# Doc: http://flake8.readthedocs.org/en/latest/config.html#default
ignore = W504
# The following are ignored on purpose. It's not super worth it to fix them.
# However, if you feel strongly about it, patches will be accepted to fix them
# if they fix ALL of the occurrences of one and only one of them.
# E203 Black will put spaces before colons in slices
# E501 Black takes care of line length for us
# H238 New Style Classes are the default in Python3
# H301 Black will put commas after imports that can't fit on one line
# H4 Are about docstrings and there's just a huge pile of pre-existing issues.
# W503 Is supposed to be off by default but in the latest pycodestyle isn't.
# Also, both openstacksdk and Donald Knuth disagree with the rule. Line
# breaks should occur before the binary operator for readability.
ignore = E203, E501, H301, H238, H4, W503
import-order-style = pep8
application-import-names = osc_lib
filename = *.py
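To make the new ignore list concrete, here are two tiny snippets written the way Black emits them; each would trip one of the newly ignored checks (E203 and W503 respectively) if those were still enabled:

items = list(range(10))
offset, limit = 2, 5

# E203 (whitespace before ':'): Black spaces out colons in slices whose
# bounds are expressions.
middle = items[offset + 1 : offset + limit]

# W503 (line break before binary operator): Black, like openstacksdk,
# breaks long conditions before the operator.
is_small = (
    len(middle) < limit
    and offset > 0
)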