Sync with oslo-incubator

According to the latest agreement in Oslo team, apiclient and
clituils will be removed from Oslo incubator. We should keep
the local copy by ourselfs:
1. rename openstack/common/apiclient to common/apiclient
2. Because Ironic only use base.py and exception.py, so delete others.
3. rename openstack/common/cliutils.py to common/clituils.py
4. Delete openstack/common/_i18n becuase we have common/i18n.py
5. Delete openstack folder because it is empty
6. Add related unit tests and add oslotest to test-requirements.

Change-Id: I9eecd0457e86e82e65db1d0bc9a4c5c0ed4bbf74
This commit is contained in:
Lin Tan 2015-11-12 14:03:40 +08:00 committed by Jim Rollenhagen
parent be5e26a176
commit 4d966622ef
26 changed files with 997 additions and 1008 deletions

View File

@ -20,19 +20,6 @@
Base utilities to build API operation managers and objects on top of.
"""
########################################################################
#
# THIS MODULE IS DEPRECATED
#
# Please refer to
# https://etherpad.openstack.org/p/kilo-ironicclient-library-proposals for
# the discussion leading to this deprecation.
#
# We recommend checking out the python-openstacksdk project
# (https://launchpad.net/python-openstacksdk) instead.
#
########################################################################
# E1102: %s is not callable
# pylint: disable=E1102
@ -44,8 +31,8 @@ from oslo_utils import strutils
import six
from six.moves.urllib import parse
from ironicclient.openstack.common._i18n import _
from ironicclient.openstack.common.apiclient import exceptions
from ironicclient.common.apiclient import exceptions
from ironicclient.common.i18n import _
def getid(obj):
@ -467,8 +454,7 @@ class Resource(object):
@property
def human_id(self):
"""Human-readable ID which can be used for bash completion.
"""
"""Human-readable ID which can be used for bash completion."""
if self.HUMAN_ID:
name = getattr(self, self.NAME_ATTR, None)
if name is not None:

View File

@ -20,30 +20,17 @@
Exception definitions.
"""
########################################################################
#
# THIS MODULE IS DEPRECATED
#
# Please refer to
# https://etherpad.openstack.org/p/kilo-ironicclient-library-proposals for
# the discussion leading to this deprecation.
#
# We recommend checking out the python-openstacksdk project
# (https://launchpad.net/python-openstacksdk) instead.
#
########################################################################
import inspect
import sys
import six
from ironicclient.openstack.common._i18n import _
from ironicclient.common.i18n import _
class ClientException(Exception):
"""The base exception class for all exceptions this library raises.
"""
"""The base exception class for all exceptions this library raises."""
pass
@ -118,8 +105,7 @@ class AmbiguousEndpoints(EndpointException):
class HttpError(ClientException):
"""The base exception class for all HTTP exceptions.
"""
"""The base exception class for all HTTP exceptions."""
http_status = 0
message = _("HTTP Error")

View File

@ -21,7 +21,7 @@ import copy
import six.moves.urllib.parse as urlparse
from ironicclient.openstack.common.apiclient import base
from ironicclient.common.apiclient import base
def getid(obj):

View File

@ -30,7 +30,7 @@ import prettytable
import six
from six import moves
from ironicclient.openstack.common._i18n import _
from ironicclient.common.i18n import _
class MissingArgs(Exception):

View File

@ -10,8 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from ironicclient.openstack.common.apiclient import exceptions
from ironicclient.openstack.common.apiclient.exceptions import * # noqa
from ironicclient.common.apiclient import exceptions
from ironicclient.common.apiclient.exceptions import * # noqa
# NOTE(akurilin): This alias is left here since v.0.1.3 to support backwards

View File

