
This commit adds valid_keys and required_keys to str2dict and defines a new function which can be used as an argparse type validator. With this function, we can declare which fields are valid and which fields are required for a dictionary option in the option definition. Change-Id: Ib7b233e082c15d0bd6e16a754b8acad52e413986 Closes-Bug: #1102897
229 lines
7.5 KiB
Python
229 lines
7.5 KiB
Python
# Copyright 2011, VMware, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# Borrowed from nova code base, more utilities will be added/borrowed as and
|
|
# when needed.
|
|
|
|
"""Utilities and helper functions."""
|
|
|
|
import argparse
|
|
import functools
|
|
import logging
|
|
import netaddr
|
|
import os
|
|
|
|
from oslo_utils import encodeutils
|
|
from oslo_utils import importutils
|
|
import six
|
|
|
|
from neutronclient._i18n import _
|
|
from neutronclient.common import exceptions
|
|
|
|
|
|
def env(*vars, **kwargs):
    """Return the value of the first environment variable that is set.

    The names in ``vars`` are looked up in ``os.environ`` in order and the
    first non-empty value wins. If none of them has a non-empty value, the
    ``default`` keyword argument is returned, or '' when it was not given.
    """
    candidates = (os.environ.get(name) for name in vars)
    found = next((value for value in candidates if value), None)
    if found is not None:
        return found
    return kwargs.get('default', '')
|
|
|
|
|
|
def convert_to_uppercase(string):
    """Return ``string`` converted to upper case via its ``upper()`` method."""
    uppercased = string.upper()
    return uppercased
|
|
|
|
|
|
def convert_to_lowercase(string):
    """Return ``string`` converted to lower case via its ``lower()`` method."""
    lowercased = string.lower()
    return lowercased
|
|
|
|
|
|
def get_client_class(api_name, version, version_map):
    """Look up and import the client class for the requested API version.

    :param api_name: the name of the API, e.g. 'compute', 'image', etc;
                     only used to build the error message
    :param version: the requested API version; it is converted with str()
                    before being looked up
    :param version_map: a dict keyed by version string whose values are
                        handed to importutils.import_class
    :rtype: the result of importutils.import_class for the mapped entry
    :raises exceptions.UnsupportedVersion: if the lookup of the version in
                                           version_map fails
    """
    try:
        client_path = version_map[str(version)]
    except (KeyError, ValueError):
        template = _("Invalid %(api_name)s client version '%(version)s'. "
                     "must be one of: %(map_keys)s")
        details = {
            'api_name': api_name,
            'version': version,
            'map_keys': ', '.join(version_map.keys()),
        }
        raise exceptions.UnsupportedVersion(template % details)

    return importutils.import_class(client_path)
|
|
|
|
|
|
def get_item_properties(item, fields, mixed_case_fields=(), formatters=None):
    """Return a tuple containing the item properties.

    :param item: a single item resource (e.g. Server, Tenant, etc); either
                 an object exposing the values as attributes or a dict
    :param fields: tuple of strings with the desired field names
    :param mixed_case_fields: tuple of field names to preserve case
    :param formatters: dictionary mapping field names to callables
                       to format the values; a formatter receives the whole
                       item and its result is used as-is
    :returns: tuple with one entry per field, in the order of ``fields``;
              values that are missing or None are reported as ''
    """
    if formatters is None:
        formatters = {}

    row = []
    for field in fields:
        if field in formatters:
            row.append(formatters[field](item))
            continue

        if field in mixed_case_fields:
            field_name = field.replace(' ', '_')
        else:
            field_name = field.lower().replace(' ', '_')

        # For dict items the stored key must win over attribute lookup:
        # otherwise a key such as 'values' or 'items' would resolve to the
        # dict method of the same name instead of the stored value.
        if isinstance(item, dict) and field_name in item:
            data = item[field_name]
        else:
            # Missing attributes (and missing dict keys) degrade to ''
            # instead of raising, so one absent field cannot break a row.
            data = getattr(item, field_name, '')
        if data is None:
            data = ''
        row.append(data)
    return tuple(row)
|
|
|
|
|
|
def str2bool(strbool):
    """Convert a string to a boolean, passing None through unchanged.

    The result is True only when the string equals 'true' ignoring case;
    every other string gives False.
    """
    return None if strbool is None else strbool.lower() == 'true'
|
|
|
|
|
|
def str2dict(strdict, required_keys=None, optional_keys=None):
    """Parse a 'key1=value1,key2=value2,...' string into a dictionary.

    :param strdict: string in the form of key1=value1,key2=value2; an empty
                    string or None is parsed as containing no items
    :param required_keys: list of required keys. All keys in this list must
                          be specified, otherwise ArgumentTypeError is
                          raised. If this parameter is unspecified, no
                          required key check is done.
    :param optional_keys: list of optional keys, used for the valid key
                          check. When at least one of required_keys and
                          optional_keys is specified, every parsed key must
                          be a member of one of the two lists, otherwise
                          ArgumentTypeError is raised. When both are
                          unspecified, no valid key check is done.
    :returns: dictionary mapping each parsed key to its string value
    :raises argparse.ArgumentTypeError: if an item has no '=' separator or
                                        one of the key checks fails
    """
    parsed = {}
    for item in (strdict.split(',') if strdict else ()):
        # Only the first '=' separates key and value, so values may
        # themselves contain '='.
        name, separator, val = item.partition('=')
        if not separator:
            msg = _("invalid key-value '%s', expected format: key=value")
            raise argparse.ArgumentTypeError(msg % item)
        parsed[name] = val

    allowed = set(required_keys or []).union(optional_keys or [])
    unknown = ([name for name in parsed if name not in allowed]
               if allowed else [])
    if unknown:
        msg = _("Invalid key(s) '%(invalid_keys)s' specified. "
                "Valid key(s): '%(valid_keys)s'.")
        raise argparse.ArgumentTypeError(
            msg % {'invalid_keys': ', '.join(sorted(unknown)),
                   'valid_keys': ', '.join(sorted(allowed))})

    missing = [name for name in (required_keys or ()) if name not in parsed]
    if missing:
        msg = _("Required key(s) '%s' not specified.")
        raise argparse.ArgumentTypeError(msg % ', '.join(missing))
    return parsed
