Add apiclient library

This library can be used in novaclient, keystoneclient,
glanceclient, and other client projects. The library
contains common code and uses python-requests for
HTTP communication.

Features:
* reissue authentication request for expired tokens;
* pluggable authentication;
* rich exceptions hierarchy;
* utils for building CLI tools.

Partially implements: blueprint common-client-library

Change-Id: I82df865d52f32ce47dde6a09a84d3d917fd77918
This commit is contained in:
Alessio Ababilov
2013-05-15 19:03:01 +03:00
committed by Alessio Ababilov
parent 5b385d116a
commit 4b9b013964
34 changed files with 4692 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,168 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Spanish National Research Council.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# E0202: An attribute inherited from %s hide this method
# pylint: disable=E0202
import abc
import argparse
import logging
import os
from stevedore import extension
from marconiclient.common.apiclient import exceptions
logger = logging.getLogger(__name__)
_discovered_plugins = {}
def discover_auth_systems():
"""Discover the available auth-systems.
This won't take into account the old style auth-systems.
"""
global _discovered_plugins
_discovered_plugins = {}
def add_plugin(ext):
_discovered_plugins[ext.name] = ext.plugin
ep_namespace = "marconiclient.common.apiclient.auth"
mgr = extension.ExtensionManager(ep_namespace)
mgr.map(add_plugin)
def load_auth_system_opts(parser):
"""Load options needed by the available auth-systems into a parser.
This function will try to populate the parser with options from the
available plugins.
"""
group = parser.add_argument_group("Common auth options")
BaseAuthPlugin.add_common_opts(group)
for name, auth_plugin in _discovered_plugins.iteritems():
group = parser.add_argument_group(
"Auth-system '%s' options" % name,
conflict_handler="resolve")
auth_plugin.add_opts(group)
def load_plugin(auth_system):
try:
plugin_class = _discovered_plugins[auth_system]
except KeyError:
raise exceptions.AuthSystemNotFound(auth_system)
return plugin_class(auth_system=auth_system)
class BaseAuthPlugin(object):
"""Base class for authentication plugins.
An authentication plugin needs to override at least the authenticate
method to be a valid plugin.
"""
__metaclass__ = abc.ABCMeta
auth_system = None
opt_names = []
common_opt_names = [
"auth_system",
"username",
"password",
"tenant_name",
"token",
"auth_url",
]
def __init__(self, auth_system=None, **kwargs):
self.auth_system = auth_system or self.auth_system
self.opts = dict((name, kwargs.get(name))
for name in self.opt_names)
@staticmethod
def _parser_add_opt(parser, opt):
"""Add an option to parser in two variants.
:param opt: option name (with underscores)
"""
dashed_opt = opt.replace("_", "-")
env_var = "OS_%s" % opt.upper()
arg_default = os.environ.get(env_var, "")
arg_help = "Defaults to env[%s]." % env_var
parser.add_argument(
"--os-%s" % dashed_opt,
metavar="<%s>" % dashed_opt,
default=arg_default,
help=arg_help)
parser.add_argument(
"--os_%s" % opt,
metavar="<%s>" % dashed_opt,
help=argparse.SUPPRESS)
@classmethod
def add_opts(cls, parser):
"""Populate the parser with the options for this plugin.
"""
for opt in cls.opt_names:
# use `BaseAuthPlugin.common_opt_names` since it is never
# changed in child classes
if opt not in BaseAuthPlugin.common_opt_names:
cls._parser_add_opt(parser, opt)
@classmethod
def add_common_opts(cls, parser):
"""Add options that are common for several plugins.
"""
for opt in cls.common_opt_names:
cls._parser_add_opt(parser, opt)
@staticmethod
def get_opt(opt_name, args):
"""Return option name and value.
:param opt_name: name of the option, e.g., "username"
:param args: parsed arguments
"""
return (opt_name, getattr(args, "os_%s" % opt_name, None))
def parse_opts(self, args):
"""Parse the actual auth-system options if any.
This method is expected to populate the attribute `self.opts` with a
dict containing the options and values needed to make authentication.
"""
return dict(self.get_opt(opt_name, args)
for opt_name in self.opt_names)
@abc.abstractmethod
def authenticate(self, http_client):
"""Authenticate using plugin defined method.
This method sets `auth_response` or `token` and `endpoint`
for `http_client`. The method usually analyses `self.opts`
and performs a request to authentication server.
:param http_client: client object that needs authentication
:type http_client: HttpClient
:raises: AuthorizationFailure
"""

View File

@@ -0,0 +1,41 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Spanish National Research Council.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from marconiclient.common.apiclient.auth import base
from marconiclient.common.apiclient import exceptions
logger = logging.getLogger(__name__)
class EndpointTokenAuthPlugin(base.BaseAuthPlugin):
auth_system = "endpoint-token"
opt_names = [
"token",
"endpoint",
]
def authenticate(self, http_client):
# we can work without an endpoint (`BaseClient.endpoint` can be used),
# but a token is required
if not self.opts.get("token"):
raise exceptions.AuthPluginOptionsMissing(["token"])
http_client.token = self.opts["token"]
http_client.endpoint = self.opts["endpoint"]

View File

@@ -0,0 +1,70 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Spanish National Research Council.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from marconiclient.common.apiclient.auth import base
from marconiclient.common.apiclient import exceptions
logger = logging.getLogger(__name__)
class KeystoneV2AuthPlugin(base.BaseAuthPlugin):
auth_system = "keystone"
opt_names = [
"username",
"password",
"tenant_id",
"tenant_name",
"token",
"auth_url",
]
def authenticate(self, http_client):
if not self.opts.get("auth_url"):
raise exceptions.AuthPluginOptionsMissing(["auth_url"])
if self.opts.get("token"):
params = {"auth": {"token": {"id": self.opts.get("token")}}}
elif self.opts.get("username") and self.opts.get("password"):
params = {
"auth": {
"passwordCredentials": {
"username": self.opts.get("username"),
"password": self.opts.get("password"),
}
}
}
else:
raise exceptions.AuthPluginOptionsMissing(
[opt
for opt in "username", "password", "token"
if not self.opts.get(opt)])
if self.opts.get("tenant_id"):
params["auth"]["tenantId"] = self.opts.get("tenant_id")
elif self.opts.get("tenant_name"):
params["auth"]["tenantName"] = self.opts.get("tenant_name")
try:
body = http_client.request(
"POST",
http_client.concat_url(self.opts.get("auth_url"), "/tokens"),
allow_redirects=True,
json=params).json()
except ValueError as ex:
raise exceptions.AuthorizationFailure(ex)
http_client.auth_response = body

View File

@@ -0,0 +1,59 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# Copyright 2013 Spanish National Research Council.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# E0202: An attribute inherited from %s hide this method
# pylint: disable=E0202
import logging
from marconiclient.common.apiclient.auth import base
from marconiclient.common.apiclient import exceptions
logger = logging.getLogger(__name__)
class NovaLegacyAuthPlugin(base.BaseAuthPlugin):
auth_system = "nova"
opt_names = [
"username",
"password",
"project_id",
"auth_url",
]
def authenticate(self, http_client):
headers = {"X-Auth-User": self.opts["username"],
"X-Auth-Key": self.opts["password"]}
if self.opts.get("project_id"):
headers["X-Auth-Project-Id"] = self.opts.get("project_id")
resp = http_client.request(
"GET", self.opts["auth_url"],
headers=headers, allow_redirects=True)
try:
endpoint = resp.headers["X-Server-Management-Url"].rstrip("/")
token = resp.headers["X-Auth-Token"]
except (KeyError, TypeError):
raise exceptions.AuthorizationFailure()
http_client.token = token
# set endpoint for compute if it exists
try:
http_client.compute.endpoint = endpoint
except AttributeError:
pass

View File

@@ -0,0 +1,190 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Nebula, Inc.
# Copyright 2013 Alessio Ababilov
# Copyright 2013 Grid Dynamics
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from marconiclient.common.apiclient import exceptions
from marconiclient.openstack.common import timeutils
_logger = logging.getLogger(__name__)
class AuthResponse(dict):
"""An object for encapsulating a raw authentication response from keystone.
The class provides methods for extracting useful values from that token.
"""
@property
def expires(self):
"""Returns the token expiration (as datetime object)
:returns: datetime
"""
try:
return timeutils.parse_isotime(self['access']['token']['expires'])
except KeyError:
return None
@property
def token(self):
"""Returns the token_id associated with the auth request.
:returns: str
"""
try:
return self['access']['token']['id']
except KeyError:
return None
@property
def username(self):
"""Returns the username associated with the authentication request.
Follows the pattern defined in the V2 API of first looking for 'name',
returning that if available, and falling back to 'username' if name
is unavailable.
:returns: str
"""
try:
return self['access']['user']['name']
except KeyError:
pass
try:
return self['access']['user']['username']
except KeyError:
return None
@property
def user_id(self):
"""Returns the user id associated with the authentication request.
:returns: str
"""
try:
return self['access']['user']['id']
except KeyError:
return None
@property
def tenant_name(self):
"""Returns the tenant name associated with the authentication request.
:returns: str
"""
try:
return self['access']['token']['tenant']['name']
except KeyError:
return None
@property
def project_name(self):
"""Synonym for tenant_name."""
return self.tenant_name
@property
def tenant_id(self):
"""Returns the tenant id associated with the authentication request.
:returns: str
"""
try:
return self['access']['token']['tenant']['id']
except KeyError:
return None
@property
def project_id(self):
"""Synonym for tenant_id."""
return self.tenant_id
@property
def scoped(self):
"""Checks if the authorization token is scoped to a tenant.
Additionally verifies that there is a populated service catalog.
:returns: bool
"""
try:
if (self['access']['serviceCatalog'] and
self['access']['token']['tenant']):
return True
except KeyError:
pass
return False
def filter_endpoints(self, endpoint_type=None,
service_type=None, service_name=None,
filter_attrs=None):
"""Returns a list of endpoints which match provided criteria.
"""
filter_attrs = filter_attrs or {}
matching_endpoints = []
def add_if_appropriate(endpoint):
# Ignore 1.0 compute endpoints
if (endpoint.get("serviceType") == 'compute' and
endpoint.get('versionId', '2') not in ('1.1', '2')):
return
if endpoint_type and endpoint_type not in endpoint.keys():
return
for k, v in filter_attrs.iteritems():
if endpoint.get(k).lower() != v.lower():
return
matching_endpoints.append(endpoint)
if 'endpoints' in self:
# We have a bastardized service catalog. Treat it special. :/
for endpoint in self['endpoints']:
add_if_appropriate(endpoint)
elif 'access' in self and 'serviceCatalog' in self['access']:
# Full catalog ...
for service in self['access']['serviceCatalog']:
if service_type and service.get("type") != service_type:
continue
if service_name and service.get('name') != service_name:
continue
for endpoint in service['endpoints']:
endpoint["serviceName"] = service.get("name")
endpoint["serviceType"] = service.get("type")
add_if_appropriate(endpoint)
return matching_endpoints
def url_for(self, endpoint_type,
service_type, service_name=None, filter_attrs=None):
"""Returns a unique endpoint which match provided criteria.
"""
filter_attrs = filter_attrs or {}
matching_endpoints = self.filter_endpoints(
endpoint_type, service_type, service_name, filter_attrs)
if not matching_endpoints:
raise exceptions.EndpointNotFound(
"Cannot find requested %s endpoint" % service_type)
elif len(matching_endpoints) > 1:
raise exceptions.AmbiguousEndpoints(
endpoints=matching_endpoints)
else:
return matching_endpoints[0][endpoint_type]

View File