@ -1,45 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""oslo.i18n integration module.
See http://docs.openstack.org/developer/oslo.i18n/usage.html
"""
try:
import oslo_i18n
# NOTE(dhellmann): This reference to o-s-l-o will be replaced by the
# application name when this module is synced into the separate
# repository. It is OK to have more than one translation function
# using the same domain, since there will still only be one message
# catalog.
_translators = oslo_i18n.TranslatorFactory(domain='ironicclient')
# The primary translation function using the well-known name "_"
_ = _translators.primary
# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = _translators.log_info
_LW = _translators.log_warning
_LE = _translators.log_error
_LC = _translators.log_critical
except ImportError:
# NOTE(dims): Support for cases where a project wants to use
# code from oslo-incubator, but is not ready to be internationalized
# (like tempest)
_ = _LI = _LW = _LE = _LC = lambda x: x

View File

@ -1,234 +0,0 @@
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Spanish National Research Council.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# E0202: An attribute inherited from %s hide this method
# pylint: disable=E0202
########################################################################
#
# THIS MODULE IS DEPRECATED
#
# Please refer to
# https://etherpad.openstack.org/p/kilo-ironicclient-library-proposals for
# the discussion leading to this deprecation.
#
# We recommend checking out the python-openstacksdk project
# (https://launchpad.net/python-openstacksdk) instead.
#
########################################################################
import abc
import argparse
import os
import six
from stevedore import extension
from ironicclient.openstack.common.apiclient import exceptions
_discovered_plugins = {}
def discover_auth_systems():
"""Discover the available auth-systems.
This won't take into account the old style auth-systems.
"""
global _discovered_plugins
_discovered_plugins = {}
def add_plugin(ext):
_discovered_plugins[ext.name] = ext.plugin
ep_namespace = "ironicclient.openstack.common.apiclient.auth"
mgr = extension.ExtensionManager(ep_namespace)
mgr.map(add_plugin)
def load_auth_system_opts(parser):
"""Load options needed by the available auth-systems into a parser.
This function will try to populate the parser with options from the
available plugins.
"""
group = parser.add_argument_group("Common auth options")
BaseAuthPlugin.add_common_opts(group)
for name, auth_plugin in six.iteritems(_discovered_plugins):
group = parser.add_argument_group(
"Auth-system '%s' options" % name,
conflict_handler="resolve")
auth_plugin.add_opts(group)
def load_plugin(auth_system):
try:
plugin_class = _discovered_plugins[auth_system]
except KeyError:
raise exceptions.AuthSystemNotFound(auth_system)
return plugin_class(auth_system=auth_system)
def load_plugin_from_args(args):
"""Load required plugin and populate it with options.
Try to guess auth system if it is not specified. Systems are tried in
alphabetical order.
:type args: argparse.Namespace
:raises: AuthPluginOptionsMissing
"""
auth_system = args.os_auth_system
if auth_system:
plugin = load_plugin(auth_system)
plugin.parse_opts(args)
plugin.sufficient_options()
return plugin
for plugin_auth_system in sorted(six.iterkeys(_discovered_plugins)):
plugin_class = _discovered_plugins[plugin_auth_system]
plugin = plugin_class()
plugin.parse_opts(args)
try:
plugin.sufficient_options()
except exceptions.AuthPluginOptionsMissing:
continue
return plugin
raise exceptions.AuthPluginOptionsMissing(["auth_system"])
@six.add_metaclass(abc.ABCMeta)
class BaseAuthPlugin(object):
"""Base class for authentication plugins.
An authentication plugin needs to override at least the authenticate
method to be a valid plugin.
"""
auth_system = None
opt_names = []
common_opt_names = [
"auth_system",
"username",
"password",
"tenant_name",
"token",
"auth_url",
]
def __init__(self, auth_system=None, **kwargs):
self.auth_system = auth_system or self.auth_system
self.opts = dict((name, kwargs.get(name))
for name in self.opt_names)
@staticmethod
def _parser_add_opt(parser, opt):
"""Add an option to parser in two variants.
:param opt: option name (with underscores)
"""
dashed_opt = opt.replace("_", "-")
env_var = "OS_%s" % opt.upper()
arg_default = os.environ.get(env_var, "")
arg_help = "Defaults to env[%s]." % env_var
parser.add_argument(
"--os-%s" % dashed_opt,
metavar="<%s>" % dashed_opt,
default=arg_default,
help=arg_help)
parser.add_argument(
"--os_%s" % opt,
metavar="<%s>" % dashed_opt,
help=argparse.SUPPRESS)
@classmethod
def add_opts(cls, parser):
"""Populate the parser with the options for this plugin.
"""
for opt in cls.opt_names:
# use `BaseAuthPlugin.common_opt_names` since it is never
# changed in child classes
if opt not in BaseAuthPlugin.common_opt_names:
cls._parser_add_opt(parser, opt)
@classmethod
def add_common_opts(cls, parser):
"""Add options that are common for several plugins.
"""
for opt in cls.common_opt_names:
cls._parser_add_opt(parser, opt)
@staticmethod
def get_opt(opt_name, args):
"""Return option name and value.
:param opt_name: name of the option, e.g., "username"
:param args: parsed arguments
"""
return (opt_name, getattr(args, "os_%s" % opt_name, None))
def parse_opts(self, args):
"""Parse the actual auth-system options if any.
This method is expected to populate the attribute `self.opts` with a
dict containing the options and values needed to make authentication.
"""
self.opts.update(dict(self.get_opt(opt_name, args)
for opt_name in self.opt_names))
def authenticate(self, http_client):
"""Authenticate using plugin defined method.
The method usually analyses `self.opts` and performs
a request to authentication server.
:param http_client: client object that needs authentication
:type http_client: HTTPClient
:raises: AuthorizationFailure
"""
self.sufficient_options()
self._do_authenticate(http_client)
@abc.abstractmethod
def _do_authenticate(self, http_client):
"""Protected method for authentication.
"""
def sufficient_options(self):
"""Check if all required options are present.
:raises: AuthPluginOptionsMissing
"""
missing = [opt
for opt in self.opt_names
if not self.opts.get(opt)]
if missing:
raise exceptions.AuthPluginOptionsMissing(missing)
@abc.abstractmethod
def token_and_endpoint(self, endpoint_type, service_type):
"""Return token and endpoint.
:param service_type: Service type of the endpoint
:type service_type: string
:param endpoint_type: Type of endpoint.
Possible values: public or publicURL,
internal or internalURL,
admin or adminURL
:type endpoint_type: string
:returns: tuple of token and endpoint strings
:raises: EndpointException
"""

View File

@ -1,388 +0,0 @@
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack Foundation
# Copyright 2011 Piston Cloud Computing, Inc.
# Copyright 2013 Alessio Ababilov
# Copyright 2013 Grid Dynamics
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
OpenStack Client interface. Handles the REST calls and responses.
"""
# E0202: An attribute inherited from %s hide this method
# pylint: disable=E0202
import hashlib
import logging
import time
try:
import simplejson as json
except ImportError:
import json
from oslo_utils import encodeutils
from oslo_utils import importutils
import requests
from ironicclient.openstack.common._i18n import _
from ironicclient.openstack.common.apiclient import exceptions
_logger = logging.getLogger(__name__)
SENSITIVE_HEADERS = ('X-Auth-Token', 'X-Subject-Token',)
class HTTPClient(object):
"""This client handles sending HTTP requests to OpenStack servers.
Features:
- share authentication information between several clients to different
services (e.g., for compute and image clients);
- reissue authentication request for expired tokens;
- encode/decode JSON bodies;
- raise exceptions on HTTP errors;
- pluggable authentication;
- store authentication information in a keyring;
- store time spent for requests;
- register clients for particular services, so one can use
`http_client.identity` or `http_client.compute`;
- log requests and responses in a format that is easy to copy-and-paste
into terminal and send the same request with curl.
"""
user_agent = "ironicclient.openstack.common.apiclient"
def __init__(self,
auth_plugin,
region_name=None,
endpoint_type="publicURL",
original_ip=None,
verify=True,
cert=None,
timeout=None,
timings=False,
keyring_saver=None,
debug=False,
user_agent=None,
http=None):
self.auth_plugin = auth_plugin
self.endpoint_type = endpoint_type
self.region_name = region_name
self.original_ip = original_ip
self.timeout = timeout
self.verify = verify
self.cert = cert
self.keyring_saver = keyring_saver
self.debug = debug
self.user_agent = user_agent or self.user_agent
self.times = [] # [("item", starttime, endtime), ...]
self.timings = timings
# requests within the same session can reuse TCP connections from pool
self.http = http or requests.Session()
self.cached_token = None
self.last_request_id = None
def _safe_header(self, name, value):
if name in SENSITIVE_HEADERS:
# because in python3 byte string handling is ... ug
v = value.encode('utf-8')
h = hashlib.sha1(v)
d = h.hexdigest()
return encodeutils.safe_decode(name), "{SHA1}%s" % d
else:
return (encodeutils.safe_decode(name),
encodeutils.safe_decode(value))
def _http_log_req(self, method, url, kwargs):
if not self.debug:
return
string_parts = [
"curl -g -i",
"-X '%s'" % method,
"'%s'" % url,
]
for element in kwargs['headers']:
header = ("-H '%s: %s'" %
self._safe_header(element, kwargs['headers'][element]))
string_parts.append(header)
_logger.debug("REQ: %s" % " ".join(string_parts))
if 'data' in kwargs:
_logger.debug("REQ BODY: %s\n" % (kwargs['data']))
def _http_log_resp(self, resp):
if not self.debug:
return
_logger.debug(
"RESP: [%s] %s\n",
resp.status_code,
resp.headers)
if resp._content_consumed:
_logger.debug(
"RESP BODY: %s\n",
resp.text)
def serialize(self, kwargs):
if kwargs.get('json') is not None:
kwargs['headers']['Content-Type'] = 'application/json'
kwargs['data'] = json.dumps(kwargs['json'])
try:
del kwargs['json']
except KeyError:
pass
def get_timings(self):
return self.times
def reset_timings(self):
self.times = []
def request(self, method, url, **kwargs):
"""Send an http request with the specified characteristics.
Wrapper around `requests.Session.request` to handle tasks such as
setting headers, JSON encoding/decoding, and error handling.
:param method: method of HTTP request
:param url: URL of HTTP request
:param kwargs: any other parameter that can be passed to
requests.Session.request (such as `headers`) or `json`
that will be encoded as JSON and used as `data` argument
"""
kwargs.setdefault("headers", {})
kwargs["headers"]["User-Agent"] = self.user_agent
if self.original_ip:
kwargs["headers"]["Forwarded"] = "for=%s;by=%s" % (
self.original_ip, self.user_agent)
if self.timeout is not None:
kwargs.setdefault("timeout", self.timeout)
kwargs.setdefault("verify", self.verify)
if self.cert is not None:
kwargs.setdefault("cert", self.cert)
self.serialize(kwargs)
self._http_log_req(method, url, kwargs)
if self.timings:
start_time = time.time()
resp = self.http.request(method, url, **kwargs)
if self.timings:
self.times.append(("%s %s" % (method, url),
start_time, time.time()))
self._http_log_resp(resp)
self.last_request_id = resp.headers.get('x-openstack-request-id')
if resp.status_code >= 400:
_logger.debug(
"Request returned failure status: %s",
resp.status_code)
raise exceptions.from_response(resp, method, url)
return resp
@staticmethod
def concat_url(endpoint, url):
"""Concatenate endpoint and final URL.
E.g., "http://keystone/v2.0/" and "/tokens" are concatenated to
"http://keystone/v2.0/tokens".
:param endpoint: the base URL
:param url: the final URL
"""
return "%s/%s" % (endpoint.rstrip("/"), url.strip("/"))
def client_request(self, client, method, url, **kwargs):
"""Send an http request using `client`'s endpoint and specified `url`.
If request was rejected as unauthorized (possibly because the token is
expired), issue one authorization attempt and send the request once
again.
:param client: instance of BaseClient descendant
:param method: method of HTTP request
:param url: URL of HTTP request
:param kwargs: any other parameter that can be passed to
`HTTPClient.request`
"""
filter_args = {
"endpoint_type": client.endpoint_type or self.endpoint_type,
"service_type": client.service_type,
}
token, endpoint = (self.cached_token, client.cached_endpoint)
just_authenticated = False
if not (token and endpoint):
try:
token, endpoint = self.auth_plugin.token_and_endpoint(
**filter_args)
except exceptions.EndpointException:
pass
if not (token and endpoint):
self.authenticate()
just_authenticated = True
token, endpoint = self.auth_plugin.token_and_endpoint(
**filter_args)
if not (token and endpoint):
raise exceptions.AuthorizationFailure(
_("Cannot find endpoint or token for request"))
old_token_endpoint = (token, endpoint)
kwargs.setdefault("headers", {})["X-Auth-Token"] = token
self.cached_token = token
client.cached_endpoint = endpoint
# Perform the request once. If we get Unauthorized, then it
# might be because the auth token expired, so try to
# re-authenticate and try again. If it still fails, bail.
try:
return self.request(
method, self.concat_url(endpoint, url), **kwargs)
except exceptions.Unauthorized as unauth_ex:
if just_authenticated:
raise
self.cached_token = None
client.cached_endpoint = None
if self.auth_plugin.opts.get('token'):
self.auth_plugin.opts['token'] = None
if self.auth_plugin.opts.get('endpoint'):
self.auth_plugin.opts['endpoint'] = None
self.authenticate()
try:
token, endpoint = self.auth_plugin.token_and_endpoint(
**filter_args)
except exceptions.EndpointException:
raise unauth_ex
if (not (token and endpoint) or
old_token_endpoint == (token, endpoint)):
raise unauth_ex
self.cached_token = token
client.cached_endpoint = endpoint
kwargs["headers"]["X-Auth-Token"] = token
return self.request(
method, self.concat_url(endpoint, url), **kwargs)
def add_client(self, base_client_instance):
"""Add a new instance of :class:`BaseClient` descendant.
`self` will store a reference to `base_client_instance`.
Example:
>>> def test_clients():
... from keystoneclient.auth import keystone
... from openstack.common.apiclient import client
... auth = keystone.KeystoneAuthPlugin(
... username="user", password="pass", tenant_name="tenant",
... auth_url="http://auth:5000/v2.0")
... openstack_client = client.HTTPClient(auth)
... # create nova client
... from novaclient.v1_1 import client
... client.Client(openstack_client)
... # create keystone client
... from keystoneclient.v2_0 import client
... client.Client(openstack_client)
... # use them
... openstack_client.identity.tenants.list()
... openstack_client.compute.servers.list()
"""
service_type = base_client_instance.service_type
if service_type and not hasattr(self, service_type):
setattr(self, service_type, base_client_instance)
def authenticate(self):
self.auth_plugin.authenticate(self)
# Store the authentication results in the keyring for later requests
if self.keyring_saver:
self.keyring_saver.save(self)
class BaseClient(object):
"""Top-level object to access the OpenStack API.
This client uses :class:`HTTPClient` to send requests. :class:`HTTPClient`
will handle a bunch of issues such as authentication.
"""
service_type = None
endpoint_type = None # "publicURL" will be used
cached_endpoint = None
def __init__(self, http_client, extensions=None):
self.http_client = http_client
http_client.add_client(self)
# Add in any extensions...
if extensions:
for extension in extensions:
if extension.manager_class:
setattr(self, extension.name,
extension.manager_class(self))
def client_request(self, method, url, **kwargs):
return self.http_client.client_request(
self, method, url, **kwargs)
@property
def last_request_id(self):
return self.http_client.last_request_id
def head(self, url, **kwargs):
return self.client_request("HEAD", url, **kwargs)
def get(self, url, **kwargs):
return self.client_request("GET", url, **kwargs)
def post(self, url, **kwargs):
return self.client_request("POST", url, **kwargs)
def put(self, url, **kwargs):
return self.client_request("PUT", url, **kwargs)
def delete(self, url, **kwargs):
return self.client_request("DELETE", url, **kwargs)
def patch(self, url, **kwargs):
return self.client_request("PATCH", url, **kwargs)
@staticmethod
def get_class(api_name, version, version_map):
"""Returns the client class for the requested API version
:param api_name: the name of the API, e.g. 'compute', 'image', etc
:param version: the requested API version
:param version_map: a dict of client classes keyed by version
:rtype: a client class for the requested API version
"""
try:
client_path = version_map[str(version)]
except (KeyError, ValueError):
msg = _("Invalid %(api_name)s client version '%(version)s'. "
"Must be one of: %(version_map)s") % {
'api_name': api_name,
'version': version,
'version_map': ', '.join(version_map.keys())}
raise exceptions.UnsupportedVersion(msg)
return importutils.import_class(client_path)