|
|
|
|
|
|
def str2dict_type(optional_keys=None, required_keys=None):
    """Build a str2dict converter with its key constraints pre-bound.

    The returned functools.partial takes the string to parse as its only
    remaining argument, which makes it suitable as an argparse ``type``
    callable.

    :param optional_keys: forwarded to str2dict as ``optional_keys``
    :param required_keys: forwarded to str2dict as ``required_keys``
    """
    key_constraints = {'optional_keys': optional_keys,
                       'required_keys': required_keys}
    return functools.partial(str2dict, **key_constraints)
|
|
|
|
|
|
def http_log_req(_logger, args, kwargs):
    """Log an outgoing HTTP request as an equivalent curl command line.

    Nothing is built or logged unless ``_logger`` is enabled for DEBUG.

    :param _logger: logger that receives the "REQ: ..." debug message
    :param args: positional request arguments; 'GET', 'POST', 'DELETE' and
                 'PUT' are rendered as ``-X <method>``, anything else is
                 appended verbatim
    :param kwargs: request keyword arguments; the optional 'headers'
                   mapping is rendered as ``-H`` options and a non-empty
                   'body' as a ``-d`` option
    """
    if not _logger.isEnabledFor(logging.DEBUG):
        return

    string_parts = ['curl -i']
    for element in args:
        if element in ('GET', 'POST', 'DELETE', 'PUT'):
            string_parts.append(' -X %s' % element)
        else:
            string_parts.append(' %s' % element)

    # 'headers' is treated as optional, just like 'body': logging must not
    # fail with KeyError for a request that carries no explicit headers.
    headers = kwargs.get('headers') or {}
    # NOTE(review): header values are written to the log verbatim; if they
    # can contain credentials such as auth tokens, consider masking them.
    for name in headers:
        string_parts.append(' -H "%s: %s"' % (name, headers[name]))

    body = kwargs.get('body')
    if body:
        string_parts.append(" -d '%s'" % (body))
    req = encodeutils.safe_encode("".join(string_parts))
    _logger.debug("REQ: %s", req)
|
|
|
|
|
|
def http_log_resp(_logger, resp, body):
    """Log an HTTP response at DEBUG level.

    The message contains ``resp.status_code``, ``resp.headers`` and
    ``body``. The response is not inspected at all when ``_logger`` is not
    enabled for DEBUG.
    """
    if _logger.isEnabledFor(logging.DEBUG):
        details = {
            'code': resp.status_code,
            'headers': resp.headers,
            'body': body,
        }
        _logger.debug("RESP: %(code)s %(headers)s %(body)s", details)
|
|
|
|
|
|
def _safe_encode_without_obj(data):
    """Encode ``data`` with encodeutils.safe_encode if it is a string.

    Values that are not instances of six.string_types are returned
    unchanged.
    """
    is_text = isinstance(data, six.string_types)
    return encodeutils.safe_encode(data) if is_text else data
|
|
|
|
|
|
def safe_encode_list(data):
    """Return a new list with every element of ``data`` safely encoded.

    Each element is passed through _safe_encode_without_obj; the input
    iterable itself is not modified.
    """
    return [_safe_encode_without_obj(entry) for entry in data]
|
|
|
|
|
|
def safe_encode_dict(data):
    """Return a new dict with the values of ``data`` safely encoded.

    Keys are kept as they are. List values go through safe_encode_list,
    dict values are processed recursively, and every other value goes
    through _safe_encode_without_obj.
    """
    encoded = {}
    for key, value in data.items():
        if isinstance(value, list):
            encoded[key] = safe_encode_list(value)
        elif isinstance(value, dict):
            encoded[key] = safe_encode_dict(value)
        else:
            encoded[key] = _safe_encode_without_obj(value)
    return encoded
|
|
|
|
|
|
def add_boolean_argument(parser, name, **kwargs):
    """Register a True/False valued option on an argparse parser.

    Caller-supplied ``metavar`` and ``choices`` are discarded in favour of
    a fixed '{True,False}' metavar and the choices 'True', 'true', 'False'
    and 'false'. When no ``default`` is given, argparse.SUPPRESS is used.
    All remaining keyword arguments are forwarded to parser.add_argument.
    """
    options = {key: value for key, value in kwargs.items()
               if key not in ('metavar', 'choices')}
    options.setdefault('default', argparse.SUPPRESS)
    parser.add_argument(name,
                        metavar='{True,False}',
                        choices=['True', 'true', 'False', 'false'],
                        **options)
|
|
|
|
|
|
def is_valid_cidr(cidr):
    """Tell whether ``cidr`` can be parsed by netaddr.IPNetwork.

    Any ``Exception`` raised while constructing the network is reported as
    False rather than being propagated to the caller.
    """
    try:
        netaddr.IPNetwork(cidr)
    except Exception:
        return False
    return True
|