@@ -0,0 +1,469 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack LLC
# Copyright 2012 Grid Dynamics
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Base utilities to build API operation managers and objects on top of.
"""
# E1102: %s is not callable
# pylint: disable=E1102
import abc
import urllib
from marconiclient.common.apiclient import exceptions
from marconiclient.openstack.common import strutils
def getid(obj):
"""Return id if argument is a Resource.
Abstracts the common pattern of allowing both an object or an object's ID
(UUID) as a parameter when dealing with relationships.
"""
try:
if obj.uuid:
return obj.uuid
except AttributeError:
pass
try:
return obj.id
except AttributeError:
return obj
# TODO(aababilov): call run_hooks() in HookableMixin's child classes
class HookableMixin(object):
"""Mixin so classes can register and run hooks."""
_hooks_map = {}
@classmethod
def add_hook(cls, hook_type, hook_func):
"""Add a new hook of specified type.
:param cls: class that registers hooks
:param hook_type: hook type, e.g., '__pre_parse_args__'
:param hook_func: hook function
"""
if hook_type not in cls._hooks_map:
cls._hooks_map[hook_type] = []
cls._hooks_map[hook_type].append(hook_func)
@classmethod
def run_hooks(cls, hook_type, *args, **kwargs):
"""Run all hooks of specified type.
:param cls: class that registers hooks
:param hook_type: hook type, e.g., '__pre_parse_args__'
:param **args: args to be passed to every hook function
:param **kwargs: kwargs to be passed to every hook function
"""
hook_funcs = cls._hooks_map.get(hook_type) or []
for hook_func in hook_funcs:
hook_func(*args, **kwargs)
class BaseManager(HookableMixin):
"""Basic manager type providing common operations.
Managers interact with a particular type of API (servers, flavors, images,
etc.) and provide CRUD operations for them.
"""
resource_class = None
def __init__(self, client):
"""Initializes BaseManager with `client`.
:param client: instance of BaseClient descendant for HTTP requests
"""
super(BaseManager, self).__init__()
self.client = client
def _list(self, url, response_key, obj_class=None, json=None):
"""List the collection.
:param url: a partial URL, e.g., '/servers'
:param response_key: the key to be looked up in response dictionary,
e.g., 'servers'
:param obj_class: class for constructing the returned objects
(self.resource_class will be used by default)
:param json: data that will be encoded as JSON and passed in POST
request (GET will be sent by default)
"""
if json:
body = self.client.post(url, json=json).json()
else:
body = self.client.get(url).json()
if obj_class is None:
obj_class = self.resource_class
data = body[response_key]
# NOTE(ja): keystone returns values as list as {'values': [ ... ]}
# unlike other services which just return the list...
try:
data = data['values']
except (KeyError, TypeError):
pass
return [obj_class(self, res, loaded=True) for res in data if res]
def _get(self, url, response_key):
"""Get an object from collection.
:param url: a partial URL, e.g., '/servers'
:param response_key: the key to be looked up in response dictionary,
e.g., 'server'
"""
body = self.client.get(url).json()
return self.resource_class(self, body[response_key], loaded=True)
def _head(self, url):
"""Retrieve request headers for an object.
:param url: a partial URL, e.g., '/servers'
"""
resp = self.client.head(url)
return resp.status_code == 204
def _post(self, url, json, response_key, return_raw=False):
"""Create an object.
:param url: a partial URL, e.g., '/servers'
:param json: data that will be encoded as JSON and passed in POST
request (GET will be sent by default)
:param response_key: the key to be looked up in response dictionary,
e.g., 'servers'
:param return_raw: flag to force returning raw JSON instead of
Python object of self.resource_class
"""
body = self.client.post(url, json=json).json()
if return_raw:
return body[response_key]
return self.resource_class(self, body[response_key])
def _put(self, url, json=None, response_key=None):
"""Update an object with PUT method.
:param url: a partial URL, e.g., '/servers'
:param json: data that will be encoded as JSON and passed in POST
request (GET will be sent by default)
:param response_key: the key to be looked up in response dictionary,
e.g., 'servers'
"""
resp = self.client.put(url, json=json)
# PUT requests may not return a body
if resp.content:
body = resp.json()
if response_key is not None:
return self.resource_class(self, body[response_key])
else:
return self.resource_class(self, body)
def _patch(self, url, json=None, response_key=None):
"""Update an object with PATCH method.
:param url: a partial URL, e.g., '/servers'
:param json: data that will be encoded as JSON and passed in POST
request (GET will be sent by default)
:param response_key: the key to be looked up in response dictionary,
e.g., 'servers'
"""
body = self.client.patch(url, json=json).json()
if response_key is not None:
return self.resource_class(self, body[response_key])
else:
return self.resource_class(self, body)
def _delete(self, url):
"""Delete an object.
:param url: a partial URL, e.g., '/servers/my-server'
"""
return self.client.delete(url)
class ManagerWithFind(BaseManager):
"""Manager with additional `find()`/`findall()` methods."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def list(self):
pass
def find(self, **kwargs):
"""Find a single item with attributes matching ``**kwargs``.
This isn't very efficient: it loads the entire list then filters on
the Python side.
"""
matches = self.findall(**kwargs)
num_matches = len(matches)
if num_matches == 0:
msg = "No %s matching %s." % (self.resource_class.__name__, kwargs)
raise exceptions.NotFound(msg)
elif num_matches > 1:
raise exceptions.NoUniqueMatch()
else:
return matches[0]
def findall(self, **kwargs):
"""Find all items with attributes matching ``**kwargs``.
This isn't very efficient: it loads the entire list then filters on
the Python side.
"""
found = []
searches = kwargs.items()
for obj in self.list():
try:
if all(getattr(obj, attr) == value
for (attr, value) in searches):
found.append(obj)
except AttributeError:
continue
return found
class CrudManager(BaseManager):
"""Base manager class for manipulating entities.
Children of this class are expected to define a `collection_key` and `key`.
- `collection_key`: Usually a plural noun by convention (e.g. `entities`);
used to refer collections in both URL's (e.g. `/v3/entities`) and JSON
objects containing a list of member resources (e.g. `{'entities': [{},
{}, {}]}`).
- `key`: Usually a singular noun by convention (e.g. `entity`); used to
refer to an individual member of the collection.
"""
collection_key = None
key = None
def build_url(self, base_url=None, **kwargs):
"""Builds a resource URL for the given kwargs.
Given an example collection where `collection_key = 'entities'` and
`key = 'entity'`, the following URL's could be generated.
By default, the URL will represent a collection of entities, e.g.::
/entities
If kwargs contains an `entity_id`, then the URL will represent a
specific member, e.g.::
/entities/{entity_id}
:param base_url: if provided, the generated URL will be appended to it
"""
url = base_url if base_url is not None else ''
url += '/%s' % self.collection_key
# do we have a specific entity?
entity_id = kwargs.get('%s_id' % self.key)
if entity_id is not None:
url += '/%s' % entity_id
return url
def _filter_kwargs(self, kwargs):
"""Drop null values and handle ids."""
for key, ref in kwargs.copy().iteritems():
if ref is None:
kwargs.pop(key)
else:
if isinstance(ref, Resource):
kwargs.pop(key)
kwargs['%s_id' % key] = getid(ref)
return kwargs
def create(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
return self._post(
self.build_url(**kwargs),
{self.key: kwargs},
self.key)
def get(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
return self._get(
self.build_url(**kwargs),
self.key)
def head(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
return self._head(self.build_url(**kwargs))
def list(self, base_url=None, **kwargs):
"""List the collection.
:param base_url: if provided, the generated URL will be appended to it
"""
kwargs = self._filter_kwargs(kwargs)
return self._list(
'%(base_url)s%(query)s' % {
'base_url': self.build_url(base_url=base_url, **kwargs),
'query': '?%s' % urllib.urlencode(kwargs) if kwargs else '',
},
self.collection_key)
def put(self, base_url=None, **kwargs):
"""Update an element.
:param base_url: if provided, the generated URL will be appended to it
"""
kwargs = self._filter_kwargs(kwargs)
return self._put(self.build_url(base_url=base_url, **kwargs))
def update(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
params = kwargs.copy()
params.pop('%s_id' % self.key)
return self._patch(
self.build_url(**kwargs),
{self.key: params},
self.key)
def delete(self, **kwargs):
kwargs = self._filter_kwargs(kwargs)
return self._delete(
self.build_url(**kwargs))
class Extension(HookableMixin):
"""Extension descriptor."""
SUPPORTED_HOOKS = ('__pre_parse_args__', '__post_parse_args__')
manager_class = None
def __init__(self, name, module):
super(Extension, self).__init__()
self.name = name
self.module = module
self._parse_extension_module()
def _parse_extension_module(self):
self.manager_class = None
for attr_name, attr_value in self.module.__dict__.items():
if attr_name in self.SUPPORTED_HOOKS:
self.add_hook(attr_name, attr_value)
else:
try:
if issubclass(attr_value, BaseManager):
self.manager_class = attr_value
except TypeError:
pass
def __repr__(self):
return "<Extension '%s'>" % self.name
class Resource(object):
"""Base class for OpenStack resources (tenant, user, etc.).
This is pretty much just a bag for attributes.
"""
HUMAN_ID = False
NAME_ATTR = 'name'
def __init__(self, manager, info, loaded=False):
"""Populate and bind to a manager.
:param manager: BaseManager object
:param info: dictionary representing resource attributes
:param loaded: prevent lazy-loading if set to True
"""
self.manager = manager
self._info = info
self._add_details(info)
self._loaded = loaded
def __repr__(self):
reprkeys = sorted(k
for k in self.__dict__.keys()
if k[0] != '_' and k != 'manager')
info = ", ".join("%s=%s" % (k, getattr(self, k)) for k in reprkeys)
return "<%s %s>" % (self.__class__.__name__, info)
@property
def human_id(self):
"""Human-readable ID which can be used for bash completion.
"""
if self.NAME_ATTR in self.__dict__ and self.HUMAN_ID:
return strutils.to_slug(getattr(self, self.NAME_ATTR))
return None
def _add_details(self, info):
for (k, v) in info.iteritems():
try:
setattr(self, k, v)
self._info[k] = v
except AttributeError:
# In this case we already defined the attribute on the class
pass
def __getattr__(self, k):
if k not in self.__dict__:
#NOTE(bcwaldon): disallow lazy-loading if already loaded once
if not self.is_loaded():
self.get()
return self.__getattr__(k)
raise AttributeError(k)
else:
return self.__dict__[k]
def get(self):
# set_loaded() first ... so if we have to bail, we know we tried.
self.set_loaded(True)
if not hasattr(self.manager, 'get'):
return
new = self.manager.get(self.id)
if new:
self._add_details(new._info)
def __eq__(self, other):
if not isinstance(other, Resource):
return NotImplemented
# two resources of different types are not equal
if not isinstance(other, self.__class__):
return False
if hasattr(self, 'id') and hasattr(other, 'id'):
return self.id == other.id
return self._info == other._info
def is_loaded(self):
return self._loaded
def set_loaded(self, val):
self._loaded = val

View File

@@ -0,0 +1,391 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 OpenStack LLC
# Copyright 2011 Piston Cloud Computing, Inc.
# Copyright 2013 Alessio Ababilov
# Copyright 2013 Grid Dynamics
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
OpenStack Client interface. Handles the REST calls and responses.
"""
# E0202: An attribute inherited from %s hide this method
# pylint: disable=E0202
import logging
import time
try:
import simplejson as json
except ImportError:
import json
import requests
from marconiclient.common.apiclient.auth import response
from marconiclient.common.apiclient import exceptions
from marconiclient.openstack.common import importutils
_logger = logging.getLogger(__name__)
class HttpClient(object):
"""This client handles sending HTTP requests to OpenStack servers.
Features:
- share authentication information between several clients to different
services (e.g., for compute and image clients);
- reissue authentication request for expired tokens;
- encode/decode JSON bodies;
- raise exeptions on HTTP errors;
- pluggable authentication;
- store authentication information in a keyring;
- store time spent for requests;
- register clients for particular services, so one can use
`http_client.identity` or `http_client.compute`;
- log requests and responses in a format that is easy to copy-and-paste
into terminal and send the same request with curl.
"""
user_agent = "marconiclient.common.apiclient"
_auth_response = None
def __init__(self,
auth_plugin,
auth_response=None,
region_name=None,
endpoint_type="publicURL",
original_ip=None,
verify=True,
cert=None,
timeout=None,
timings=False,
keyring_saver=None,
http_log_debug=False,
user_agent=None,
http=None):
self.auth_plugin = auth_plugin
self.auth_response = auth_response
self.endpoint_type = endpoint_type
self.region_name = region_name
self.original_ip = original_ip
self.timeout = timeout
self.verify = verify
self.cert = cert
self.keyring_saver = keyring_saver
self.http_log_debug = http_log_debug
self.user_agent = user_agent or self.user_agent
self.times = [] # [("item", starttime, endtime), ...]
self.timings = timings
# requests within the same session can reuse TCP connections from pool
self.http = http or requests.Session()
self.token = None
self.endpoint = None
@property
def auth_response(self):
return self._auth_response
@auth_response.setter
def auth_response(self, value):
self._auth_response = response.AuthResponse(value or {})
def http_log_req(self, method, url, kwargs):
if not self.http_log_debug:
return
string_parts = [
"curl -i",
"-X '%s'" % method,
"'%s'" % url,
]
for element in kwargs['headers']:
header = "-H '%s: %s'" % (element, kwargs['headers'][element])
string_parts.append(header)
_logger.debug("REQ: %s" % " ".join(string_parts))
if 'data' in kwargs:
_logger.debug("REQ BODY: %s\n" % (kwargs['data']))
def http_log_resp(self, resp):
if not self.http_log_debug:
return
_logger.debug(
"RESP: [%s] %s\n",
resp.status_code,
resp.headers)
if resp._content_consumed:
_logger.debug(
"RESP BODY: %s\n",
resp.text)
def serialize(self, kwargs):
if kwargs.get('json') is not None:
kwargs['headers']['Content-Type'] = 'application/json'
kwargs['data'] = json.dumps(kwargs['json'])
try:
del kwargs['json']
except KeyError:
pass
def get_timings(self):
return self.times
def reset_timings(self):
self.times = []
def request(self, method, url, **kwargs):
"""Send an http request with the specified characteristics.
Wrapper around `requests.Session.request` to handle tasks such as
setting headers, JSON encoding/decoding, and error handling.
:param method: method of HTTP request
:param url: URL of HTTP request
:param kwargs: any other parameter that can be passed to
' requests.Session.request (such as `headers`) or `json`
that will be encoded as JSON and used as `data` argument
"""
kwargs.setdefault("headers", kwargs.get("headers", {}))
kwargs["headers"]["User-Agent"] = self.user_agent
if self.original_ip:
kwargs["headers"]["Forwarded"] = "for=%s;by=%s" % (
self.original_ip, self.user_agent)
if self.timeout is not None:
kwargs.setdefault("timeout", self.timeout)
kwargs.setdefault("verify", self.verify)
if self.cert is not None:
kwargs.setdefault("cert", self.cert)
self.serialize(kwargs)
self.http_log_req(method, url, kwargs)
if self.timings:
start_time = time.time()
resp = self.http.request(method, url, **kwargs)
if self.timings:
self.times.append(("%s %s" % (method, url),
start_time, time.time()))
self.http_log_resp(resp)
if resp.status_code >= 400:
_logger.debug(
"Request returned failure status: %s",
resp.status_code)
raise exceptions.from_response(resp, method, url)
return resp
@staticmethod
def concat_url(endpoint, url):
"""Concatenate endpoint and final URL.
E.g., "http://keystone/v2.0/" and "/tokens" are concatenated to
"http://keystone/v2.0/tokens".
:param endpoint: the base URL
:param url: the final URL
"""
return "%s/%s" % (endpoint.rstrip("/"), url.strip("/"))
def client_request(self, client, method, url, **kwargs):
"""Send an http request using `client`'s endpoint and specified `url`.
If request was rejected as unauthorized (possibly because the token is
expired), issue one authorization attempt and send the request once
again.
:param client: instance of BaseClient descendant
:param method: method of HTTP request
:param url: URL of HTTP request
:param kwargs: any other parameter that can be passed to
' `HttpClient.request`
"""
# To send a request, we need a token and an endpoint.
# There are several ways to retrieve them.
# token:
# - self.token
# - self.auth_response.token
# endpoint:
# - client.endpoint
# - client.cache_endpoint
# - self.endpoint
# - self.auth_response.url_for()
# All these fields can be set by auth_plugin during
# authentication.
url_for_args = {
"endpoint_type": client.endpoint_type or self.endpoint_type,
"service_type": client.service_type,
"filter_attrs": (
{"region": self.region_name}
if self.region_name
else {}
)
}
def get_token_and_endpoint(silent):
token = self.token or self.auth_response.token
endpoint = (client.endpoint or client.cached_endpoint or
self.endpoint)
if not endpoint:
try:
endpoint = self.auth_response.url_for(**url_for_args)
except exceptions.EndpointException:
if not silent:
raise
return (token, endpoint)
token, endpoint = get_token_and_endpoint(silent=True)
just_authenticated = False
if not (endpoint and token):
self.authenticate()
just_authenticated = True
token, endpoint = get_token_and_endpoint(silent=False)
if not (endpoint and token):
raise exceptions.AuthorizationFailure(
"Cannot find endpoint or token for request")
old_token_endpoint = (token, endpoint)
kwargs.setdefault("headers", {})["X-Auth-Token"] = token
client.cached_endpoint = endpoint
# Perform the request once. If we get Unauthorized, then it
# might be because the auth token expired, so try to
# re-authenticate and try again. If it still fails, bail.
try:
return self.request(
method, self.concat_url(endpoint, url), **kwargs)
except exceptions.Unauthorized:
if just_authenticated:
raise
client.cached_endpoint = None
self.authenticate()
token, endpoint = get_token_and_endpoint(silent=True)
if (not (endpoint and token) or
old_token_endpoint == (endpoint, token)):
raise
client.cached_endpoint = endpoint
kwargs["headers"]["X-Auth-Token"] = token
return self.request(
method, self.concat_url(endpoint, url), **kwargs)
def add_client(self, base_client_instance):
"""Add a new instance of :class:`BaseClient` descendant.
`self` will store a reference to `base_client_instance`.
Example:
>>> def test_clients():
... from marconiclient.common.apiclient.auth import keystone
... from marconiclient.common.apiclient import client
... auth = keystone.KeystoneV2AuthPlugin(
... username="user", password="pass", tenant_name="tenant",
... auth_url="http://auth:5000/v2.0")
... openstack_client = client.HttpClient(auth)
... # create nova client
... from novaclient.v1_1 import client
... client.Client(openstack_client)
... # create keystone client
... from keystoneclient.v2_0 import client
... client.Client(openstack_client)
... # use them
... openstack_client.identity.tenants.list()
... openstack_client.compute.servers.list()
"""
service_type = base_client_instance.service_type
if service_type and not hasattr(self, service_type):
setattr(self, service_type, base_client_instance)
def authenticate(self):
self.auth_plugin.authenticate(self)
# Store the authentication results in the keyring for later requests
if self.keyring_saver:
self.keyring_saver.save(self)
class BaseClient(object):
"""Top-level object to access the OpenStack API.
This client uses :class:`HttpClient` to send requests. :class:`HttpClient`
will handle a bunch of issues such as authentication.
"""
service_type = None
endpoint_type = None # "publicURL" will be used
endpoint = None
cached_endpoint = None
def __init__(self, http_client, extensions=None):
self.http_client = http_client
http_client.add_client(self)
# Add in any extensions...
if extensions:
for extension in extensions:
if extension.manager_class:
setattr(self, extension.name,
extension.manager_class(self))
def client_request(self, method, url, **kwargs):
return self.http_client.client_request(
self, method, url, **kwargs)
def head(self, url, **kwargs):
return self.client_request("HEAD", url, **kwargs)
def get(self, url, **kwargs):
return self.client_request("GET", url, **kwargs)
def post(self, url, **kwargs):
return self.client_request("POST", url, **kwargs)
def put(self, url, **kwargs):
return self.client_request("PUT", url, **kwargs)
def delete(self, url, **kwargs):
return self.client_request("DELETE", url, **kwargs)
def patch(self, url, **kwargs):
return self.client_request("PATCH", url, **kwargs)
@staticmethod
def get_class(api_name, version, version_map):
"""Returns the client class for the requested API version
:param api_name: the name of the API, e.g. 'compute', 'image', etc
:param version: the requested API version
:param version_map: a dict of client classes keyed by version
:rtype: a client class for the requested API version
"""
try:
client_path = version_map[str(version)]
except (KeyError, ValueError):
msg = "Invalid %s client version '%s'. must be one of: %s" % (
(api_name, version, ', '.join(version_map.keys())))
raise exceptions.UnsupportedVersion(msg)
return importutils.import_class(client_path)

View File

@@ -0,0 +1,435 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 Jacob Kaplan-Moss
# Copyright 2011 Nebula, Inc.
# Copyright 2013 Alessio Ababilov
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Exception definitions.
"""
import itertools
class ClientException(Exception):
"""The base exception class for all exceptions this library raises.
"""
pass
class MissingArgs(ClientException):
"""Supplied arguments are not sufficient for calling a function."""
def __init__(self, missing):
self.missing = missing
msg = "Missing argument(s): %s" % ", ".join(missing)
super(MissingArgs, self).__init__(msg)
class ValidationError(ClientException):
"""Error in validation on API client side."""
pass
class UnsupportedVersion(ClientException):
"""User is trying to use an unsupported version of the API."""
pass
class CommandError(ClientException):
"""Error in CLI tool."""
pass
class AuthorizationFailure(ClientException):
"""Cannot authorize API client."""
pass
class AuthPluginOptionsMissing(AuthorizationFailure):
"""Auth plugin misses some options."""
def __init__(self, opt_names):
super(AuthPluginOptionsMissing, self).__init__(
"Authentication failed. Missing options: %s" %
", ".join(opt_names))
self.opt_names = opt_names
class AuthSystemNotFound(AuthorizationFailure):
"""User has specified a AuthSystem that is not installed."""
def __init__(self, auth_system):
super(AuthSystemNotFound, self).__init__(
"AuthSystemNotFound: %s" % repr(auth_system))
self.auth_system = auth_system
class NoUniqueMatch(ClientException):
"""Multiple entities found instead of one."""
pass
class EndpointException(ClientException):
"""Something is rotten in Service Catalog."""
pass
class EndpointNotFound(EndpointException):
"""Could not find requested endpoint in Service Catalog."""
pass
class AmbiguousEndpoints(EndpointException):
"""Found more than one matching endpoint in Service Catalog."""
def __init__(self, endpoints=None):
super(AmbiguousEndpoints, self).__init__(
"AmbiguousEndpoints: %s" % repr(endpoints))
self.endpoints = endpoints
class HttpError(ClientException):
"""The base exception class for all HTTP exceptions.
"""
http_status = 0
message = "HTTP Error"
def __init__(self, message=None, details=None,
response=None, request_id=None,
url=None, method=None, http_status=None):
self.http_status = http_status or self.http_status
self.message = message or self.message
self.details = details
self.request_id = request_id
self.response = response
self.url = url
self.method = method
formatted_string = "%s (HTTP %s)" % (self.message, self.http_status)
if request_id:
formatted_string += " (Request-ID: %s)" % request_id
super(HttpError, self).__init__(formatted_string)
class HttpClientError(HttpError):
"""Client-side HTTP error.
Exception for cases in which the client seems to have erred.
"""
message = "HTTP Client Error"
class HttpServerError(HttpError):
"""Server-side HTTP error.
Exception for cases in which the server is aware that it has
erred or is incapable of performing the request.
"""
message = "HTTP Server Error"
class BadRequest(HttpClientError):
"""HTTP 400 - Bad Request.
The request cannot be fulfilled due to bad syntax.
"""
http_status = 400
message = "Bad Request"
class Unauthorized(HttpClientError):
"""HTTP 401 - Unauthorized.
Similar to 403 Forbidden, but specifically for use when authentication
is required and has failed or has not yet been provided.
"""
http_status = 401
message = "Unauthorized"
class PaymentRequired(HttpClientError):
"""HTTP 402 - Payment Required.
Reserved for future use.
"""
http_status = 402
message = "Payment Required"
class Forbidden(HttpClientError):
"""HTTP 403 - Forbidden.
The request was a valid request, but the server is refusing to respond
to it.
"""
http_status = 403
message = "Forbidden"
class NotFound(HttpClientError):
"""HTTP 404 - Not Found.
The requested resource could not be found but may be available again
in the future.
"""
http_status = 404
message = "Not Found"
class MethodNotAllowed(HttpClientError):
"""HTTP 405 - Method Not Allowed.
A request was made of a resource using a request method not supported
by that resource.
"""
http_status = 405
message = "Method Not Allowed"
class NotAcceptable(HttpClientError):
"""HTTP 406 - Not Acceptable.
The requested resource is only capable of generating content not
acceptable according to the Accept headers sent in the request.
"""
http_status = 406
message = "Not Acceptable"
class ProxyAuthenticationRequired(HttpClientError):
"""HTTP 407 - Proxy Authentication Required.
The client must first authenticate itself with the proxy.
"""
http_status = 407
message = "Proxy Authentication Required"
class RequestTimeout(HttpClientError):
"""HTTP 408 - Request Timeout.
The server timed out waiting for the request.
"""
http_status = 408
message = "Request Timeout"
class Conflict(HttpClientError):
"""HTTP 409 - Conflict.
Indicates that the request could not be processed because of conflict
in the request, such as an edit conflict.
"""
http_status = 409
message = "Conflict"
class Gone(HttpClientError):
"""HTTP 410 - Gone.
Indicates that the resource requested is no longer available and will
not be available again.
"""
http_status = 410
message = "Gone"
class LengthRequired(HttpClientError):
"""HTTP 411 - Length Required.
The request did not specify the length of its content, which is
required by the requested resource.
"""
http_status = 411
message = "Length Required"
class PreconditionFailed(HttpClientError):
"""HTTP 412 - Precondition Failed.
The server does not meet one of the preconditions that the requester
put on the request.
"""
http_status = 412
message = "Precondition Failed"
class RequestEntityTooLarge(HttpClientError):
"""HTTP 413 - Request Entity Too Large.
The request is larger than the server is willing or able to process.
"""
http_status = 413
message = "Request Entity Too Large"
def __init__(self, *args, **kwargs):
try:
self.retry_after = int(kwargs.pop('retry_after'))
except (KeyError, ValueError):
self.retry_after = 0
super(RequestEntityTooLarge, self).__init__(*args, **kwargs)
class RequestUriTooLong(HttpClientError):
"""HTTP 414 - Request-URI Too Long.
The URI provided was too long for the server to process.
"""
http_status = 414
message = "Request-URI Too Long"
class UnsupportedMediaType(HttpClientError):
"""HTTP 415 - Unsupported Media Type.
The request entity has a media type which the server or resource does
not support.
"""
http_status = 415
message = "Unsupported Media Type"
class RequestedRangeNotSatisfiable(HttpClientError):
"""HTTP 416 - Requested Range Not Satisfiable.
The client has asked for a portion of the file, but the server cannot
supply that portion.
"""
http_status = 416
message = "Requested Range Not Satisfiable"
class ExpectationFailed(HttpClientError):
"""HTTP 417 - Expectation Failed.
The server cannot meet the requirements of the Expect request-header field.
"""
http_status = 417
message = "Expectation Failed"
class UnprocessableEntity(HttpClientError):
"""HTTP 422 - Unprocessable Entity.
The request was well-formed but was unable to be followed due to semantic
errors.
"""
http_status = 422
message = "Unprocessable Entity"
class InternalServerError(HttpServerError):
"""HTTP 500 - Internal Server Error.
A generic error message, given when no more specific message is suitable.
"""
http_status = 500
message = "Internal Server Error"
# NotImplemented is a Python builtin
class HttpNotImplemented(HttpServerError):
"""HTTP 501 - Not Implemented.
The server either does not recognize the request method, or it lacks
the ability to fulfill the request.
"""
http_status = 501
message = "Not Implemented"
class BadGateway(HttpServerError):
"""HTTP 502 - Bad Gateway.
The server was acting as a gateway or proxy and received an invalid
response from the upstream server.
"""
http_status = 502
message = "Bad Gateway"
class ServiceUnavailable(HttpServerError):
"""HTTP 503 - Service Unavailable.
The server is currently unavailable.
"""
http_status = 503
message = "Service Unavailable"
class GatewayTimeout(HttpServerError):
"""HTTP 504 - Gateway Timeout.
The server was acting as a gateway or proxy and did not receive a timely
response from the upstream server.
"""
http_status = 504
message = "Gateway Timeout"
class HttpVersionNotSupported(HttpServerError):
"""HTTP 505 - HttpVersion Not Supported.
The server does not support the HTTP protocol version used in the request.
"""
http_status = 505
message = "HTTP Version Not Supported"
_code_map = dict(
(cls.http_status, cls)
for cls in itertools.chain(HttpClientError.__subclasses__(),
HttpServerError.__subclasses__()))
def from_response(response, method, url):
"""Returns an instance of :class:`HttpError` or subclass based on response.
:param response: instance of `requests.Response` class
:param method: HTTP method used for request
:param url: URL used for request
"""
kwargs = {
"http_status": response.status_code,
"response": response,
"method": method,
"url": url,
"request_id": response.headers.get("x-compute-request-id"),
}
if "retry-after" in response.headers:
kwargs["retry_after"] = response.headers["retry-after"]
content_type = response.headers.get("Content-Type", "")
if content_type.startswith("application/json"):
try:
body = response.json()
if len(body) == 1:
error = body.itervalues().next()
kwargs["message"] = error.get("message", None)
kwargs["details"] = error.get("details", None)
except (ValueError, TypeError):
pass
elif content_type.startswith("text/"):
kwargs["details"] = response.text
try:
cls = _code_map[response.status_code]
except KeyError:
if 500 <= response.status_code < 600:
cls = HttpServerError
elif 400 <= response.status_code < 500:
cls = HttpClientError
else:
cls = HttpError
return cls(**kwargs)

View File

@@ -0,0 +1,172 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A fake server that "responds" to API methods with pre-canned responses.
All of these responses come from the spec, so if for some reason the spec's
wrong the tests might raise AssertionError. I've indicated in comments the
places where actual behavior differs from the spec.
"""
# W0102: Dangerous default value %s as argument
# pylint: disable=W0102
import json
import urlparse
import requests
from marconiclient.common.apiclient import client
def assert_has_keys(dct, required=[], optional=[]):
for k in required:
try:
assert k in dct
except AssertionError:
extra_keys = set(dct.keys()).difference(set(required + optional))
raise AssertionError("found unexpected keys: %s" %
list(extra_keys))
class TestResponse(requests.Response):
"""Wrap requests.Response and provide a convenient initialization.
"""
def __init__(self, data):
super(TestResponse, self).__init__()
self._content_consumed = True
if isinstance(data, dict):
self.status_code = data.get('status_code', 200)
# Fake the text attribute to streamline Response creation
text = data.get('text', "")
if isinstance(text, (dict, list)):
self._content = json.dumps(text)
default_headers = {
"Content-Type": "application/json",
}
else:
self._content = text
default_headers = {}
self.headers = data.get('headers') or default_headers
else:
self.status_code = data
def __eq__(self, other):
return (self.status_code == other.status_code and
self.headers == other.headers and
self._content == other._content)
class FakeHttpClient(client.HttpClient):
def __init__(self, *args, **kwargs):
self.callstack = []
self.fixtures = kwargs.pop("fixtures", None) or {}
if not args and not "auth_plugin" in kwargs:
args = (None, )
super(FakeHttpClient, self).__init__(*args, **kwargs)
def assert_called(self, method, url, body=None, pos=-1):
"""Assert than an API method was just called.
"""
expected = (method, url)
called = self.callstack[pos][0:2]
assert self.callstack, \
"Expected %s %s but no calls were made." % expected
assert expected == called, 'Expected %s %s; got %s %s' % \
(expected + called)
if body is not None:
if self.callstack[pos][3] != body:
raise AssertionError('%r != %r' %
(self.callstack[pos][3], body))
def assert_called_anytime(self, method, url, body=None):
"""Assert than an API method was called anytime in the test.
"""
expected = (method, url)
assert self.callstack, \
"Expected %s %s but no calls were made." % expected
found = False
entry = None
for entry in self.callstack:
if expected == entry[0:2]:
found = True
break
assert found, 'Expected %s %s; got %s' % \
(method, url, self.callstack)
if body is not None:
assert entry[3] == body, "%s != %s" % (entry[3], body)
self.callstack = []
def clear_callstack(self):
self.callstack = []
def authenticate(self):
pass
def client_request(self, client, method, url, **kwargs):
# Check that certain things are called correctly
if method in ["GET", "DELETE"]:
assert "json" not in kwargs
# Note the call
self.callstack.append(
(method,
url,
kwargs.get("headers") or {},
kwargs.get("json") or kwargs.get("data")))
try:
fixture = self.fixtures[url][method]
except KeyError:
pass
else:
return TestResponse({"headers": fixture[0],
"text": fixture[1]})
# Call the method
args = urlparse.parse_qsl(urlparse.urlparse(url)[4])
kwargs.update(args)
munged_url = url.rsplit('?', 1)[0]
munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_')
munged_url = munged_url.replace('-', '_')
callback = "%s_%s" % (method.lower(), munged_url)
if not hasattr(self, callback):
raise AssertionError('Called unknown API method: %s %s, '
'expected fakes method name: %s' %
(method, url, callback))
resp = getattr(self, callback)(**kwargs)
if len(resp) == 3:
status, headers, body = resp
else:
status, body = resp
headers = {}
return TestResponse({
"status_code": status,
"text": body,
"headers": headers,
})

View File

@@ -0,0 +1,216 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# W0603: Using the global statement
# W0621: Redefining name %s from outer scope
# pylint: disable=W0102,W0603,W0621
import getpass
import inspect
import os
import sys
import textwrap
import prettytable
from marconiclient.common.apiclient import exceptions
from marconiclient.openstack.common import strutils
def validate_args(fn, *args, **kwargs):
"""Check that the supplied args are sufficient for calling a function.
>>> validate_args(lambda a: None)
Traceback (most recent call last):
...
MissingArgs: Missing argument(s): a
>>> validate_args(lambda a, b, c, d: None, 0, c=1)
Traceback (most recent call last):
...
MissingArgs: Missing argument(s): b, d
:param fn: the function to check
:param arg: the positional arguments supplied
:param kwargs: the keyword arguments supplied
"""
argspec = inspect.getargspec(fn)
num_defaults = len(argspec.defaults or [])
required_args = argspec.args[:len(argspec.args) - num_defaults]
def isbound(method):
return getattr(method, 'im_self', None) is not None
if isbound(fn):
required_args.pop(0)
missing = [arg for arg in required_args if arg not in kwargs]
missing = missing[len(args):]
if missing:
raise exceptions.MissingArgs(missing)
def arg(*args, **kwargs):
"""Decorator for CLI args.
Example:
>>> @arg("name", help="Name of the new entity")
... def entity_create(args):
... pass
"""
def _decorator(func):
add_arg(func, *args, **kwargs)
return func
return _decorator
def env(*args, **kwargs):
"""Returns the first environment variable set.
If all are empty, defaults to '' or keyword arg `default`.
"""
for arg in args:
value = os.environ.get(arg, None)
if value:
return value
return kwargs.get('default', '')
def add_arg(func, *args, **kwargs):
"""Bind CLI arguments to a shell.py `do_foo` function."""
if not hasattr(func, 'arguments'):
func.arguments = []
# NOTE(sirp): avoid dups that can occur when the module is shared across
# tests.
if (args, kwargs) not in func.arguments:
# Because of the sematics of decorator composition if we just append
# to the options list positional options will appear to be backwards.
func.arguments.insert(0, (args, kwargs))
def unauthenticated(func):
"""Adds 'unauthenticated' attribute to decorated function.
Usage:
>>> @unauthenticated
... def mymethod(f):
... pass
"""
func.unauthenticated = True
return func
def isunauthenticated(func):
"""Checks if the function does not require authentication.
Mark such functions with the `@unauthenticated` decorator.
:returns: bool
"""
return getattr(func, 'unauthenticated', False)
def print_list(objs, fields, formatters=None, sortby_index=0,
mixed_case_fields=None):
"""Print a list or objects as a table, one row per object.
:param objs: iterable of :class:`Resource`
:param fields: attributes that correspond to columns, in order
:param formatters: `dict` of callables for field formatting
:param sortby_index: index of the field for sorting table rows
:param mixed_case_fields: fields corresponding to object attributes that
have mixed case names (e.g., 'serverId')
"""
formatters = formatters or {}
mixed_case_fields = mixed_case_fields or []
if sortby_index is None:
sortby = None
else:
sortby = fields[sortby_index]
pt = prettytable.PrettyTable(fields, caching=False)
pt.align = 'l'
for o in objs:
row = []
for field in fields:
if field in formatters:
row.append(formatters[field](o))
else:
if field in mixed_case_fields:
field_name = field.replace(' ', '_')
else:
field_name = field.lower().replace(' ', '_')
data = getattr(o, field_name, '')
row.append(data)
pt.add_row(row)
if sortby is not None:
print(strutils.safe_encode(pt.get_string(sortby=sortby)))
else:
print(strutils.safe_encode(pt.get_string()))
def print_dict(dct, dict_property="Property", wrap=0):
"""Print a `dict` as a table of two columns.
:param dct: `dict` to print
:param dict_property: name of the first column
:param wrap: wrapping for the second column
"""
pt = prettytable.PrettyTable([dict_property, 'Value'], caching=False)
pt.align = 'l'
for k, v in dct.iteritems():
# convert dict to str to check length
if isinstance(v, dict):
v = str(v)
if wrap > 0:
v = textwrap.fill(str(v), wrap)
# if value has a newline, add in multiple rows
# e.g. fault with stacktrace
if v and isinstance(v, basestring) and r'\n' in v:
lines = v.strip().split(r'\n')
col1 = k
for line in lines:
pt.add_row([col1, line])
col1 = ''
else:
pt.add_row([k, v])
print(strutils.safe_encode(pt.get_string()))
def get_password(max_password_prompts=3):
"""Read password from TTY."""
verify = strutils.bool_from_string(env("OS_VERIFY_PASSWORD"))
pw = None
if hasattr(sys.stdin, "isatty") and sys.stdin.isatty():
# Check for Ctrl-D
try:
for _ in xrange(max_password_prompts):
pw1 = getpass.getpass("OS Password: ")
if verify:
pw2 = getpass.getpass("Please verify: ")
else:
pw2 = pw1
if pw1 == pw2 and pw1:
pw = pw1
break
except EOFError:
pass
return pw

View File

View File

@@ -0,0 +1,226 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
# All Rights Reserved.
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
gettext for openstack-common modules.
Usual usage in an openstack.common module:
from marconiclient.openstack.common.gettextutils import _
"""
import copy
import gettext
import logging.handlers
import os
import UserString
_localedir = os.environ.get('marconiclient'.upper() + '_LOCALEDIR')
_t = gettext.translation('marconiclient', localedir=_localedir, fallback=True)
def _(msg):
return _t.ugettext(msg)
def install(domain):
"""Install a _() function using the given translation domain.
Given a translation domain, install a _() function using gettext's
install() function.
The main difference from gettext.install() is that we allow
overriding the default localedir (e.g. /usr/share/locale) using
a translation-domain-specific environment variable (e.g.
NOVA_LOCALEDIR).
"""
gettext.install(domain,
localedir=os.environ.get(domain.upper() + '_LOCALEDIR'),
unicode=True)
"""
Lazy gettext functionality.
The following is an attempt to introduce a deferred way
to do translations on messages in OpenStack. We attempt to
override the standard _() function and % (format string) operation
to build Message objects that can later be translated when we have
more information. Also included is an example LogHandler that
translates Messages to an associated locale, effectively allowing
many logs, each with their own locale.
"""
def get_lazy_gettext(domain):
"""Assemble and return a lazy gettext function for a given domain.
Factory method for a project/module to get a lazy gettext function
for its own translation domain (i.e. nova, glance, cinder, etc.)
"""
def _lazy_gettext(msg):
"""
Create and return a Message object encapsulating a string
so that we can translate it later when needed.
"""
return Message(msg, domain)
return _lazy_gettext
class Message(UserString.UserString, object):
"""Class used to encapsulate translatable messages."""
def __init__(self, msg, domain):
# _msg is the gettext msgid and should never change
self._msg = msg
self._left_extra_msg = ''
self._right_extra_msg = ''
self.params = None
self.locale = None
self.domain = domain
@property
def data(self):
# NOTE(mrodden): this should always resolve to a unicode string
# that best represents the state of the message currently
localedir = os.environ.get(self.domain.upper() + '_LOCALEDIR')
if self.locale:
lang = gettext.translation(self.domain,
localedir=localedir,
languages=[self.locale],
fallback=True)
else:
# use system locale for translations
lang = gettext.translation(self.domain,
localedir=localedir,
fallback=True)
full_msg = (self._left_extra_msg +
lang.ugettext(self._msg) +
self._right_extra_msg)
if self.params is not None:
full_msg = full_msg % self.params
return unicode(full_msg)
def _save_parameters(self, other):
# we check for None later to see if
# we actually have parameters to inject,
# so encapsulate if our parameter is actually None
if other is None:
self.params = (other, )
else:
self.params = copy.deepcopy(other)
return self
# overrides to be more string-like
def __unicode__(self):
return self.data
def __str__(self):
return self.data.encode('utf-8')
def __getstate__(self):
to_copy = ['_msg', '_right_extra_msg', '_left_extra_msg',
'domain', 'params', 'locale']
new_dict = self.__dict__.fromkeys(to_copy)
for attr in to_copy:
new_dict[attr] = copy.deepcopy(self.__dict__[attr])
return new_dict
def __setstate__(self, state):
for (k, v) in state.items():
setattr(self, k, v)
# operator overloads
def __add__(self, other):
copied = copy.deepcopy(self)
copied._right_extra_msg += other.__str__()
return copied
def __radd__(self, other):
copied = copy.deepcopy(self)
copied._left_extra_msg += other.__str__()
return copied
def __mod__(self, other):
# do a format string to catch and raise
# any possible KeyErrors from missing parameters
self.data % other
copied = copy.deepcopy(self)
return copied._save_parameters(other)
def __mul__(self, other):
return self.data * other
def __rmul__(self, other):
return other * self.data
def __getitem__(self, key):
return self.data[key]
def __getslice__(self, start, end):
return self.data.__getslice__(start, end)
def __getattribute__(self, name):
# NOTE(mrodden): handle lossy operations that we can't deal with yet
# These override the UserString implementation, since UserString
# uses our __class__ attribute to try and build a new message
# after running the inner data string through the operation.
# At that point, we have lost the gettext message id and can just
# safely resolve to a string instead.
ops = ['capitalize', 'center', 'decode', 'encode',
'expandtabs', 'ljust', 'lstrip', 'replace', 'rjust', 'rstrip',
'strip', 'swapcase', 'title', 'translate', 'upper', 'zfill']
if name in ops:
return getattr(self.data, name)
else:
return UserString.UserString.__getattribute__(self, name)
class LocaleHandler(logging.Handler):
"""Handler that can have a locale associated to translate Messages.
A quick example of how to utilize the Message class above.
LocaleHandler takes a locale and a target logging.Handler object
to forward LogRecord objects to after translating the internal Message.
"""
def __init__(self, locale, target):
"""
Initialize a LocaleHandler
:param locale: locale to use for translating messages
:param target: logging.Handler object to forward
LogRecord objects to after translation
"""
logging.Handler.__init__(self)
self.locale = locale
self.target = target
def emit(self, record):
if isinstance(record.msg, Message):
# set the locale and resolve to a string
record.msg.locale = self.locale
self.target.emit(record)

View File

@@ -0,0 +1,67 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Import related utilities and helper functions.
"""
import sys
import traceback
def import_class(import_str):
"""Returns a class from a string including module and class."""
mod_str, _sep, class_str = import_str.rpartition('.')
try:
__import__(mod_str)
return getattr(sys.modules[mod_str], class_str)
except (ValueError, AttributeError):
raise ImportError('Class %s cannot be found (%s)' %
(class_str,
traceback.format_exception(*sys.exc_info())))
def import_object(import_str, *args, **kwargs):
"""Import a class and return an instance of it."""
return import_class(import_str)(*args, **kwargs)
def import_object_ns(name_space, import_str, *args, **kwargs):
"""
Import a class and return an instance of it, first by trying
to find the class in a default namespace, then failing back to
a full path if not found in the default namespace.
"""
import_value = "%s.%s" % (name_space, import_str)
try:
return import_class(import_value)(*args, **kwargs)
except ImportError:
return import_class(import_str)(*args, **kwargs)
def import_module(import_str):
"""Import a module."""
__import__(import_str)
return sys.modules[import_str]
def try_import(import_str, default=None):
"""Try to import a module and if it fails return default."""
try:
return import_module(import_str)
except ImportError:
return default

View File

@@ -0,0 +1,219 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
System-level utilities and helper functions.
"""
import re
import sys
import unicodedata
from marconiclient.openstack.common.gettextutils import _
# Used for looking up extensions of text
# to their 'multiplied' byte amount
BYTE_MULTIPLIERS = {
'': 1,
't': 1024 ** 4,
'g': 1024 ** 3,
'm': 1024 ** 2,
'k': 1024,
}
TRUE_STRINGS = ('1', 't', 'true', 'on', 'y', 'yes')
FALSE_STRINGS = ('0', 'f', 'false', 'off', 'n', 'no')
SLUGIFY_STRIP_RE = re.compile(r"[^\w\s-]")
SLUGIFY_HYPHENATE_RE = re.compile(r"[-\s]+")
def int_from_bool_as_string(subject):
"""
Interpret a string as a boolean and return either 1 or 0.
Any string value in:
('True', 'true', 'On', 'on', '1')
is interpreted as a boolean True.
Useful for JSON-decoded stuff and config file parsing
"""
return bool_from_string(subject) and 1 or 0
def bool_from_string(subject, strict=False):
"""
Interpret a string as a boolean.
A case-insensitive match is performed such that strings matching 't',
'true', 'on', 'y', 'yes', or '1' are considered True and, when
`strict=False`, anything else is considered False.
Useful for JSON-decoded stuff and config file parsing.
If `strict=True`, unrecognized values, including None, will raise a
ValueError which is useful when parsing values passed in from an API call.
Strings yielding False are 'f', 'false', 'off', 'n', 'no', or '0'.
"""
if not isinstance(subject, basestring):
subject = str(subject)
lowered = subject.strip().lower()
if lowered in TRUE_STRINGS:
return True
elif lowered in FALSE_STRINGS:
return False
elif strict:
acceptable = ', '.join(
"'%s'" % s for s in sorted(TRUE_STRINGS + FALSE_STRINGS))
msg = _("Unrecognized value '%(val)s', acceptable values are:"
" %(acceptable)s") % {'val': subject,
'acceptable': acceptable}
raise ValueError(msg)
else:
return False
def safe_decode(text, incoming=None, errors='strict'):
"""
Decodes incoming str using `incoming` if they're
not already unicode.
:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a unicode `incoming` encoded
representation of it.
:raises TypeError: If text is not an isntance of basestring
"""
if not isinstance(text, basestring):
raise TypeError("%s can't be decoded" % type(text))
if isinstance(text, unicode):
return text
if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())
try:
return text.decode(incoming, errors)
except UnicodeDecodeError:
# Note(flaper87) If we get here, it means that
# sys.stdin.encoding / sys.getdefaultencoding
# didn't return a suitable encoding to decode
# text. This happens mostly when global LANG
# var is not set correctly and there's no
# default encoding. In this case, most likely
# python will use ASCII or ANSI encoders as
# default encodings but they won't be capable
# of decoding non-ASCII characters.
#
# Also, UTF-8 is being used since it's an ASCII
# extension.
return text.decode('utf-8', errors)
def safe_encode(text, incoming=None,
encoding='utf-8', errors='strict'):
"""
Encodes incoming str/unicode using `encoding`. If
incoming is not specified, text is expected to
be encoded with current python's default encoding.
(`sys.getdefaultencoding`)
:param incoming: Text's current encoding
:param encoding: Expected encoding for text (Default UTF-8)
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: text or a bytestring `encoding` encoded
representation of it.
:raises TypeError: If text is not an isntance of basestring
"""
if not isinstance(text, basestring):
raise TypeError("%s can't be encoded" % type(text))
if not incoming:
incoming = (sys.stdin.encoding or
sys.getdefaultencoding())
if isinstance(text, unicode):
return text.encode(encoding, errors)
elif text and encoding != incoming:
# Decode text before encoding it with `encoding`
text = safe_decode(text, incoming, errors)
return text.encode(encoding, errors)
return text
def to_bytes(text, default=0):
"""Try to turn a string into a number of bytes. Looks at the last
characters of the text to determine what conversion is needed to
turn the input text into a byte number.
Supports: B/b, K/k, M/m, G/g, T/t (or the same with b/B on the end)
"""
# Take off everything not number 'like' (which should leave
# only the byte 'identifier' left)
mult_key_org = text.lstrip('-1234567890')
mult_key = mult_key_org.lower()
mult_key_len = len(mult_key)
if mult_key.endswith("b"):
mult_key = mult_key[0:-1]
try:
multiplier = BYTE_MULTIPLIERS[mult_key]
if mult_key_len:
# Empty cases shouldn't cause text[0:-0]
text = text[0:-mult_key_len]
return int(text) * multiplier
except KeyError:
msg = _('Unknown byte multiplier: %s') % mult_key_org
raise TypeError(msg)
except ValueError:
return default
def to_slug(value, incoming=None, errors="strict"):
"""Normalize string.
Convert to lowercase, remove non-word characters, and convert spaces
to hyphens.
Inspired by Django's `slugify` filter.
:param value: Text to slugify
:param incoming: Text's current encoding
:param errors: Errors handling policy. See here for valid
values http://docs.python.org/2/library/codecs.html
:returns: slugified unicode representation of `value`
:raises TypeError: If text is not an instance of basestring
"""
value = safe_decode(value, incoming, errors)
# NOTE(aababilov): no need to use safe_(encode|decode) here:
# encodings are always "ascii", error handling is always "ignore"
# and types are always known (first: unicode; second: str)
value = unicodedata.normalize("NFKD", value).encode(
"ascii", "ignore").decode("ascii")
value = SLUGIFY_STRIP_RE.sub("", value).strip().lower()
return SLUGIFY_HYPHENATE_RE.sub("-", value)

View File

@@ -0,0 +1,187 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Time related utilities and helper functions.
"""
import calendar
import datetime
import iso8601
# ISO 8601 extended time format with microseconds
_ISO8601_TIME_FORMAT_SUBSECOND = '%Y-%m-%dT%H:%M:%S.%f'
_ISO8601_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
PERFECT_TIME_FORMAT = _ISO8601_TIME_FORMAT_SUBSECOND
def isotime(at=None, subsecond=False):
"""Stringify time in ISO 8601 format."""
if not at:
at = utcnow()
st = at.strftime(_ISO8601_TIME_FORMAT
if not subsecond
else _ISO8601_TIME_FORMAT_SUBSECOND)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
st += ('Z' if tz == 'UTC' else tz)
return st
def parse_isotime(timestr):
"""Parse time from ISO 8601 format."""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(e.message)
except TypeError as e:
raise ValueError(e.message)
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
"""Returns formatted utcnow."""
if not at:
at = utcnow()
return at.strftime(fmt)
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
"""Turn a formatted time back into a datetime."""
return datetime.datetime.strptime(timestr, fmt)
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC naive object."""
offset = timestamp.utcoffset()
if offset is None:
return timestamp
return timestamp.replace(tzinfo=None) - offset
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
if isinstance(before, basestring):
before = parse_strtime(before).replace(tzinfo=None)
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
if isinstance(after, basestring):
after = parse_strtime(after).replace(tzinfo=None)
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return calendar.timegm(utcnow().timetuple())
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
try:
return utcnow.override_time.pop(0)
except AttributeError:
return utcnow.override_time
return datetime.datetime.utcnow()
def iso8601_from_timestamp(timestamp):
"""Returns a iso8601 formated date from timestamp."""
return isotime(datetime.datetime.utcfromtimestamp(timestamp))
utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
"""
Override utils.utcnow to return a constant time or a list thereof,
one at a time.
"""
utcnow.override_time = override_time
def advance_time_delta(timedelta):
"""Advance overridden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
try:
for dt in utcnow.override_time:
dt += timedelta
except TypeError:
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
"""Advance overridden time by seconds."""
advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override():
"""Remove the overridden time."""
utcnow.override_time = None
def marshall_now(now=None):
"""Make an rpc-safe datetime with microseconds.
Note: tzinfo is stripped, but not required for relative times.
"""
if not now:
now = utcnow()
return dict(day=now.day, month=now.month, year=now.year, hour=now.hour,
minute=now.minute, second=now.second,
microsecond=now.microsecond)
def unmarshall_time(tyme):
"""Unmarshall a datetime dict."""
return datetime.datetime(day=tyme['day'],
month=tyme['month'],
year=tyme['year'],
hour=tyme['hour'],
minute=tyme['minute'],
second=tyme['second'],
microsecond=tyme['microsecond'])
def delta_seconds(before, after):
"""
Compute the difference in seconds between two date, time, or
datetime objects (as a float, to microsecond resolution).
"""
delta = after - before
try:
return delta.total_seconds()
except AttributeError:
return ((delta.days * 24 * 3600) + delta.seconds +
float(delta.microseconds) / (10 ** 6))
def is_soon(dt, window):
"""
Determines if time is going to happen in the next window seconds.
:params dt: the time
:params window: minimum seconds to remain to consider the time not soon
:return: True if expiration is within the given duration
"""
soon = (utcnow() + datetime.timedelta(seconds=window))
return normalize_time(dt) <= soon

7
openstack-common.conf Normal file
View File

@@ -0,0 +1,7 @@
[DEFAULT]
# The list of modules to copy from openstack-common
modules=importutils,strutils,timeutils
# The base module to hold the copy of openstack.common
base=marconiclient

View File

@@ -1,2 +1,7 @@
argparse
d2to1>=0.2.10,<0.3
iso8601>=0.1.4
pbr>=0.5.16,<0.6
prettytable>=0.6,<0.8
requests>=1.1,<1.2.3
stevedore>=0.9

View File

@@ -29,6 +29,12 @@ setup-hooks =
packages =
marconiclient
[entry_points]
marconiclient.common.apiclient.auth =
endpoint-token = marconiclient.common.apiclient.auth.endpoint:EndpointTokenAuthPlugin
keystone = marconiclient.common.apiclient.auth.keystone:KeystoneV2AuthPlugin
nova = marconiclient.common.apiclient.auth.nova:NovaLegacyAuthPlugin
[build_sphinx]
source-dir = doc/source
build-dir = doc/build

14
tests/common/__init__.py Normal file
View File

@@ -0,0 +1,14 @@
# Copyright (c) 2013 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,19 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(jaypipes) Code in this module is intended to be ported to the eventual
# openstack-common library

View File

@@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -0,0 +1,181 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import fixtures
import mock
import requests
from stevedore import extension
try:
import json
except ImportError:
import simplejson as json
from marconiclient.common.apiclient.auth import base
from marconiclient.common.apiclient import client
from marconiclient.common.apiclient import fake_client
from tests import utils
TEST_REQUEST_BASE = {
'verify': True,
}
def mock_http_request(resp=None):
"""Mock an HTTP Request."""
if not resp:
resp = {
"access": {
"token": {
"expires": "12345",
"id": "FAKE_ID",
"tenant": {
"id": "FAKE_TENANT_ID",
}
},
"serviceCatalog": [
{
"type": "compute",
"endpoints": [
{
"region": "RegionOne",
"adminURL": "http://localhost:8774/v1.1",
"internalURL": "http://localhost:8774/v1.1",
"publicURL": "http://localhost:8774/v1.1/",
},
],
},
],
},
}
auth_response = fake_client.TestResponse({
"status_code": 200,
"text": json.dumps(resp),
})
return mock.Mock(return_value=(auth_response))
def requested_headers(cs):
"""Return requested passed headers."""
return {
'User-Agent': cs.user_agent,
'Content-Type': 'application/json',
}
class GlobalFunctionsTest(utils.BaseTestCase):
def test_load_auth_system_opts(self):
self.useFixture(fixtures.MonkeyPatch(
"os.environ",
{"OS_TENANT_NAME": "fake-project",
"OS_USERNAME": "fake-username"}))
parser = argparse.ArgumentParser()
base.discover_auth_systems()
base.load_auth_system_opts(parser)
options = parser.parse_args(
["--os-auth-url=fake-url", "--os_auth_system=fake-system"])
self.assertTrue(options.os_tenant_name, "fake-project")
self.assertTrue(options.os_username, "fake-username")
self.assertTrue(options.os_auth_url, "fake-url")
self.assertTrue(options.os_auth_system, "fake-system")
class MockEntrypoint(object):
def __init__(self, name, plugin):
self.name = name
self.plugin = plugin
class AuthPluginTest(utils.BaseTestCase):
@mock.patch.object(requests.Session, "request")
@mock.patch.object(extension.ExtensionManager, "map")
def test_auth_system_success(self, mock_mgr_map, mock_request):
"""Test that we can authenticate using the auth system."""
class FakePlugin(base.BaseAuthPlugin):
def authenticate(self, cls):
cls.request(
"POST", "http://auth/tokens",
json={"fake": "me"}, allow_redirects=True)
mock_mgr_map.side_effect = (
lambda func: func(MockEntrypoint("fake", FakePlugin)))
mock_request.side_effect = mock_http_request()
base.discover_auth_systems()
plugin = base.load_plugin("fake")
cs = client.HttpClient(auth_plugin=plugin)
cs.authenticate()
headers = requested_headers(cs)
mock_request.assert_called_with(
"POST",
"http://auth/tokens",
headers=headers,
data='{"fake": "me"}',
allow_redirects=True,
**TEST_REQUEST_BASE)
@mock.patch.object(extension.ExtensionManager, "map")
def test_discover_auth_system_options(self, mock_mgr_map):
"""Test that we can load the auth system options."""
class FakePlugin(base.BaseAuthPlugin):
@classmethod
def add_opts(cls, parser):
parser.add_argument('--auth_system_opt',
default=False,
action='store_true',
help="Fake option")
def authenticate(self, http_client):
pass
mock_mgr_map.side_effect = (
lambda func: func(MockEntrypoint("fake", FakePlugin)))
parser = argparse.ArgumentParser()
base.discover_auth_systems()
base.load_auth_system_opts(parser)
opts, _args = parser.parse_known_args(['--auth_system_opt'])
self.assertTrue(opts.auth_system_opt)
@mock.patch.object(extension.ExtensionManager, "map")
def test_parse_auth_system_options(self, mock_mgr_map):
"""Test that we can parse the auth system options."""
class FakePlugin(base.BaseAuthPlugin):
opt_names = ["fake_argument"]
def authenticate(self, http_client):
pass
mock_mgr_map.side_effect = (
lambda func: func(MockEntrypoint("fake", FakePlugin)))
base.discover_auth_systems()
plugin = base.load_plugin("fake")
plugin.parse_opts([])
self.assertIn("fake_argument", plugin.opts)

View File

@@ -0,0 +1,93 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import requests
try:
import json
except ImportError:
import simplejson as json
from marconiclient.common.apiclient.auth import keystone
from marconiclient.common.apiclient import client
from marconiclient.common.apiclient import exceptions
from marconiclient.common.apiclient import fake_client
from tests import utils
class KeystoneV2AuthPluginTest(utils.BaseTestCase):
def test_authenticate(self):
http_client = client.HttpClient(None)
mock_request = mock.Mock()
mock_request.return_value = fake_client.TestResponse({
"status_code": 200,
"text": {"access": {}}
})
successful_tests = [
{
"kwargs": ["tenant_id", "token", "auth_url"],
"data": {
"auth": {
"token": {"id": "token"}, "tenantId": "tenant_id"
},
},
},
{
"kwargs": ["tenant_name", "token", "auth_url"],
"data": {
"auth": {
"token": {"id": "token"}, "tenantName": "tenant_name"
},
},
},
{
"kwargs": ["username", "password", "tenant_name", "auth_url"],
"data": {
"auth": {
"tenantName": "tenant_name",
"passwordCredentials": {
"username": "username",
"password": "password",
},
},
},
},
]
with mock.patch("requests.Session.request", mock_request):
for test in successful_tests:
kwargs = dict((k, k) for k in test["kwargs"])
auth = keystone.KeystoneV2AuthPlugin(**kwargs)
http_client.auth_plugin = auth
http_client.authenticate()
requests.Session.request.assert_called_with(
"POST",
"auth_url/tokens",
headers=mock.ANY,
allow_redirects=True,
data=json.dumps(test["data"]),
verify=mock.ANY)
auth = keystone.KeystoneV2AuthPlugin(
password="password",
tenant_name="tenant_name",
auth_url="auth_url")
http_client.auth_plugin = auth
self.assertRaises(exceptions.AuthPluginOptionsMissing,
http_client.authenticate)

View File

@@ -0,0 +1,59 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import requests
from marconiclient.common.apiclient.auth import nova
from marconiclient.common.apiclient import client
from marconiclient.common.apiclient import fake_client
from tests import utils
class NovaLegacyAuthPluginTest(utils.BaseTestCase):
def test_authenticate(self):
http_client = client.HttpClient(None)
mock_request = mock.Mock()
mock_request.return_value = fake_client.TestResponse({
"status_code": 200,
"text": {"access": {}},
"headers": {
"X-Auth-Token": "token",
"X-Server-Management-Url": "url",
},
})
with mock.patch("requests.Session.request", mock_request):
auth = nova.NovaLegacyAuthPlugin(
username="username",
password="password",
project_id="project_id",
auth_url="auth_url")
http_client.auth_plugin = auth
http_client.authenticate()
requests.Session.request.assert_called_with(
"GET",
"auth_url",
headers={
"X-Auth-Project-Id": "project_id",
"X-Auth-Key": "password",
"X-Auth-User": "username",
"User-Agent": http_client.user_agent
},
allow_redirects=True,
verify=True)

View File

@@ -0,0 +1,296 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from marconiclient.common.apiclient.auth import response
from marconiclient.common.apiclient import exceptions
from marconiclient.openstack.common import timeutils
from tests import utils
# Taken directly from keystone/content/common/samples/auth.json
# Do not edit this structure. Instead, grab the latest from there.
SERVICE_CATALOG = {
"access": {
"token": {
"id": "ab48a9efdfedb23ty3494",
"expires": "2010-11-01T03:32:15-05:00",
"tenant": {
"id": "345",
"name": "My Project"
}
},
"user": {
"id": "123",
"name": "jqsmith",
"roles": [
{
"id": "234",
"name": "compute:admin",
},
{
"id": "235",
"name": "object-store:admin",
"tenantId": "1",
}
],
"roles_links": [],
},
"serviceCatalog": [
{
"name": "Cloud Servers",
"type": "compute",
"endpoints": [
{
# Tenant 1, no region, v1.0
"tenantId": "1",
"publicURL": "https://compute1.host/v1/1",
"internalURL": "https://compute1.host/v1/1",
"versionId": "1.0",
"versionInfo": "https://compute1.host/v1.0/",
"versionList": "https://compute1.host/"
},
{
# Tenant 2, with region, v1.1
"tenantId": "2",
"publicURL": "https://compute1.host/v1.1/2",
"internalURL": "https://compute1.host/v1.1/2",
"region": "North",
"versionId": "1.1",
"versionInfo": "https://compute1.host/v1.1/",
"versionList": "https://compute1.host/"
},
{
# Tenant 1, with region, v2.0
"tenantId": "1",
"publicURL": "https://compute1.host/v2/1",
"internalURL": "https://compute1.host/v2/1",
"region": "North",
"versionId": "2",
"versionInfo": "https://compute1.host/v2/",
"versionList": "https://compute1.host/"
},
],
"endpoints_links": [],
},
{
"name": "Nova Volumes",
"type": "volume",
"endpoints": [
{
"tenantId": "1",
"publicURL": "https://volume1.host/v1/1",
"internalURL": "https://volume1.host/v1/1",
"region": "South",
"versionId": "1.0",
"versionInfo": "uri",
"versionList": "uri"
},
{
"tenantId": "2",
"publicURL": "https://volume1.host/v1.1/2",
"internalURL": "https://volume1.host/v1.1/2",
"region": "South",
"versionId": "1.1",
"versionInfo": "https://volume1.host/v1.1/",
"versionList": "https://volume1.host/"
},
],
"endpoints_links": [
{
"rel": "next",
"href": "https://identity1.host/v2.0/endpoints"
},
],
},
],
"serviceCatalog_links": [
{
"rel": "next",
"href": "https://identity.host/v2.0/endpoints?session=2hfh8Ar",
},
],
},
}
UNSCOPED_TOKEN = {
u'access': {u'serviceCatalog': {},
u'token': {u'expires': u'2012-10-03T16:58:01Z',
u'id': u'3e2813b7ba0b4006840c3825860b86ed'},
u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
u'roles': [],
u'roles_links': [],
u'username': u'exampleuser'}
}
}
PROJECT_SCOPED_TOKEN = {
u'access': {
u'serviceCatalog': [{
u'endpoints': [{
u'adminURL': u'http://admin:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'internalURL':
u'http://internal:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'publicURL':
u'http://public.com:8776/v1/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne'
}],
u'endpoints_links': [],
u'name': u'Volume Service',
u'type': u'volume'},
{u'endpoints': [{
u'adminURL': u'http://admin:9292/v1',
u'internalURL': u'http://internal:9292/v1',
u'publicURL': u'http://public.com:9292/v1',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Image Service',
u'type': u'image'},
{u'endpoints': [{
u'adminURL': u'http://admin:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'internalURL': u'http://internal:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'publicURL': u'http://public.com:8774/v2/225da22d3ce34b15877ea70b2a575f58',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Compute Service',
u'type': u'compute'},
{u'endpoints': [{
u'adminURL': u'http://admin:8773/services/Admin',
u'internalURL': u'http://internal:8773/services/Cloud',
u'publicURL': u'http://public.com:8773/services/Cloud',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'EC2 Service',
u'type': u'ec2'},
{u'endpoints': [{
u'adminURL': u'http://admin:35357/v2.0',
u'internalURL': u'http://internal:5000/v2.0',
u'publicURL': u'http://public.com:5000/v2.0',
u'region': u'RegionOne'}],
u'endpoints_links': [],
u'name': u'Identity Service',
u'type': u'identity'}],
u'token': {u'expires': u'2012-10-03T16:53:36Z',
u'id': u'04c7d5ffaeef485f9dc69c06db285bdb',
u'tenant': {u'description': u'',
u'enabled': True,
u'id': u'225da22d3ce34b15877ea70b2a575f58',
u'name': u'exampleproject'}},
u'user': {u'id': u'c4da488862bd435c9e6c0275a0d0e49a',
u'name': u'exampleuser',
u'roles': [{u'id': u'edc12489faa74ee0aca0b8a0b4d74a74',
u'name': u'Member'}],
u'roles_links': [],
u'username': u'exampleuser'}
}
}
class AuthResponseTest(utils.BaseTestCase):
def test_building_unscoped(self):
auth_resp = response.AuthResponse(UNSCOPED_TOKEN)
self.assertTrue(auth_resp)
self.assertIn('access', auth_resp)
self.assertEquals(auth_resp.token,
'3e2813b7ba0b4006840c3825860b86ed')
self.assertEquals(auth_resp.username, 'exampleuser')
self.assertEquals(auth_resp.user_id,
'c4da488862bd435c9e6c0275a0d0e49a')
self.assertEquals(auth_resp.tenant_name, None)
self.assertEquals(auth_resp.tenant_id, None)
self.assertFalse(auth_resp.scoped)
self.assertEquals(auth_resp.expires, timeutils.parse_isotime(
UNSCOPED_TOKEN['access']['token']['expires']))
def test_building_scoped(self):
auth_resp = response.AuthResponse(PROJECT_SCOPED_TOKEN)
self.assertTrue(auth_resp)
self.assertIn('access', auth_resp)
self.assertEquals(auth_resp.token,
'04c7d5ffaeef485f9dc69c06db285bdb')
self.assertEquals(auth_resp.username, 'exampleuser')
self.assertEquals(auth_resp.user_id,
'c4da488862bd435c9e6c0275a0d0e49a')
self.assertEquals(auth_resp.tenant_name, 'exampleproject')
self.assertEquals(auth_resp.tenant_id,
'225da22d3ce34b15877ea70b2a575f58')
self.assertEquals(auth_resp.tenant_name, auth_resp.project_name)
self.assertEquals(auth_resp.tenant_id, auth_resp.project_id)
self.assertTrue(auth_resp.scoped)
def test_building_empty(self):
auth_resp = response.AuthResponse({})
self.assertFalse(auth_resp)
self.assertEquals(auth_resp.expires, None)
self.assertEquals(auth_resp.token, None)
self.assertEquals(auth_resp.username, None)
self.assertEquals(auth_resp.user_id, None)
self.assertEquals(auth_resp.tenant_name, None)
self.assertEquals(auth_resp.project_name, None)
self.assertFalse(auth_resp.scoped)
self.assertEquals(auth_resp.tenant_id, None)
self.assertEquals(auth_resp.project_id, None)
self.assertRaises(exceptions.EndpointNotFound,
auth_resp.url_for,
endpoint_type="publicURL",
service_type="compute",
filter_attrs={"region": "South"})
def test_url_for(self):
auth_resp = response.AuthResponse(SERVICE_CATALOG)
self.assertRaises(exceptions.AmbiguousEndpoints,
auth_resp.url_for,
endpoint_type="publicURL",
service_type="compute")
self.assertEquals(auth_resp.url_for(endpoint_type="publicURL",
service_type="compute",
filter_attrs={"tenantId": "1"}),
"https://compute1.host/v2/1")
self.assertEquals(auth_resp.url_for(endpoint_type="publicURL",
service_type="compute",
filter_attrs={"tenantId": "2"}),
"https://compute1.host/v1.1/2")
self.assertRaises(exceptions.EndpointNotFound,
auth_resp.url_for,
endpoint_type="publicURL",
service_type="compute",
filter_attrs={"region": "South"})
def test_url_for_case_insensitive(self):
auth_resp = response.AuthResponse(SERVICE_CATALOG)
# Matching south (and catalog has South).
self.assertRaises(exceptions.AmbiguousEndpoints,
auth_resp.url_for,
endpoint_type="publicURL",
service_type="volume",
filter_attrs={"region": "south"})

View File

@@ -0,0 +1,240 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from marconiclient.common.apiclient import base
from marconiclient.common.apiclient import client
from marconiclient.common.apiclient import exceptions
from marconiclient.common.apiclient import fake_client
from tests import utils
class HumanResource(base.Resource):
HUMAN_ID = True
class HumanResourceManager(base.ManagerWithFind):
resource_class = HumanResource
def list(self):
return self._list("/human_resources", "human_resources")
def get(self, human_resource):
return self._get(
"/human_resources/%s" % base.getid(human_resource),
"human_resource")
def update(self, human_resource, name):
body = {
"human_resource": {
"name": name,
},
}
return self._put(
"/human_resources/%s" % base.getid(human_resource),
body,
"human_resource")
class CrudResource(base.Resource):
pass
class CrudResourceManager(base.CrudManager):
"""Manager class for manipulating Identity crud_resources."""
resource_class = CrudResource
collection_key = 'crud_resources'
key = 'crud_resource'
def get(self, crud_resource):
return super(CrudResourceManager, self).get(
crud_resource_id=base.getid(crud_resource))
class FakeHttpClient(fake_client.FakeHttpClient):
crud_resource_json = {"id": "1", "domain_id": "my-domain"}
def get_human_resources(self, **kw):
return (200, {}, {'human_resources': [
{'id': 1, 'name': '256 MB Server'},
{'id': 2, 'name': '512 MB Server'},
{'id': 'aa1', 'name': '128 MB Server'}
]})
def get_human_resources_1(self, **kw):
res = self.get_human_resources()[2]['human_resources'][0]
return (200, {}, {'human_resource': res})
def put_human_resources_1(self, **kw):
kw = kw["json"]["human_resource"].copy()
kw["id"] = "1"
return (200, {}, {'human_resource': kw})
def post_crud_resources(self, **kw):
return (200, {}, {"crud_resource": {"id": "1"}})
def get_crud_resources(self, **kw):
crud_resources = []
if kw.get("domain_id") == self.crud_resource_json["domain_id"]:
crud_resources = [self.crud_resource_json]
else:
crud_resources = []
return (200, {}, {"crud_resources": crud_resources})
def get_crud_resources_1(self, **kw):
return (200, {}, {"crud_resource": self.crud_resource_json})
def head_crud_resources_1(self, **kw):
return (204, {}, None)
def patch_crud_resources_1(self, **kw):
self.crud_resource_json.update(kw)
return (200, {}, {"crud_resource": self.crud_resource_json})
def delete_crud_resources_1(self, **kw):
return (202, {}, None)
class TestClient(client.BaseClient):
service_type = "test"
def __init__(self, http_client, extensions=None):
super(TestClient, self).__init__(
http_client, extensions=extensions)
self.human_resources = HumanResourceManager(self)
self.crud_resources = CrudResourceManager(self)
class ResourceTest(utils.BaseTestCase):
def test_resource_repr(self):
r = base.Resource(None, dict(foo="bar", baz="spam"))
self.assertEqual(repr(r), "<Resource baz=spam, foo=bar>")
def test_getid(self):
class TmpObject(base.Resource):
id = "4"
self.assertEqual(base.getid(TmpObject(None, {})), "4")
def test_human_id(self):
r = base.Resource(None, {"name": "1"})
self.assertEqual(r.human_id, None)
r = HumanResource(None, {"name": "1"})
self.assertEqual(r.human_id, "1")
class BaseManagerTest(utils.BaseTestCase):
def setUp(self):
super(BaseManagerTest, self).setUp()
self.http_client = FakeHttpClient()
self.tc = TestClient(self.http_client)
def test_resource_lazy_getattr(self):
f = HumanResource(self.tc.human_resources, {'id': 1})
self.assertEqual(f.name, '256 MB Server')
self.http_client.assert_called('GET', '/human_resources/1')
# Missing stuff still fails after a second get
self.assertRaises(AttributeError, getattr, f, 'blahblah')
def test_eq(self):
# Two resources of the same type with the same id: equal
r1 = base.Resource(None, {'id': 1, 'name': 'hi'})
r2 = base.Resource(None, {'id': 1, 'name': 'hello'})
self.assertEqual(r1, r2)
# Two resources of different types: never equal
r1 = base.Resource(None, {'id': 1})
r2 = HumanResource(None, {'id': 1})
self.assertNotEqual(r1, r2)
# Two resources with no ID: equal if their info is equal
r1 = base.Resource(None, {'name': 'joe', 'age': 12})
r2 = base.Resource(None, {'name': 'joe', 'age': 12})
self.assertEqual(r1, r2)
def test_findall_invalid_attribute(self):
# Make sure findall with an invalid attribute doesn't cause errors.
# The following should not raise an exception.
self.tc.human_resources.findall(vegetable='carrot')
# However, find() should raise an error
self.assertRaises(exceptions.NotFound,
self.tc.human_resources.find,
vegetable='carrot')
def test_update(self):
name = "new-name"
human_resource = self.tc.human_resources.update("1", name)
self.assertEqual(human_resource.id, "1")
self.assertEqual(human_resource.name, name)
class CrudManagerTest(utils.BaseTestCase):
domain_id = "my-domain"
crud_resource_id = "1"
def setUp(self):
super(CrudManagerTest, self).setUp()
self.http_client = FakeHttpClient()
self.tc = TestClient(self.http_client)
def test_create(self):
crud_resource = self.tc.crud_resources.create()
self.assertEqual(crud_resource.id, self.crud_resource_id)
def test_list(self, domain=None, user=None):
crud_resources = self.tc.crud_resources.list(
base_url=None,
domain_id=self.domain_id)
self.assertEqual(len(crud_resources), 1)
self.assertEqual(crud_resources[0].id, self.crud_resource_id)
self.assertEqual(crud_resources[0].domain_id, self.domain_id)
crud_resources = self.tc.crud_resources.list(
base_url=None,
domain_id="another-domain",
another_attr=None)
self.assertEqual(len(crud_resources), 0)
def test_get(self):
crud_resource = self.tc.crud_resources.get(self.crud_resource_id)
self.assertEqual(crud_resource.id, self.crud_resource_id)
fake_client.assert_has_keys(
crud_resource._info,
required=["id", "domain_id"],
optional=["missing-attr"])
def test_update(self):
crud_resource = self.tc.crud_resources.update(
crud_resource_id=self.crud_resource_id,
domain_id=self.domain_id)
self.assertEqual(crud_resource.id, self.crud_resource_id)
self.assertEqual(crud_resource.domain_id, self.domain_id)
def test_delete(self):
resp = self.tc.crud_resources.delete(
crud_resource_id=self.crud_resource_id)
self.assertEqual(resp.status_code, 202)
def test_head(self):
ret = self.tc.crud_resources.head(
crud_resource_id=self.crud_resource_id)
self.assertTrue(ret)

View File

@@ -0,0 +1,136 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import requests
from marconiclient.common.apiclient.auth import endpoint
from marconiclient.common.apiclient import client
from marconiclient.common.apiclient import exceptions
from tests import utils
class TestClient(client.BaseClient):
service_type = "test"
class FakeAuthPlugin(endpoint.EndpointTokenAuthPlugin):
auth_system = "fake"
attempt = 0
def authenticate(self, http_client):
http_client.token = "token-%s" % self.attempt
http_client.endpoint = "/endpoint-%s" % self.attempt
self.attempt = self.attempt + 1
class ClientTest(utils.BaseTestCase):
def test_client_with_timeout(self):
http_client = client.HttpClient(None, timeout=2)
self.assertEqual(http_client.timeout, 2)
mock_request = mock.Mock()
mock_request.return_value = requests.Response()
mock_request.return_value.status_code = 200
with mock.patch("requests.Session.request", mock_request):
http_client.request("GET", "/", json={"1": "2"})
requests.Session.request.assert_called_with(
"GET",
"/",
timeout=2,
headers=mock.ANY,
verify=mock.ANY,
data=mock.ANY)
def test_concat_url(self):
self.assertEqual(client.HttpClient.concat_url("/a", "/b"), "/a/b")
self.assertEqual(client.HttpClient.concat_url("/a", "b"), "/a/b")
self.assertEqual(client.HttpClient.concat_url("/a/", "/b"), "/a/b")
def test_client_request(self):
http_client = client.HttpClient(FakeAuthPlugin())
mock_request = mock.Mock()
mock_request.return_value = requests.Response()
mock_request.return_value.status_code = 200
with mock.patch("requests.Session.request", mock_request):
http_client.client_request(
TestClient(http_client), "GET", "/resource", json={"1": "2"})
requests.Session.request.assert_called_with(
"GET",
"/endpoint-0/resource",
headers={
"User-Agent": http_client.user_agent,
"Content-Type": "application/json",
"X-Auth-Token": "token-0"
},
data='{"1": "2"}',
verify=True)
def test_client_request_reissue(self):
reject_token = None
def fake_request(method, url, **kwargs):
if kwargs["headers"]["X-Auth-Token"] == reject_token:
raise exceptions.Unauthorized(method=method, url=url)
return "%s %s" % (method, url)
http_client = client.HttpClient(FakeAuthPlugin())
test_client = TestClient(http_client)
http_client.request = fake_request
self.assertEqual(
http_client.client_request(
test_client, "GET", "/resource"),
"GET /endpoint-0/resource")
reject_token = "token-0"
self.assertEqual(
http_client.client_request(
test_client, "GET", "/resource"),
"GET /endpoint-1/resource")
class FakeClient1(object):
pass
class FakeClient21(object):
pass
class GetClientClassTestCase(utils.BaseTestCase):
version_map = {
"1": "%s.FakeClient1" % __name__,
"2.1": "%s.FakeClient21" % __name__,
}
def test_get_int(self):
self.assertEqual(
client.BaseClient.get_class("fake", 1, self.version_map),
FakeClient1)
def test_get_str(self):
self.assertEqual(
client.BaseClient.get_class("fake", "2.1", self.version_map),
FakeClient21)
def test_unsupported_version(self):
self.assertRaises(
exceptions.UnsupportedVersion,
client.BaseClient.get_class,
"fake", "7", self.version_map)

View File

@@ -0,0 +1,67 @@
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tests import utils
from marconiclient.common.apiclient import exceptions
class FakeResponse(object):
json_data = {}
def __init__(self, **kwargs):
for key, value in kwargs.iteritems():
setattr(self, key, value)
def json(self):
return self.json_data
class ExceptionsArgsTest(utils.BaseTestCase):
def assert_exception(self, ex_cls, method, url, status_code, json_data):
ex = exceptions.from_response(
FakeResponse(status_code=status_code,
headers={"Content-Type": "application/json"},
json_data=json_data),
method,
url)
self.assertTrue(isinstance(ex, ex_cls))
self.assertEqual(ex.message, json_data["error"]["message"])
self.assertEqual(ex.details, json_data["error"]["details"])
self.assertEqual(ex.method, method)
self.assertEqual(ex.url, url)
self.assertEqual(ex.http_status, status_code)
def test_from_response_known(self):
method = "GET"
url = "/fake"
status_code = 400
json_data = {"error": {"message": "fake message",
"details": "fake details"}}
self.assert_exception(
exceptions.BadRequest, method, url, status_code, json_data)
def test_from_response_unknown(self):
method = "POST"
url = "/fake-unknown"
status_code = 499
json_data = {"error": {"message": "fake unknown message",
"details": "fake unknown details"}}
self.assert_exception(
exceptions.HttpClientError, method, url, status_code, json_data)
status_code = 600
self.assert_exception(
exceptions.HttpError, method, url, status_code, json_data)

View File

@@ -0,0 +1,567 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import mock
from marconiclient.common.apiclient import exceptions
from marconiclient.common import cliutils
from tests import utils
class ValidateArgsTest(utils.BaseTestCase):
def test_lambda_no_args(self):
cliutils.validate_args(lambda: None)
def _test_lambda_with_args(self, *args, **kwargs):
cliutils.validate_args(lambda x, y: None, *args, **kwargs)
def test_lambda_positional_args(self):
self._test_lambda_with_args(1, 2)
def test_lambda_kwargs(self):
self._test_lambda_with_args(x=1, y=2)
def test_lambda_mixed_kwargs(self):
self._test_lambda_with_args(1, y=2)
def test_lambda_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_lambda_with_args)
def test_lambda_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_lambda_with_args, 1)
def test_lambda_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_lambda_with_args, y=2)
def _test_lambda_with_default(self, *args, **kwargs):
cliutils.validate_args(lambda x, y, z=3: None, *args, **kwargs)
def test_lambda_positional_args_with_default(self):
self._test_lambda_with_default(1, 2)
def test_lambda_kwargs_with_default(self):
self._test_lambda_with_default(x=1, y=2)
def test_lambda_mixed_kwargs_with_default(self):
self._test_lambda_with_default(1, y=2)
def test_lambda_positional_args_all_with_default(self):
self._test_lambda_with_default(1, 2, 3)
def test_lambda_kwargs_all_with_default(self):
self._test_lambda_with_default(x=1, y=2, z=3)
def test_lambda_mixed_kwargs_all_with_default(self):
self._test_lambda_with_default(1, y=2, z=3)
def test_lambda_with_default_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_lambda_with_default)
def test_lambda_with_default_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_lambda_with_default, 1)
def test_lambda_with_default_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_lambda_with_default, y=2)
def test_lambda_with_default_missing_args4(self):
self.assertRaises(exceptions.MissingArgs,
self._test_lambda_with_default, y=2, z=3)
def test_function_no_args(self):
def func():
pass
cliutils.validate_args(func)
def _test_function_with_args(self, *args, **kwargs):
def func(x, y):
pass
cliutils.validate_args(func, *args, **kwargs)
def test_function_positional_args(self):
self._test_function_with_args(1, 2)
def test_function_kwargs(self):
self._test_function_with_args(x=1, y=2)
def test_function_mixed_kwargs(self):
self._test_function_with_args(1, y=2)
def test_function_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_function_with_args)
def test_function_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_function_with_args, 1)
def test_function_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_function_with_args, y=2)
def _test_function_with_default(self, *args, **kwargs):
def func(x, y, z=3):
pass
cliutils.validate_args(func, *args, **kwargs)
def test_function_positional_args_with_default(self):
self._test_function_with_default(1, 2)
def test_function_kwargs_with_default(self):
self._test_function_with_default(x=1, y=2)
def test_function_mixed_kwargs_with_default(self):
self._test_function_with_default(1, y=2)
def test_function_positional_args_all_with_default(self):
self._test_function_with_default(1, 2, 3)
def test_function_kwargs_all_with_default(self):
self._test_function_with_default(x=1, y=2, z=3)
def test_function_mixed_kwargs_all_with_default(self):
self._test_function_with_default(1, y=2, z=3)
def test_function_with_default_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_function_with_default)
def test_function_with_default_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_function_with_default, 1)
def test_function_with_default_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_function_with_default, y=2)
def test_function_with_default_missing_args4(self):
self.assertRaises(exceptions.MissingArgs,
self._test_function_with_default, y=2, z=3)
def test_bound_method_no_args(self):
class Foo:
def bar(self):
pass
cliutils.validate_args(Foo().bar)
def _test_bound_method_with_args(self, *args, **kwargs):
class Foo:
def bar(self, x, y):
pass
cliutils.validate_args(Foo().bar, *args, **kwargs)
def test_bound_method_positional_args(self):
self._test_bound_method_with_args(1, 2)
def test_bound_method_kwargs(self):
self._test_bound_method_with_args(x=1, y=2)
def test_bound_method_mixed_kwargs(self):
self._test_bound_method_with_args(1, y=2)
def test_bound_method_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_bound_method_with_args)
def test_bound_method_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_bound_method_with_args, 1)
def test_bound_method_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_bound_method_with_args, y=2)
def _test_bound_method_with_default(self, *args, **kwargs):
class Foo:
def bar(self, x, y, z=3):
pass
cliutils.validate_args(Foo().bar, *args, **kwargs)
def test_bound_method_positional_args_with_default(self):
self._test_bound_method_with_default(1, 2)
def test_bound_method_kwargs_with_default(self):
self._test_bound_method_with_default(x=1, y=2)
def test_bound_method_mixed_kwargs_with_default(self):
self._test_bound_method_with_default(1, y=2)
def test_bound_method_positional_args_all_with_default(self):
self._test_bound_method_with_default(1, 2, 3)
def test_bound_method_kwargs_all_with_default(self):
self._test_bound_method_with_default(x=1, y=2, z=3)
def test_bound_method_mixed_kwargs_all_with_default(self):
self._test_bound_method_with_default(1, y=2, z=3)
def test_bound_method_with_default_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_bound_method_with_default)
def test_bound_method_with_default_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_bound_method_with_default, 1)
def test_bound_method_with_default_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_bound_method_with_default, y=2)
def test_bound_method_with_default_missing_args4(self):
self.assertRaises(exceptions.MissingArgs,
self._test_bound_method_with_default, y=2, z=3)
def test_unbound_method_no_args(self):
class Foo:
def bar(self):
pass
cliutils.validate_args(Foo.bar, Foo())
def _test_unbound_method_with_args(self, *args, **kwargs):
class Foo:
def bar(self, x, y):
pass
cliutils.validate_args(Foo.bar, Foo(), *args, **kwargs)
def test_unbound_method_positional_args(self):
self._test_unbound_method_with_args(1, 2)
def test_unbound_method_kwargs(self):
self._test_unbound_method_with_args(x=1, y=2)
def test_unbound_method_mixed_kwargs(self):
self._test_unbound_method_with_args(1, y=2)
def test_unbound_method_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_unbound_method_with_args)
def test_unbound_method_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_unbound_method_with_args, 1)
def test_unbound_method_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_unbound_method_with_args, y=2)
def _test_unbound_method_with_default(self, *args, **kwargs):
class Foo:
def bar(self, x, y, z=3):
pass
cliutils.validate_args(Foo.bar, Foo(), *args, **kwargs)
def test_unbound_method_positional_args_with_default(self):
self._test_unbound_method_with_default(1, 2)
def test_unbound_method_kwargs_with_default(self):
self._test_unbound_method_with_default(x=1, y=2)
def test_unbound_method_mixed_kwargs_with_default(self):
self._test_unbound_method_with_default(1, y=2)
def test_unbound_method_with_default_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_unbound_method_with_default)
def test_unbound_method_with_default_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_unbound_method_with_default, 1)
def test_unbound_method_with_default_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_unbound_method_with_default, y=2)
def test_unbound_method_with_default_missing_args4(self):
self.assertRaises(exceptions.MissingArgs,
self._test_unbound_method_with_default, y=2, z=3)
def test_class_method_no_args(self):
class Foo:
@classmethod
def bar(cls):
pass
cliutils.validate_args(Foo.bar)
def _test_class_method_with_args(self, *args, **kwargs):
class Foo:
@classmethod
def bar(cls, x, y):
pass
cliutils.validate_args(Foo.bar, *args, **kwargs)
def test_class_method_positional_args(self):
self._test_class_method_with_args(1, 2)
def test_class_method_kwargs(self):
self._test_class_method_with_args(x=1, y=2)
def test_class_method_mixed_kwargs(self):
self._test_class_method_with_args(1, y=2)
def test_class_method_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_class_method_with_args)
def test_class_method_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_class_method_with_args, 1)
def test_class_method_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_class_method_with_args, y=2)
def _test_class_method_with_default(self, *args, **kwargs):
class Foo:
@classmethod
def bar(cls, x, y, z=3):
pass
cliutils.validate_args(Foo.bar, *args, **kwargs)
def test_class_method_positional_args_with_default(self):
self._test_class_method_with_default(1, 2)
def test_class_method_kwargs_with_default(self):
self._test_class_method_with_default(x=1, y=2)
def test_class_method_mixed_kwargs_with_default(self):
self._test_class_method_with_default(1, y=2)
def test_class_method_with_default_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_class_method_with_default)
def test_class_method_with_default_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_class_method_with_default, 1)
def test_class_method_with_default_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_class_method_with_default, y=2)
def test_class_method_with_default_missing_args4(self):
self.assertRaises(exceptions.MissingArgs,
self._test_class_method_with_default, y=2, z=3)
def test_static_method_no_args(self):
class Foo:
@staticmethod
def bar():
pass
cliutils.validate_args(Foo.bar)
def _test_static_method_with_args(self, *args, **kwargs):
class Foo:
@staticmethod
def bar(x, y):
pass
cliutils.validate_args(Foo.bar, *args, **kwargs)
def test_static_method_positional_args(self):
self._test_static_method_with_args(1, 2)
def test_static_method_kwargs(self):
self._test_static_method_with_args(x=1, y=2)
def test_static_method_mixed_kwargs(self):
self._test_static_method_with_args(1, y=2)
def test_static_method_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_static_method_with_args)
def test_static_method_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_static_method_with_args, 1)
def test_static_method_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_static_method_with_args, y=2)
def _test_static_method_with_default(self, *args, **kwargs):
class Foo:
@staticmethod
def bar(x, y, z=3):
pass
cliutils.validate_args(Foo.bar, *args, **kwargs)
def test_static_method_positional_args_with_default(self):
self._test_static_method_with_default(1, 2)
def test_static_method_kwargs_with_default(self):
self._test_static_method_with_default(x=1, y=2)
def test_static_method_mixed_kwargs_with_default(self):
self._test_static_method_with_default(1, y=2)
def test_static_method_with_default_missing_args1(self):
self.assertRaises(exceptions.MissingArgs,
self._test_static_method_with_default)
def test_static_method_with_default_missing_args2(self):
self.assertRaises(exceptions.MissingArgs,
self._test_static_method_with_default, 1)
def test_static_method_with_default_missing_args3(self):
self.assertRaises(exceptions.MissingArgs,
self._test_static_method_with_default, y=2)
def test_static_method_with_default_missing_args4(self):
self.assertRaises(exceptions.MissingArgs,
self._test_static_method_with_default, y=2, z=3)
class _FakeResult(object):
def __init__(self, name, value):
self.name = name
self.value = value
class PrintResultTestCase(utils.BaseTestCase):
def setUp(self):
super(PrintResultTestCase, self).setUp()
self.mock_add_row = mock.MagicMock()
self.useFixture(fixtures.MonkeyPatch(
"prettytable.PrettyTable.add_row",
self.mock_add_row))
self.mock_get_string = mock.MagicMock(return_value="")
self.useFixture(fixtures.MonkeyPatch(
"prettytable.PrettyTable.get_string",
self.mock_get_string))
def test_print_list_sort_by_str(self):
objs = [_FakeResult("k1", 1),
_FakeResult("k3", 2),
_FakeResult("k2", 3)]
cliutils.print_list(objs, ["Name", "Value"], sortby_index=0)
self.assertEqual(self.mock_add_row.call_args_list,
[mock.call(["k1", 1]),
mock.call(["k3", 2]),
mock.call(["k2", 3])])
self.mock_get_string.assert_called_with(sortby="Name")
def test_print_list_sort_by_integer(self):
objs = [_FakeResult("k1", 1),
_FakeResult("k2", 3),
_FakeResult("k3", 2)]
cliutils.print_list(objs, ["Name", "Value"], sortby_index=1)
self.assertEqual(self.mock_add_row.call_args_list,
[mock.call(["k1", 1]),
mock.call(["k2", 3]),
mock.call(["k3", 2])])
self.mock_get_string.assert_called_with(sortby="Value")
def test_print_list_sort_by_none(self):
objs = [_FakeResult("k1", 1),
_FakeResult("k3", 3),
_FakeResult("k2", 2)]
cliutils.print_list(objs, ["Name", "Value"], sortby_index=None)
self.assertEqual(self.mock_add_row.call_args_list,
[mock.call(["k1", 1]),
mock.call(["k3", 3]),
mock.call(["k2", 2])])
self.mock_get_string.assert_called_with()
def test_print_dict(self):
cliutils.print_dict({"K": "k", "Key": "Value"})
cliutils.print_dict({"K": "k", "Key": "Long\\nValue"})
self.assertEqual(self.mock_add_row.call_args_list,
[mock.call(["K", "k"]),
mock.call(["Key", "Value"]),
mock.call(["K", "k"]),
mock.call(["Key", "Long"]),
mock.call(["", "Value"])])
class DecoratorsTestCase(utils.BaseTestCase):
def test_arg(self):
func_args = [("--image", ), ("--flavor", )]
func_kwargs = [dict(default=None,
metavar="<image>"),
dict(default=None,
metavar="<flavor>")]
@cliutils.arg(*func_args[1], **func_kwargs[1])
@cliutils.arg(*func_args[0], **func_kwargs[0])
def dummy_func():
pass
self.assertTrue(hasattr(dummy_func, "arguments"))
self.assertEqual(len(dummy_func.arguments), 2)
for args_kwargs in zip(func_args, func_kwargs):
self.assertIn(args_kwargs, dummy_func.arguments)
def test_unauthenticated(self):
def dummy_func():
pass
self.assertFalse(cliutils.isunauthenticated(dummy_func))
dummy_func = cliutils.unauthenticated(dummy_func)
self.assertTrue(cliutils.isunauthenticated(dummy_func))
class EnvTestCase(utils.BaseTestCase):
def test_env(self):
env = {"alpha": "a", "beta": "b"}
self.useFixture(fixtures.MonkeyPatch("os.environ", env))
self.assertEqual(cliutils.env("beta"), env["beta"])
self.assertEqual(cliutils.env("beta", "alpha"), env["beta"])
self.assertEqual(cliutils.env("alpha", "beta"), env["alpha"])
self.assertEqual(cliutils.env("gamma", "beta"), env["beta"])
self.assertEqual(cliutils.env("gamma"), "")
self.assertEqual(cliutils.env("gamma", default="c"), "c")
class GetPasswordTestCase(utils.BaseTestCase):
def setUp(self):
super(GetPasswordTestCase, self).setUp()
class FakeFile(object):
def isatty(self):
return True
self.useFixture(fixtures.MonkeyPatch("sys.stdin", FakeFile()))
def test_get_password(self):
self.useFixture(fixtures.MonkeyPatch("getpass.getpass",
lambda prompt: "mellon"))
self.assertEqual(cliutils.get_password(), "mellon")
def test_get_password_verify(self):
env = {"OS_VERIFY_PASSWORD": "True"}
self.useFixture(fixtures.MonkeyPatch("os.environ", env))
self.useFixture(fixtures.MonkeyPatch("getpass.getpass",
lambda prompt: "mellon"))
self.assertEqual(cliutils.get_password(), "mellon")
self.useFixture(fixtures.MonkeyPatch("getpass.getpass",
lambda prompt: prompt))
self.assertIsNone(cliutils.get_password())

28
tests/utils.py Normal file
View File

@@ -0,0 +1,28 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010-2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Common utilities used in testing"""
import fixtures
import testtools
class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
self.useFixture(fixtures.Timeout(30, True))