View File

@ -1,190 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A fake server that "responds" to API methods with pre-canned responses.
All of these responses come from the spec, so if for some reason the spec's
wrong the tests might raise AssertionError. I've indicated in comments the
places where actual behavior differs from the spec.
"""
########################################################################
#
# THIS MODULE IS DEPRECATED
#
# Please refer to
# https://etherpad.openstack.org/p/kilo-ironicclient-library-proposals for
# the discussion leading to this deprecation.
#
# We recommend checking out the python-openstacksdk project
# (https://launchpad.net/python-openstacksdk) instead.
#
########################################################################
# W0102: Dangerous default value %s as argument
# pylint: disable=W0102
import json
import requests
import six
from six.moves.urllib import parse
from ironicclient.openstack.common.apiclient import client
def assert_has_keys(dct, required=None, optional=None):
required = required or []
optional = optional or []
for k in required:
try:
assert k in dct
except AssertionError:
extra_keys = set(dct.keys()).difference(set(required + optional))
raise AssertionError("found unexpected keys: %s" %
list(extra_keys))
class TestResponse(requests.Response):
"""Wrap requests.Response and provide a convenient initialization.
"""
def __init__(self, data):
super(TestResponse, self).__init__()
self._content_consumed = True
if isinstance(data, dict):
self.status_code = data.get('status_code', 200)
# Fake the text attribute to streamline Response creation
text = data.get('text', "")
if isinstance(text, (dict, list)):
self._content = json.dumps(text)
default_headers = {
"Content-Type": "application/json",
}
else:
self._content = text
default_headers = {}
if six.PY3 and isinstance(self._content, six.string_types):
self._content = self._content.encode('utf-8', 'strict')
self.headers = data.get('headers') or default_headers
else:
self.status_code = data
def __eq__(self, other):
return (self.status_code == other.status_code and
self.headers == other.headers and
self._content == other._content)
class FakeHTTPClient(client.HTTPClient):
def __init__(self, *args, **kwargs):
self.callstack = []
self.fixtures = kwargs.pop("fixtures", None) or {}
if not args and "auth_plugin" not in kwargs:
args = (None, )
super(FakeHTTPClient, self).__init__(*args, **kwargs)
def assert_called(self, method, url, body=None, pos=-1):
"""Assert than an API method was just called.
"""
expected = (method, url)
called = self.callstack[pos][0:2]
assert self.callstack, \
"Expected %s %s but no calls were made." % expected
assert expected == called, 'Expected %s %s; got %s %s' % \
(expected + called)
if body is not None:
if self.callstack[pos][3] != body:
raise AssertionError('%r != %r' %
(self.callstack[pos][3], body))
def assert_called_anytime(self, method, url, body=None):
"""Assert than an API method was called anytime in the test.
"""
expected = (method, url)
assert self.callstack, \
"Expected %s %s but no calls were made." % expected
found = False
entry = None
for entry in self.callstack:
if expected == entry[0:2]:
found = True
break
assert found, 'Expected %s %s; got %s' % \
(method, url, self.callstack)
if body is not None:
assert entry[3] == body, "%s != %s" % (entry[3], body)
self.callstack = []
def clear_callstack(self):
self.callstack = []
def authenticate(self):
pass
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = parse.parse_qsl(parse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
self.last_request_id = headers.get('x-openstack-request-id',
'req-test')
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})

View File

@ -1,100 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
########################################################################
#
# THIS MODULE IS DEPRECATED
#
# Please refer to
# https://etherpad.openstack.org/p/kilo-ironicclient-library-proposals for
# the discussion leading to this deprecation.
#
# We recommend checking out the python-openstacksdk project
# (https://launchpad.net/python-openstacksdk) instead.
#
########################################################################
from oslo_utils import encodeutils
from oslo_utils import uuidutils
import six
from ironicclient.openstack.common._i18n import _
from ironicclient.openstack.common.apiclient import exceptions
def find_resource(manager, name_or_id, **find_args):
"""Look for resource in a given manager.
Used as a helper for the _find_* methods.
Example:
.. code-block:: python
def _find_hypervisor(cs, hypervisor):
#Get a hypervisor by name or ID.
return cliutils.find_resource(cs.hypervisors, hypervisor)
"""
# first try to get entity as integer id
try:
return manager.get(int(name_or_id))
except (TypeError, ValueError, exceptions.NotFound):
pass
# now try to get entity as uuid
try:
if six.PY2:
tmp_id = encodeutils.safe_encode(name_or_id)
else:
tmp_id = encodeutils.safe_decode(name_or_id)
if uuidutils.is_uuid_like(tmp_id):
return manager.get(tmp_id)
except (TypeError, ValueError, exceptions.NotFound):
pass
# for str id which is not uuid
if getattr(manager, 'is_alphanum_id_allowed', False):
try:
return manager.get(name_or_id)
except exceptions.NotFound:
pass
try:
try:
return manager.find(human_id=name_or_id, **find_args)
except exceptions.NotFound:
pass
# finally try to find entity by name
try:
resource = getattr(manager, 'resource_class', None)
name_attr = resource.NAME_ATTR if resource else 'name'
kwargs = {name_attr: name_or_id}
kwargs.update(find_args)
return manager.find(**kwargs)
except exceptions.NotFound:
msg = _("No %(name)s with a name or "
"ID of '%(name_or_id)s' exists.") % \
{
"name": manager.resource_class.__name__.lower(),
"name_or_id": name_or_id
}
raise exceptions.CommandError(msg)
except exceptions.NoUniqueMatch:
msg = _("Multiple %(name)s matches found for "
"'%(name_or_id)s', use an ID to be more specific.") % \
{
"name": manager.resource_class.__name__.lower(),
"name_or_id": name_or_id
}
raise exceptions.CommandError(msg)

View File

@ -34,11 +34,11 @@ import six.moves.urllib.parse as urlparse
import ironicclient
from ironicclient import client as iroclient
from ironicclient.common import cliutils
from ironicclient.common import http
from ironicclient.common.i18n import _
from ironicclient.common import utils
from ironicclient import exc
from ironicclient.openstack.common import cliutils
LATEST_API_VERSION = ('1', 'latest')

View File

@ -0,0 +1,157 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslotest import base as test_base
from ironicclient.common.apiclient import base
class HumanResource(base.Resource):
HUMAN_ID = True
class HumanResourceManager(base.ManagerWithFind):
resource_class = HumanResource
def list(self):
return self._list("/human_resources", "human_resources")
def get(self, human_resource):
return self._get(
"/human_resources/%s" % base.getid(human_resource),
"human_resource")
def update(self, human_resource, name):
body = {
"human_resource": {
"name": name,
},
}
return self._put(
"/human_resources/%s" % base.getid(human_resource),
body,
"human_resource")
class CrudResource(base.Resource):
pass
class CrudResourceManager(base.CrudManager):
"""Manager class for manipulating Identity crud_resources."""
resource_class = CrudResource
collection_key = 'crud_resources'
key = 'crud_resource'
def get(self, crud_resource):
return super(CrudResourceManager, self).get(
crud_resource_id=base.getid(crud_resource))
class ResourceTest(test_base.BaseTestCase):
def test_resource_repr(self):
r = base.Resource(None, dict(foo="bar", baz="spam"))
self.assertEqual(repr(r), "<Resource baz=spam, foo=bar>")
def test_getid(self):
class TmpObject(base.Resource):
id = "4"
self.assertEqual(base.getid(TmpObject(None, {})), "4")
def test_human_id(self):
r = base.Resource(None, {"name": "1"})
self.assertIsNone(r.human_id)
r = HumanResource(None, {"name": "1"})
self.assertEqual(r.human_id, "1")
r = HumanResource(None, {"name": None})
self.assertIsNone(r.human_id)
class BaseManagerTestCase(test_base.BaseTestCase):
def setUp(self):
super(BaseManagerTestCase, self).setUp()
self.response = mock.MagicMock()
self.http_client = mock.MagicMock()
self.http_client.get.return_value = self.response
self.http_client.post.return_value = self.response
self.manager = base.BaseManager(self.http_client)
self.manager.resource_class = HumanResource
def test_list(self):
self.response.json.return_value = {'human_resources': [{'id': 42}]}
expected = [HumanResource(self.manager, {'id': 42}, loaded=True)]
result = self.manager._list("/human_resources", "human_resources")
self.assertEqual(expected, result)
def test_list_no_response_key(self):
self.response.json.return_value = [{'id': 42}]
expected = [HumanResource(self.manager, {'id': 42}, loaded=True)]
result = self.manager._list("/human_resources")
self.assertEqual(expected, result)
def test_list_get(self):
self.manager._list("/human_resources", "human_resources")
self.manager.client.get.assert_called_with("/human_resources")
def test_list_post(self):
self.manager._list("/human_resources", "human_resources",
json={'id': 42})
self.manager.client.post.assert_called_with("/human_resources",
json={'id': 42})
def test_get(self):
self.response.json.return_value = {'human_resources': {'id': 42}}
expected = HumanResource(self.manager, {'id': 42}, loaded=True)
result = self.manager._get("/human_resources/42", "human_resources")
self.manager.client.get.assert_called_with("/human_resources/42")
self.assertEqual(expected, result)
def test_get_no_response_key(self):
self.response.json.return_value = {'id': 42}
expected = HumanResource(self.manager, {'id': 42}, loaded=True)
result = self.manager._get("/human_resources/42")
self.manager.client.get.assert_called_with("/human_resources/42")
self.assertEqual(expected, result)
def test_post(self):
self.response.json.return_value = {'human_resources': {'id': 42}}
expected = HumanResource(self.manager, {'id': 42}, loaded=True)
result = self.manager._post("/human_resources",
response_key="human_resources",
json={'id': 42})
self.manager.client.post.assert_called_with("/human_resources",
json={'id': 42})
self.assertEqual(expected, result)
def test_post_return_raw(self):
self.response.json.return_value = {'human_resources': {'id': 42}}
result = self.manager._post("/human_resources",
response_key="human_resources",
json={'id': 42}, return_raw=True)
self.manager.client.post.assert_called_with("/human_resources",
json={'id': 42})
self.assertEqual(result, {'id': 42})
def test_post_no_response_key(self):
self.response.json.return_value = {'id': 42}
expected = HumanResource(self.manager, {'id': 42}, loaded=True)
result = self.manager._post("/human_resources", json={'id': 42})
self.manager.client.post.assert_called_with("/human_resources",
json={'id': 42})
self.assertEqual(expected, result)

View File

@ -0,0 +1,137 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base as test_base
import six
from ironicclient.common.apiclient import exceptions
class FakeResponse(object):
json_data = {}
def __init__(self, **kwargs):
for key, value in six.iteritems(kwargs):
setattr(self, key, value)
def json(self):
return self.json_data
class ExceptionsArgsTest(test_base.BaseTestCase):
def assert_exception(self, ex_cls, method, url, status_code, json_data,
error_msg=None, error_details=None,
check_description=True):
ex = exceptions.from_response(
FakeResponse(status_code=status_code,
headers={"Content-Type": "application/json"},
json_data=json_data),
method,
url)
self.assertTrue(isinstance(ex, ex_cls))
if check_description:
expected_msg = error_msg or json_data["error"]["message"]
expected_details = error_details or json_data["error"]["details"]
self.assertEqual(ex.message, expected_msg)
self.assertEqual(ex.details, expected_details)
self.assertEqual(ex.method, method)
self.assertEqual(ex.url, url)
self.assertEqual(ex.http_status, status_code)
def test_from_response_known(self):
method = "GET"
url = "/fake"
status_code = 400
json_data = {"error": {"message": "fake message",
"details": "fake details"}}
self.assert_exception(
exceptions.BadRequest, method, url, status_code, json_data)
def test_from_response_unknown(self):
method = "POST"
url = "/fake-unknown"
status_code = 499
json_data = {"error": {"message": "fake unknown message",
"details": "fake unknown details"}}
self.assert_exception(
exceptions.HTTPClientError, method, url, status_code, json_data)
status_code = 600
self.assert_exception(
exceptions.HttpError, method, url, status_code, json_data)
def test_from_response_non_openstack(self):
method = "POST"
url = "/fake-unknown"
status_code = 400
json_data = {"alien": 123}
self.assert_exception(
exceptions.BadRequest, method, url, status_code, json_data,
check_description=False)
def test_from_response_with_different_response_format(self):
method = "GET"
url = "/fake-wsme"
status_code = 400
json_data1 = {"error_message": {"debuginfo": None,
"faultcode": "Client",
"faultstring": "fake message"}}
message = six.text_type(
json_data1["error_message"]["faultstring"])
details = six.text_type(json_data1)
self.assert_exception(
exceptions.BadRequest, method, url, status_code, json_data1,
message, details)
json_data2 = {"badRequest": {"message": "fake message", "code": 400}}
message = six.text_type(json_data2["badRequest"]["message"])
details = six.text_type(json_data2)
self.assert_exception(
exceptions.BadRequest, method, url, status_code, json_data2,
message, details)
def test_from_response_with_text_response_format(self):
method = "GET"
url = "/fake-wsme"
status_code = 400
text_data1 = "error_message: fake message"
ex = exceptions.from_response(
FakeResponse(status_code=status_code,
headers={"Content-Type": "text/html"},
text=text_data1),
method,
url)
self.assertTrue(isinstance(ex, exceptions.BadRequest))
self.assertEqual(ex.details, text_data1)
self.assertEqual(ex.method, method)
self.assertEqual(ex.url, url)
self.assertEqual(ex.http_status, status_code)
def test_from_response_with_text_response_format_with_no_body(self):
method = "GET"
url = "/fake-wsme"
status_code = 401
ex = exceptions.from_response(
FakeResponse(status_code=status_code,
headers={"Content-Type": "text/html"}),
method,
url)
self.assertTrue(isinstance(ex, exceptions.Unauthorized))
self.assertEqual(ex.details, '')
self.assertEqual(ex.method, method)
self.assertEqual(ex.url, url)
self.assertEqual(ex.http_status, status_code)

View File

@ -0,0 +1,679 @@
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import fixtures
import mock
from oslotest import base as test_base
import six
from ironicclient.common import cliutils
class ValidateArgsTest(test_base.BaseTestCase):
def test_lambda_no_args(self):
cliutils.validate_args(lambda: None)
def _test_lambda_with_args(self, *args, **kwargs):
cliutils.validate_args(lambda x, y: None, *args, **kwargs)
def test_lambda_positional_args(self):
self._test_lambda_with_args(1, 2)
def test_lambda_kwargs(self):
self._test_lambda_with_args(x=1, y=2)
def test_lambda_mixed_kwargs(self):
self._test_lambda_with_args(1, y=2)
def test_lambda_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_lambda_with_args)
def test_lambda_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_lambda_with_args, 1)
def test_lambda_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_lambda_with_args, y=2)
def _test_lambda_with_default(self, *args, **kwargs):
cliutils.validate_args(lambda x, y, z=3: None, *args, **kwargs)
def test_lambda_positional_args_with_default(self):
self._test_lambda_with_default(1, 2)
def test_lambda_kwargs_with_default(self):
self._test_lambda_with_default(x=1, y=2)
def test_lambda_mixed_kwargs_with_default(self):
self._test_lambda_with_default(1, y=2)
def test_lambda_positional_args_all_with_default(self):
self._test_lambda_with_default(1, 2, 3)
def test_lambda_kwargs_all_with_default(self):
self._test_lambda_with_default(x=1, y=2, z=3)
def test_lambda_mixed_kwargs_all_with_default(self):
self._test_lambda_with_default(1, y=2, z=3)
def test_lambda_with_default_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_lambda_with_default)
def test_lambda_with_default_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_lambda_with_default, 1)
def test_lambda_with_default_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_lambda_with_default, y=2)
def test_lambda_with_default_missing_args4(self):
self.assertRaises(cliutils.MissingArgs,
self._test_lambda_with_default, y=2, z=3)
def test_function_no_args(self):
def func():
pass
cliutils.validate_args(func)
def _test_function_with_args(self, *args, **kwargs):
def func(x, y):
pass
cliutils.validate_args(func, *args, **kwargs)
def test_function_positional_args(self):
self._test_function_with_args(1, 2)
def test_function_kwargs(self):
self._test_function_with_args(x=1, y=2)
def test_function_mixed_kwargs(self):
self._test_function_with_args(1, y=2)
def test_function_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_function_with_args)
def test_function_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_function_with_args, 1)
def test_function_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_function_with_args, y=2)
def _test_function_with_default(self, *args, **kwargs):
def func(x, y, z=3):
pass
cliutils.validate_args(func, *args, **kwargs)
def test_function_positional_args_with_default(self):
self._test_function_with_default(1, 2)
def test_function_kwargs_with_default(self):
self._test_function_with_default(x=1, y=2)
def test_function_mixed_kwargs_with_default(self):
self._test_function_with_default(1, y=2)
def test_function_positional_args_all_with_default(self):
self._test_function_with_default(1, 2, 3)
def test_function_kwargs_all_with_default(self):
self._test_function_with_default(x=1, y=2, z=3)
def test_function_mixed_kwargs_all_with_default(self):
self._test_function_with_default(1, y=2, z=3)
def test_function_with_default_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_function_with_default)
def test_function_with_default_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_function_with_default, 1)
def test_function_with_default_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_function_with_default, y=2)
def test_function_with_default_missing_args4(self):
self.assertRaises(cliutils.MissingArgs,
self._test_function_with_default, y=2, z=3)
def test_bound_method_no_args(self):
class Foo(object):
def bar(self):
pass
cliutils.validate_args(Foo().bar)
def _test_bound_method_with_args(self, *args, **kwargs):
class Foo(object):
def bar(self, x, y):
pass
cliutils.validate_args(Foo().bar, *args, **kwargs)
def test_bound_method_positional_args(self):
self._test_bound_method_with_args(1, 2)
def test_bound_method_kwargs(self):
self._test_bound_method_with_args(x=1, y=2)
def test_bound_method_mixed_kwargs(self):
self._test_bound_method_with_args(1, y=2)
def test_bound_method_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_bound_method_with_args)
def test_bound_method_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_bound_method_with_args, 1)
def test_bound_method_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_bound_method_with_args, y=2)
def _test_bound_method_with_default(self, *args, **kwargs):
class Foo(object):
def bar(self, x, y, z=3):
pass
cliutils.validate_args(Foo().bar, *args, **kwargs)
def test_bound_method_positional_args_with_default(self):
self._test_bound_method_with_default(1, 2)
def test_bound_method_kwargs_with_default(self):
self._test_bound_method_with_default(x=1, y=2)
def test_bound_method_mixed_kwargs_with_default(self):
self._test_bound_method_with_default(1, y=2)
def test_bound_method_positional_args_all_with_default(self):
self._test_bound_method_with_default(1, 2, 3)
def test_bound_method_kwargs_all_with_default(self):
self._test_bound_method_with_default(x=1, y=2, z=3)
def test_bound_method_mixed_kwargs_all_with_default(self):
self._test_bound_method_with_default(1, y=2, z=3)
def test_bound_method_with_default_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_bound_method_with_default)
def test_bound_method_with_default_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_bound_method_with_default, 1)
def test_bound_method_with_default_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_bound_method_with_default, y=2)
def test_bound_method_with_default_missing_args4(self):
self.assertRaises(cliutils.MissingArgs,
self._test_bound_method_with_default, y=2, z=3)
def test_unbound_method_no_args(self):
class Foo(object):
def bar(self):
pass
cliutils.validate_args(Foo.bar, Foo())
def _test_unbound_method_with_args(self, *args, **kwargs):
class Foo(object):
def bar(self, x, y):
pass
cliutils.validate_args(Foo.bar, Foo(), *args, **kwargs)
def test_unbound_method_positional_args(self):
self._test_unbound_method_with_args(1, 2)
def test_unbound_method_kwargs(self):
self._test_unbound_method_with_args(x=1, y=2)
def test_unbound_method_mixed_kwargs(self):
self._test_unbound_method_with_args(1, y=2)
def test_unbound_method_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_unbound_method_with_args)
def test_unbound_method_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_unbound_method_with_args, 1)
def test_unbound_method_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_unbound_method_with_args, y=2)
def _test_unbound_method_with_default(self, *args, **kwargs):
class Foo(object):
def bar(self, x, y, z=3):
pass
cliutils.validate_args(Foo.bar, Foo(), *args, **kwargs)
def test_unbound_method_positional_args_with_default(self):
self._test_unbound_method_with_default(1, 2)
def test_unbound_method_kwargs_with_default(self):
self._test_unbound_method_with_default(x=1, y=2)
def test_unbound_method_mixed_kwargs_with_default(self):
self._test_unbound_method_with_default(1, y=2)
def test_unbound_method_with_default_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_unbound_method_with_default)
def test_unbound_method_with_default_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_unbound_method_with_default, 1)
def test_unbound_method_with_default_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_unbound_method_with_default, y=2)
def test_unbound_method_with_default_missing_args4(self):
self.assertRaises(cliutils.MissingArgs,
self._test_unbound_method_with_default, y=2, z=3)
def test_class_method_no_args(self):
class Foo(object):
@classmethod
def bar(cls):
pass
cliutils.validate_args(Foo.bar)
def _test_class_method_with_args(self, *args, **kwargs):
class Foo(object):
@classmethod
def bar(cls, x, y):
pass
cliutils.validate_args(Foo.bar, *args, **kwargs)
def test_class_method_positional_args(self):
self._test_class_method_with_args(1, 2)
def test_class_method_kwargs(self):
self._test_class_method_with_args(x=1, y=2)
def test_class_method_mixed_kwargs(self):
self._test_class_method_with_args(1, y=2)
def test_class_method_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_class_method_with_args)
def test_class_method_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_class_method_with_args, 1)
def test_class_method_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_class_method_with_args, y=2)
def _test_class_method_with_default(self, *args, **kwargs):
class Foo(object):
@classmethod
def bar(cls, x, y, z=3):
pass
cliutils.validate_args(Foo.bar, *args, **kwargs)
def test_class_method_positional_args_with_default(self):
self._test_class_method_with_default(1, 2)
def test_class_method_kwargs_with_default(self):
self._test_class_method_with_default(x=1, y=2)
def test_class_method_mixed_kwargs_with_default(self):
self._test_class_method_with_default(1, y=2)
def test_class_method_with_default_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_class_method_with_default)
def test_class_method_with_default_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_class_method_with_default, 1)
def test_class_method_with_default_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_class_method_with_default, y=2)
def test_class_method_with_default_missing_args4(self):
self.assertRaises(cliutils.MissingArgs,
self._test_class_method_with_default, y=2, z=3)
def test_static_method_no_args(self):
class Foo(object):
@staticmethod
def bar():
pass
cliutils.validate_args(Foo.bar)
def _test_static_method_with_args(self, *args, **kwargs):
class Foo(object):
@staticmethod
def bar(x, y):
pass
cliutils.validate_args(Foo.bar, *args, **kwargs)
def test_static_method_positional_args(self):
self._test_static_method_with_args(1, 2)
def test_static_method_kwargs(self):
self._test_static_method_with_args(x=1, y=2)
def test_static_method_mixed_kwargs(self):
self._test_static_method_with_args(1, y=2)
def test_static_method_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_static_method_with_args)
def test_static_method_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_static_method_with_args, 1)
def test_static_method_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_static_method_with_args, y=2)
def _test_static_method_with_default(self, *args, **kwargs):
class Foo(object):
@staticmethod
def bar(x, y, z=3):
pass
cliutils.validate_args(Foo.bar, *args, **kwargs)
def test_static_method_positional_args_with_default(self):
self._test_static_method_with_default(1, 2)
def test_static_method_kwargs_with_default(self):
self._test_static_method_with_default(x=1, y=2)
def test_static_method_mixed_kwargs_with_default(self):
self._test_static_method_with_default(1, y=2)
def test_static_method_with_default_missing_args1(self):
self.assertRaises(cliutils.MissingArgs,
self._test_static_method_with_default)
def test_static_method_with_default_missing_args2(self):
self.assertRaises(cliutils.MissingArgs,
self._test_static_method_with_default, 1)
def test_static_method_with_default_missing_args3(self):
self.assertRaises(cliutils.MissingArgs,
self._test_static_method_with_default, y=2)
def test_static_method_with_default_missing_args4(self):
self.assertRaises(cliutils.MissingArgs,
self._test_static_method_with_default, y=2, z=3)
class _FakeResult(object):
def __init__(self, name, value):
self.name = name
self.value = value
class PrintResultTestCase(test_base.BaseTestCase):
def setUp(self):
super(PrintResultTestCase, self).setUp()
self.mock_add_row = mock.MagicMock()
self.useFixture(fixtures.MonkeyPatch(
"prettytable.PrettyTable.add_row",
self.mock_add_row))
self.mock_get_string = mock.MagicMock(return_value="")
self.useFixture(fixtures.MonkeyPatch(
"prettytable.PrettyTable.get_string",
self.mock_get_string))
self.mock_init = mock.MagicMock(return_value=None)
self.useFixture(fixtures.MonkeyPatch(
"prettytable.PrettyTable.__init__",
self.mock_init))
# NOTE(dtantsur): won't work with mocked __init__
self.useFixture(fixtures.MonkeyPatch(
"prettytable.PrettyTable.align",
mock.MagicMock()))
def test_print_list_sort_by_str(self):
objs = [_FakeResult("k1", 1),
_FakeResult("k3", 2),
_FakeResult("k2", 3)]
cliutils.print_list(objs, ["Name", "Value"], sortby_index=0)
self.assertEqual(self.mock_add_row.call_args_list,
[mock.call(["k1", 1]),
mock.call(["k3", 2]),
mock.call(["k2", 3])])
self.mock_get_string.assert_called_with(sortby="Name")
self.mock_init.assert_called_once_with(["Name", "Value"])
def test_print_list_sort_by_integer(self):
objs = [_FakeResult("k1", 1),
_FakeResult("k2", 3),
_FakeResult("k3", 2)]
cliutils.print_list(objs, ["Name", "Value"], sortby_index=1)
self.assertEqual(self.mock_add_row.call_args_list,
[mock.call(["k1", 1]),
mock.call(["k2", 3]),
mock.call(["k3", 2])])
self.mock_get_string.assert_called_with(sortby="Value")
self.mock_init.assert_called_once_with(["Name", "Value"])
def test_print_list_sort_by_none(self):
objs = [_FakeResult("k1", 1),
_FakeResult("k3", 3),
_FakeResult("k2", 2)]
cliutils.print_list(objs, ["Name", "Value"], sortby_index=None)
self.assertEqual(self.mock_add_row.call_args_list,
[mock.call(["k1", 1]),
mock.call(["k3", 3]),
mock.call(["k2", 2])])
self.mock_get_string.assert_called_with()
self.mock_init.assert_called_once_with(["Name", "Value"])
def test_print_dict(self):
cliutils.print_dict({"K": "k", "Key": "Value"})
cliutils.print_dict({"K": "k", "Key": "Long\\nValue"})
self.mock_add_row.assert_has_calls([
mock.call(["K", "k"]),
mock.call(["Key", "Value"]),
mock.call(["K", "k"]),
mock.call(["Key", "Long"]),
mock.call(["", "Value"])],
any_order=True)
def test_print_list_field_labels(self):
objs = [_FakeResult("k1", 1),
_FakeResult("k3", 3),
_FakeResult("k2", 2)]
field_labels = ["Another Name", "Another Value"]
cliutils.print_list(objs, ["Name", "Value"], sortby_index=None,
field_labels=field_labels)
self.assertEqual(self.mock_add_row.call_args_list,
[mock.call(["k1", 1]),
mock.call(["k3", 3]),
mock.call(["k2", 2])])
self.mock_init.assert_called_once_with(field_labels)
def test_print_list_field_labels_sort(self):
objs = [_FakeResult("k1", 1),
_FakeResult("k3", 3),
_FakeResult("k2", 2)]
field_labels = ["Another Name", "Another Value"]
cliutils.print_list(objs, ["Name", "Value"], sortby_index=0,
field_labels=field_labels)
self.assertEqual(self.mock_add_row.call_args_list,
[mock.call(["k1", 1]),
mock.call(["k3", 3]),
mock.call(["k2", 2])])
self.mock_init.assert_called_once_with(field_labels)
self.mock_get_string.assert_called_with(sortby="Another Name")
def test_print_list_field_labels_too_many(self):
objs = [_FakeResult("k1", 1),
_FakeResult("k3", 3),
_FakeResult("k2", 2)]
field_labels = ["Another Name", "Another Value", "Redundant"]
self.assertRaises(ValueError, cliutils.print_list,
objs, ["Name", "Value"], sortby_index=None,
field_labels=field_labels)
class PrintResultStringTestCase(test_base.BaseTestCase):
def test_print_list_string(self):
objs = [_FakeResult("k1", 1)]
field_labels = ["Another Name", "Another Value"]
orig = sys.stdout
sys.stdout = six.StringIO()
cliutils.print_list(objs, ["Name", "Value"], sortby_index=0,
field_labels=field_labels)
out = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
expected = '''\
+--------------+---------------+
| Another Name | Another Value |
+--------------+---------------+
| k1 | 1 |
+--------------+---------------+
'''
self.assertEqual(expected, out)
def test_print_dict_string(self):
orig = sys.stdout
sys.stdout = six.StringIO()
cliutils.print_dict({"Key": "Value"})
out = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
expected = '''\
+----------+-------+
| Property | Value |
+----------+-------+
| Key | Value |
+----------+-------+
'''
self.assertEqual(expected, out)
def test_print_dict_string_custom_headers(self):
orig = sys.stdout
sys.stdout = six.StringIO()
cliutils.print_dict({"Key": "Value"}, dict_property='Foo')
out = sys.stdout.getvalue()
sys.stdout.close()
sys.stdout = orig
expected = '''\
+-----+-------+
| Foo | Value |
+-----+-------+
| Key | Value |
+-----+-------+
'''
self.assertEqual(expected, out)
class DecoratorsTestCase(test_base.BaseTestCase):
def test_arg(self):
func_args = [("--image", ), ("--flavor", )]
func_kwargs = [dict(default=None,
metavar="<image>"),
dict(default=None,
metavar="<flavor>")]
@cliutils.arg(*func_args[1], **func_kwargs[1])
@cliutils.arg(*func_args[0], **func_kwargs[0])
def dummy_func():
pass
self.assertTrue(hasattr(dummy_func, "arguments"))
self.assertEqual(len(dummy_func.arguments), 2)
for args_kwargs in zip(func_args, func_kwargs):
self.assertIn(args_kwargs, dummy_func.arguments)
def test_unauthenticated(self):
def dummy_func():
pass
self.assertFalse(cliutils.isunauthenticated(dummy_func))
dummy_func = cliutils.unauthenticated(dummy_func)
self.assertTrue(cliutils.isunauthenticated(dummy_func))
class EnvTestCase(test_base.BaseTestCase):
def test_env(self):
env = {"alpha": "a", "beta": "b"}
self.useFixture(fixtures.MonkeyPatch("os.environ", env))
self.assertEqual(cliutils.env("beta"), env["beta"])
self.assertEqual(cliutils.env("beta", "alpha"), env["beta"])
self.assertEqual(cliutils.env("alpha", "beta"), env["alpha"])
self.assertEqual(cliutils.env("gamma", "beta"), env["beta"])
self.assertEqual(cliutils.env("gamma"), "")
self.assertEqual(cliutils.env("gamma", default="c"), "c")
class GetPasswordTestCase(test_base.BaseTestCase):
def setUp(self):
super(GetPasswordTestCase, self).setUp()
class FakeFile(object):
def isatty(self):
return True
self.useFixture(fixtures.MonkeyPatch("sys.stdin", FakeFile()))
def test_get_password(self):
self.useFixture(fixtures.MonkeyPatch("getpass.getpass",
lambda prompt: "mellon"))
self.assertEqual(cliutils.get_password(), "mellon")
def test_get_password_verify(self):
env = {"OS_VERIFY_PASSWORD": "True"}
self.useFixture(fixtures.MonkeyPatch("os.environ", env))
self.useFixture(fixtures.MonkeyPatch("getpass.getpass",
lambda prompt: "mellon"))
self.assertEqual(cliutils.get_password(), "mellon")
def test_get_password_verify_failure(self):
env = {"OS_VERIFY_PASSWORD": "True"}
self.useFixture(fixtures.MonkeyPatch("os.environ", env))
self.useFixture(fixtures.MonkeyPatch("getpass.getpass",
lambda prompt: prompt))
self.assertIsNone(cliutils.get_password())

View File

@ -14,9 +14,9 @@
import mock
from ironicclient.common.apiclient import exceptions
from ironicclient.common import cliutils
from ironicclient.common import utils as commonutils
from ironicclient.openstack.common.apiclient import exceptions
from ironicclient.openstack.common import cliutils
from ironicclient.tests.unit import utils
import ironicclient.v1.chassis_shell as c_shell

View File

@ -14,7 +14,7 @@
import mock
from ironicclient.openstack.common import cliutils
from ironicclient.common import cliutils
from ironicclient.tests.unit import utils
import ironicclient.v1.driver_shell as d_shell

View File

@ -14,9 +14,9 @@
import mock
from ironicclient.common.apiclient import exceptions
from ironicclient.common import cliutils
from ironicclient.common import utils as commonutils
from ironicclient.openstack.common.apiclient import exceptions
from ironicclient.openstack.common import cliutils
from ironicclient.tests.unit import utils
import ironicclient.v1.node_shell as n_shell

View File

@ -14,9 +14,9 @@
import mock
from ironicclient.common.apiclient import exceptions
from ironicclient.common import cliutils
from ironicclient.common import utils as commonutils
from ironicclient.openstack.common.apiclient import exceptions
from ironicclient.openstack.common import cliutils
from ironicclient.tests.unit import utils
import ironicclient.v1.port_shell as p_shell

View File

@ -13,8 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from ironicclient.common import cliutils
from ironicclient.common import utils
from ironicclient.openstack.common import cliutils
from ironicclient.v1 import resource_fields as res_fields

View File

@ -15,8 +15,8 @@
import argparse
from ironicclient.common import cliutils
from ironicclient.common import utils
from ironicclient.openstack.common import cliutils
def _print_driver_show(driver):

View File

@ -15,10 +15,10 @@
import argparse
from ironicclient.common.apiclient import exceptions
from ironicclient.common import cliutils
from ironicclient.common.i18n import _
from ironicclient.common import utils
from ironicclient.openstack.common.apiclient import exceptions
from ironicclient.openstack.common import cliutils
from ironicclient.v1 import resource_fields as res_fields

View File

@ -13,8 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from ironicclient.common import cliutils
from ironicclient.common import utils
from ironicclient.openstack.common import cliutils
from ironicclient.v1 import resource_fields as res_fields

View File

@ -10,6 +10,7 @@ httpretty<0.8.7,>=0.8.4
mock>=1.2
Babel>=1.3
oslosphinx>=2.5.0 # Apache-2.0
oslotest>=1.10.0 # Apache-2.0
python-subunit>=0.0.18
sphinx!=1.2.0,!=1.3b1,<1.3,>=1.1.2
testrepository>=0.0.18