Sync cliutils from oslo

In the process of unification of the clients code we should
reuse common functionality from Oslo.

Related to blueprint common-client-library-2

Change-Id: Ia5e4e60f07561849f75d88b8a2ea3d23d6d5ff6d
This commit is contained in:
Andrey Kurilin 2013-12-27 16:29:23 +02:00 committed by Gerrit Code Review
parent 6cf101e9ee
commit 871c5fc1be
2 changed files with 214 additions and 0 deletions

View File

@ -0,0 +1,213 @@
# Copyright 2012 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# W0603: Using the global statement
# W0621: Redefining name %s from outer scope
# pylint: disable=W0603,W0621
import getpass
import inspect
import os
import sys
import textwrap
import prettytable
import six
from six import moves
from novaclient.openstack.common.apiclient import exceptions
from novaclient.openstack.common import strutils
def validate_args(fn, *args, **kwargs):
"""Check that the supplied args are sufficient for calling a function.
>>> validate_args(lambda a: None)
Traceback (most recent call last):
MissingArgs: Missing argument(s): a
>>> validate_args(lambda a, b, c, d: None, 0, c=1)
Traceback (most recent call last):
MissingArgs: Missing argument(s): b, d
:param fn: the function to check
:param arg: the positional arguments supplied
:param kwargs: the keyword arguments supplied
argspec = inspect.getargspec(fn)
num_defaults = len(argspec.defaults or [])
required_args = argspec.args[:len(argspec.args) - num_defaults]
def isbound(method):
return getattr(method, 'im_self', None) is not None
if isbound(fn):
missing = [arg for arg in required_args if arg not in kwargs]
missing = missing[len(args):]
if missing:
raise exceptions.MissingArgs(missing)
def arg(*args, **kwargs):
"""Decorator for CLI args.
>>> @arg("name", help="Name of the new entity")
... def entity_create(args):
... pass
def _decorator(func):
add_arg(func, *args, **kwargs)
return func
return _decorator
def env(*args, **kwargs):
"""Returns the first environment variable set.
If all are empty, defaults to '' or keyword arg `default`.
for arg in args:
value = os.environ.get(arg, None)
if value:
return value
return kwargs.get('default', '')
def add_arg(func, *args, **kwargs):
"""Bind CLI arguments to a `do_foo` function."""
if not hasattr(func, 'arguments'):
func.arguments = []
# NOTE(sirp): avoid dups that can occur when the module is shared across
# tests.
if (args, kwargs) not in func.arguments:
# Because of the semantics of decorator composition if we just append
# to the options list positional options will appear to be backwards.
func.arguments.insert(0, (args, kwargs))
def unauthenticated(func):
"""Adds 'unauthenticated' attribute to decorated function.
>>> @unauthenticated
... def mymethod(f):
... pass
func.unauthenticated = True
return func
def isunauthenticated(func):
"""Checks if the function does not require authentication.
Mark such functions with the `@unauthenticated` decorator.
:returns: bool
return getattr(func, 'unauthenticated', False)
def print_list(objs, fields, formatters=None, sortby_index=0,
"""Print a list or objects as a table, one row per object.
:param objs: iterable of :class:`Resource`
:param fields: attributes that correspond to columns, in order
:param formatters: `dict` of callables for field formatting
:param sortby_index: index of the field for sorting table rows
:param mixed_case_fields: fields corresponding to object attributes that
have mixed case names (e.g., 'serverId')
formatters = formatters or {}
mixed_case_fields = mixed_case_fields or []
if sortby_index is None:
kwargs = {}
kwargs = {'sortby': fields[sortby_index]}
pt = prettytable.PrettyTable(fields, caching=False)
pt.align = 'l'
for o in objs:
row = []
for field in fields:
if field in formatters:
if field in mixed_case_fields:
field_name = field.replace(' ', '_')
field_name = field.lower().replace(' ', '_')
data = getattr(o, field_name, '')
def print_dict(dct, dict_property="Property", wrap=0):
"""Print a `dict` as a table of two columns.
:param dct: `dict` to print
:param dict_property: name of the first column
:param wrap: wrapping for the second column
pt = prettytable.PrettyTable([dict_property, 'Value'], caching=False)
pt.align = 'l'
for k, v in six.iteritems(dct):
# convert dict to str to check length
if isinstance(v, dict):
v = str(v)
if wrap > 0:
v = textwrap.fill(str(v), wrap)
# if value has a newline, add in multiple rows
# e.g. fault with stacktrace
if v and isinstance(v, six.string_types) and r'\n' in v:
lines = v.strip().split(r'\n')
col1 = k
for line in lines:
pt.add_row([col1, line])
col1 = ''
pt.add_row([k, v])
def get_password(max_password_prompts=3):
"""Read password from TTY."""
verify = strutils.bool_from_string(env("OS_VERIFY_PASSWORD"))
pw = None
if hasattr(sys.stdin, "isatty") and sys.stdin.isatty():
# Check for Ctrl-D
for _ in moves.range(max_password_prompts):
pw1 = getpass.getpass("OS Password: ")
if verify:
pw2 = getpass.getpass("Please verify: ")
pw2 = pw1
if pw1 == pw2 and pw1:
pw = pw1
except EOFError:
return pw

View File

@ -9,6 +9,7 @@ module=uuidutils
# The base module to hold the copy of openstack.common