Expunge the internal version of WSME

This change removes unused code and concludes the conversion of the
REST API from WSME based to plain JSON.

Change-Id: Ib04c759f86d9758b67a75648b5971f5a80c77ecb
Story: 1651346
Task: 10551
This commit is contained in:
Steve Baker 2020-09-10 14:04:26 +12:00
parent 000bc72cc2
commit f408d63adc
16 changed files with 15 additions and 3389 deletions

View File

@ -1,381 +0,0 @@
# Copyright 2011-2019 the WSME authors and contributors
# (See https://opendev.org/x/wsme/)
#
# This module is part of WSME and is also released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import decimal
import json
import logging
from dateutil import parser as dateparser
from ironic.api import types as atypes
from ironic.common import exception
LOG = logging.getLogger(__name__)
CONTENT_TYPE = 'application/json'
ACCEPT_CONTENT_TYPES = [
CONTENT_TYPE,
'text/javascript',
'application/javascript'
]
ENUM_TRUE = ('true', 't', 'yes', 'y', 'on', '1')
ENUM_FALSE = ('false', 'f', 'no', 'n', 'off', '0')
def fromjson_array(datatype, value):
if not isinstance(value, list):
raise ValueError("Value not a valid list: %s" % value)
return [fromjson(datatype.item_type, item) for item in value]
def fromjson_dict(datatype, value):
if not isinstance(value, dict):
raise ValueError("Value not a valid dict: %s" % value)
return dict((
(fromjson(datatype.key_type, item[0]),
fromjson(datatype.value_type, item[1]))
for item in value.items()))
def fromjson_bool(value):
if isinstance(value, (int, bool)):
return bool(value)
if value in ENUM_TRUE:
return True
if value in ENUM_FALSE:
return False
raise ValueError("Value not an unambiguous boolean: %s" % value)
def fromjson(datatype, value):
"""A generic converter from json base types to python datatype.
"""
if value is None:
return None
if isinstance(datatype, atypes.ArrayType):
return fromjson_array(datatype, value)
if isinstance(datatype, atypes.DictType):
return fromjson_dict(datatype, value)
if datatype is bytes:
if isinstance(value, (str, int, float)):
return str(value).encode('utf8')
return value
if datatype is str:
if isinstance(value, bytes):
return value.decode('utf-8')
return value
if datatype in (int, float):
return datatype(value)
if datatype is bool:
return fromjson_bool(value)
if datatype is decimal.Decimal:
return decimal.Decimal(value)
if datatype is datetime.datetime:
return dateparser.parse(value)
if atypes.iscomplex(datatype):
return fromjson_complex(datatype, value)
if atypes.isusertype(datatype):
return datatype.frombasetype(fromjson(datatype.basetype, value))
return value
def fromjson_complex(datatype, value):
obj = datatype()
attributes = atypes.list_attributes(datatype)
# Here we check that all the attributes in the value are also defined
# in our type definition, otherwise we raise an Error.
v_keys = set(value.keys())
a_keys = set(adef.name for adef in attributes)
if not v_keys <= a_keys:
raise exception.UnknownAttribute(None, v_keys - a_keys)
for attrdef in attributes:
if attrdef.name in value:
try:
val_fromjson = fromjson(attrdef.datatype,
value[attrdef.name])
except exception.UnknownAttribute as e:
e.add_fieldname(attrdef.name)
raise
if getattr(attrdef, 'readonly', False):
raise exception.InvalidInput(attrdef.name, val_fromjson,
"Cannot set read only field.")
setattr(obj, attrdef.key, val_fromjson)
elif attrdef.mandatory:
raise exception.InvalidInput(attrdef.name, None,
"Mandatory field missing.")
return atypes.validate_value(datatype, obj)
def parse(s, datatypes, bodyarg, encoding='utf8'):
jload = json.load
if not hasattr(s, 'read'):
if isinstance(s, bytes):
s = s.decode(encoding)
jload = json.loads
try:
jdata = jload(s)
except ValueError:
raise exception.ClientSideError("Request is not in valid JSON format")
if bodyarg:
argname = list(datatypes.keys())[0]
try:
kw = {argname: fromjson(datatypes[argname], jdata)}
except ValueError as e:
raise exception.InvalidInput(argname, jdata, e.args[0])
except exception.UnknownAttribute as e:
# We only know the fieldname at this level, not in the
# called function. We fill in this information here.
e.add_fieldname(argname)
raise
else:
kw = {}
extra_args = []
if not isinstance(jdata, dict):
raise exception.ClientSideError("Request must be a JSON dict")
for key in jdata:
if key not in datatypes:
extra_args.append(key)
else:
try:
kw[key] = fromjson(datatypes[key], jdata[key])
except ValueError as e:
raise exception.InvalidInput(key, jdata[key], e.args[0])
except exception.UnknownAttribute as e:
# We only know the fieldname at this level, not in the
# called function. We fill in this information here.
e.add_fieldname(key)
raise
if extra_args:
raise exception.UnknownArgument(', '.join(extra_args))
return kw
def from_param(datatype, value):
if datatype is datetime.datetime:
return dateparser.parse(value) if value else None
if isinstance(datatype, atypes.UserType):
return datatype.frombasetype(
from_param(datatype.basetype, value))
if isinstance(datatype, atypes.ArrayType):
if value is None:
return value
return [
from_param(datatype.item_type, item)
for item in value
]
return datatype(value) if value is not None else None
def from_params(datatype, params, path, hit_paths):
if isinstance(datatype, atypes.ArrayType):
return array_from_params(datatype, params, path, hit_paths)
if isinstance(datatype, atypes.UserType):
return usertype_from_params(datatype, params, path, hit_paths)
if path in params:
assert not isinstance(datatype, atypes.DictType), \
'DictType unsupported'
assert not atypes.iscomplex(datatype) or datatype is atypes.File, \
'complex type unsupported'
hit_paths.add(path)
return from_param(datatype, params[path])
return atypes.Unset
def array_from_params(datatype, params, path, hit_paths):
if hasattr(params, 'getall'):
# webob multidict
def getall(params, path):
return params.getall(path)
elif hasattr(params, 'getlist'):
# werkzeug multidict
def getall(params, path): # noqa
return params.getlist(path)
if path in params:
hit_paths.add(path)
return [
from_param(datatype.item_type, value)
for value in getall(params, path)]
return atypes.Unset
def usertype_from_params(datatype, params, path, hit_paths):
if path in params:
hit_paths.add(path)
value = from_param(datatype.basetype, params[path])
if value is not atypes.Unset:
return datatype.frombasetype(value)
return atypes.Unset
def args_from_args(funcdef, args, kwargs):
newargs = []
for argdef, arg in zip(funcdef.arguments[:len(args)], args):
try:
newargs.append(from_param(argdef.datatype, arg))
except Exception as e:
if isinstance(argdef.datatype, atypes.UserType):
datatype_name = argdef.datatype.name
elif isinstance(argdef.datatype, type):
datatype_name = argdef.datatype.__name__
else:
datatype_name = argdef.datatype.__class__.__name__
raise exception.InvalidInput(
argdef.name,
arg,
"unable to convert to %(datatype)s. Error: %(error)s" % {
'datatype': datatype_name, 'error': e})
newkwargs = {}
for argname, value in kwargs.items():
newkwargs[argname] = from_param(
funcdef.get_arg(argname).datatype, value
)
return newargs, newkwargs
def args_from_params(funcdef, params):
kw = {}
hit_paths = set()
for argdef in funcdef.arguments:
value = from_params(
argdef.datatype, params, argdef.name, hit_paths)
if value is not atypes.Unset:
kw[argdef.name] = value
paths = set(params.keys())
unknown_paths = paths - hit_paths
if '__body__' in unknown_paths:
unknown_paths.remove('__body__')
if not funcdef.ignore_extra_args and unknown_paths:
raise exception.UnknownArgument(', '.join(unknown_paths))
return [], kw
def args_from_body(funcdef, body, mimetype):
if funcdef.body_type is not None:
datatypes = {funcdef.arguments[-1].name: funcdef.body_type}
else:
datatypes = dict(((a.name, a.datatype) for a in funcdef.arguments))
if not body:
return (), {}
if mimetype == "application/x-www-form-urlencoded":
# the parameters should have been parsed in params
return (), {}
elif mimetype not in ACCEPT_CONTENT_TYPES:
raise exception.ClientSideError("Unknown mimetype: %s" % mimetype,
status_code=415)
try:
kw = parse(
body, datatypes, bodyarg=funcdef.body_type is not None
)
except exception.UnknownArgument:
if not funcdef.ignore_extra_args:
raise
kw = {}
return (), kw
def combine_args(funcdef, akw, allow_override=False):
newargs, newkwargs = [], {}
for args, kwargs in akw:
for i, arg in enumerate(args):
n = funcdef.arguments[i].name
if not allow_override and n in newkwargs:
raise exception.ClientSideError(
"Parameter %s was given several times" % n)
newkwargs[n] = arg
for name, value in kwargs.items():
n = str(name)
if not allow_override and n in newkwargs:
raise exception.ClientSideError(
"Parameter %s was given several times" % n)
newkwargs[n] = value
return newargs, newkwargs
def get_args(funcdef, args, kwargs, params, body, mimetype):
"""Combine arguments from multiple sources
Combine arguments from :
* the host framework args and kwargs
* the request params
* the request body
Note that the host framework args and kwargs can be overridden
by arguments from params of body
"""
# get the body from params if not given directly
if not body and '__body__' in params:
body = params['__body__']
# extract args from the host args and kwargs
from_args = args_from_args(funcdef, args, kwargs)
# extract args from the request parameters
from_params = args_from_params(funcdef, params)
# extract args from the request body
from_body = args_from_body(funcdef, body, mimetype)
# combine params and body arguments
from_params_and_body = combine_args(
funcdef,
(from_params, from_body)
)
args, kwargs = combine_args(
funcdef,
(from_args, from_params_and_body),
allow_override=True
)
check_arguments(funcdef, args, kwargs)
return args, kwargs
def check_arguments(funcdef, args, kw):
"""Check if some arguments are missing"""
assert len(args) == 0
for arg in funcdef.arguments:
if arg.mandatory and arg.name not in kw:
raise exception.MissingArgument(arg.name)

View File

@ -12,66 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import functools
from webob import exc
from ironic.api import types as atypes
from ironic.common.i18n import _
class AsDictMixin(object):
"""Mixin class adding an as_dict() method."""
def as_dict(self):
"""Render this object as a dict of its fields."""
def _attr_as_pod(attr):
"""Return an attribute as a Plain Old Data (POD) type."""
if isinstance(attr, list):
return [_attr_as_pod(item) for item in attr]
# Recursively evaluate objects that support as_dict().
try:
return attr.as_dict()
except AttributeError:
return attr
return dict((k, _attr_as_pod(getattr(self, k)))
for k in self.fields
if hasattr(self, k)
and getattr(self, k) != atypes.Unset)
class Base(AsDictMixin):
"""Base type for complex types"""
def __init__(self, **kw):
for key, value in kw.items():
if hasattr(self, key):
setattr(self, key, value)
def unset_fields_except(self, except_list=None):
"""Unset fields so they don't appear in the message body.
:param except_list: A list of fields that won't be touched.
"""
if except_list is None:
except_list = []
for k in self.as_dict():
if k not in except_list:
setattr(self, k, atypes.Unset)
class APIBase(Base):
created_at = atypes.wsattr(datetime.datetime, readonly=True)
"""The time in UTC at which the object is created"""
updated_at = atypes.wsattr(datetime.datetime, readonly=True)
"""The time in UTC at which the object is updated"""
@functools.total_ordering
class Version(object):
"""API Version object."""

View File

@ -14,9 +14,7 @@
# under the License.
from ironic import api
from ironic.api.controllers import base
from ironic.api.controllers import link
from ironic.api import types as atypes
def has_next(collection, limit):
@ -87,30 +85,3 @@ def get_next(collection, limit, url=None, key_field='uuid', **kwargs):
return link.make_link('next', api.request.public_url,
url, next_args)['href']
class Collection(base.Base):
next = str
"""A link to retrieve the next subset of the collection"""
@property
def collection(self):
return getattr(self, self._type)
@classmethod
def get_key_field(cls):
return 'uuid'
def has_next(self, limit):
"""Return whether collection has more items."""
return has_next(self.collection, limit)
def get_next(self, limit, url=None, **kwargs):
"""Return a link to the next subset of the collection."""
resource_url = url or self._type
the_next = get_next(self.collection, limit, url=resource_url,
key_field=self.get_key_field(), **kwargs)
if the_next is None:
return atypes.Unset
return the_next

View File

@ -18,7 +18,6 @@ from oslo_messaging import exceptions as oslo_msg_exc
from oslo_utils import excutils
from oslo_versionedobjects import exception as oslo_vo_exc
from ironic.api import types as atypes
from ironic.common import exception
from ironic.common.i18n import _
from ironic.objects import allocation as allocation_objects
@ -71,9 +70,7 @@ def _emit_api_notification(context, obj, action, level, status, **kwargs):
:param kwargs: kwargs to use when creating the notification payload.
"""
resource = obj.__class__.__name__.lower()
# value atypes.Unset can be passed from API representation of resource
extra_args = {k: (v if v != atypes.Unset else None)
for k, v in kwargs.items()}
extra_args = kwargs
try:
try:
if action == 'maintenance_set':

View File

@ -1,31 +0,0 @@
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ironic.api.controllers import base
class State(base.APIBase):
current = str
"""The current state"""
target = str
"""The user modified desired state"""
available = [str]
"""A list of available states it is able to transition to"""
links = None
"""A list containing a self link and associated state links"""

View File

@ -1,263 +0,0 @@
# coding: utf-8
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import json
from oslo_log import log
from oslo_utils import strutils
from oslo_utils import uuidutils
from ironic.api.controllers import base
from ironic.api.controllers.v1 import utils as v1_utils
from ironic.api import types as atypes
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import utils
LOG = log.getLogger(__name__)
class MacAddressType(atypes.UserType):
"""A simple MAC address type."""
basetype = str
name = 'macaddress'
@staticmethod
def validate(value):
return utils.validate_and_normalize_mac(value)
@staticmethod
def frombasetype(value):
if value is None:
return None
return MacAddressType.validate(value)
class UuidOrNameType(atypes.UserType):
"""A simple UUID or logical name type."""
basetype = str
name = 'uuid_or_name'
@staticmethod
def validate(value):
if not (uuidutils.is_uuid_like(value)
or v1_utils.is_valid_logical_name(value)):
raise exception.InvalidUuidOrName(name=value)
return value
@staticmethod
def frombasetype(value):
if value is None:
return None
return UuidOrNameType.validate(value)
class NameType(atypes.UserType):
"""A simple logical name type."""
basetype = str
name = 'name'
@staticmethod
def validate(value):
if not v1_utils.is_valid_logical_name(value):
raise exception.InvalidName(name=value)
return value
@staticmethod
def frombasetype(value):
if value is None:
return None
return NameType.validate(value)
class UuidType(atypes.UserType):
"""A simple UUID type."""
basetype = str
name = 'uuid'
@staticmethod
def validate(value):
if not uuidutils.is_uuid_like(value):
raise exception.InvalidUUID(uuid=value)
return value
@staticmethod
def frombasetype(value):
if value is None:
return None
return UuidType.validate(value)
class BooleanType(atypes.UserType):
"""A simple boolean type."""
basetype = str
name = 'boolean'
@staticmethod
def validate(value):
try:
return strutils.bool_from_string(value, strict=True)
except ValueError as e:
# raise Invalid to return 400 (BadRequest) in the API
raise exception.Invalid(str(e))
@staticmethod
def frombasetype(value):
if value is None:
return None
return BooleanType.validate(value)
class JsonType(atypes.UserType):
"""A simple JSON type."""
basetype = str
name = 'json'
def __str__(self):
# These are the json serializable native types
return ' | '.join(map(str, (str, int, float,
BooleanType, list, dict, None)))
@staticmethod
def validate(value):
try:
json.dumps(value)
except TypeError:
raise exception.Invalid(_('%s is not JSON serializable') % value)
else:
return value
@staticmethod
def frombasetype(value):
return JsonType.validate(value)
class ListType(atypes.UserType):
"""A simple list type."""
basetype = str
name = 'list'
@staticmethod
def validate(value):
"""Validate and convert the input to a ListType.
:param value: A comma separated string of values
:returns: A list of unique values (lower-cased), maintaining the
same order
"""
items = []
for v in str(value).split(','):
v_norm = v.strip().lower()
if v_norm and v_norm not in items:
items.append(v_norm)
return items
@staticmethod
def frombasetype(value):
if value is None:
return None
return ListType.validate(value)
macaddress = MacAddressType()
uuid_or_name = UuidOrNameType()
name = NameType()
uuid = UuidType()
boolean = BooleanType()
listtype = ListType()
# Can't call it 'json' because that's the name of the stdlib module
jsontype = JsonType()
class JsonPatchType(base.Base):
"""A complex type that represents a single json-patch operation."""
path = atypes.wsattr(atypes.StringType(pattern='^(/[\\w-]+)+$'),
mandatory=True)
op = atypes.wsattr(atypes.Enum(str, 'add', 'replace', 'remove'),
mandatory=True)
value = atypes.wsattr(jsontype, default=atypes.Unset)
# The class of the objects being patched. Override this in subclasses.
# Should probably be a subclass of ironic.api.controllers.base.APIBase.
_api_base = None
# Attributes that are not required for construction, but which may not be
# removed if set. Override in subclasses if needed.
_extra_non_removable_attrs = set()
# Set of non-removable attributes, calculated lazily.
_non_removable_attrs = None
@staticmethod
def internal_attrs():
"""Returns a list of internal attributes.
Internal attributes can't be added, replaced or removed. This
method may be overwritten by derived class.
"""
return ['/created_at', '/id', '/links', '/updated_at', '/uuid']
@classmethod
def non_removable_attrs(cls):
"""Returns a set of names of attributes that may not be removed.
Attributes whose 'mandatory' property is True are automatically added
to this set. To add additional attributes to the set, override the
field _extra_non_removable_attrs in subclasses, with a set of the form
{'/foo', '/bar'}.
"""
if cls._non_removable_attrs is None:
cls._non_removable_attrs = cls._extra_non_removable_attrs.copy()
if cls._api_base:
fields = inspect.getmembers(cls._api_base,
lambda a: not inspect.isroutine(a))
for name, field in fields:
if getattr(field, 'mandatory', False):
cls._non_removable_attrs.add('/%s' % name)
return cls._non_removable_attrs
@staticmethod
def validate(patch):
_path = '/' + patch.path.split('/')[1]
if _path in patch.internal_attrs():
msg = _("'%s' is an internal attribute and can not be updated")
raise exception.ClientSideError(msg % patch.path)
if patch.path in patch.non_removable_attrs() and patch.op == 'remove':
msg = _("'%s' is a mandatory attribute and can not be removed")
raise exception.ClientSideError(msg % patch.path)
if patch.op != 'remove':
if patch.value is atypes.Unset:
msg = _("'add' and 'replace' operations need a value")
raise exception.ClientSideError(msg)
ret = {'path': patch.path, 'op': patch.op}
if patch.value is not atypes.Unset:
ret['value'] = patch.value
return ret

View File

@ -30,7 +30,6 @@ from pecan import rest
from ironic import api
from ironic.api.controllers import link
from ironic.api.controllers.v1 import versions
from ironic.api import types as atypes
from ironic.common import args
from ironic.common import exception
from ironic.common import faults
@ -677,6 +676,16 @@ def is_valid_logical_name(name):
return utils.is_valid_logical_name(name)
class PassthruResponse(object):
"""Object to hold the "response" from a passthru call"""
def __init__(self, obj, status_code=None):
#: Store the result object from the view
self.obj = obj
#: Store an optional status_code
self.status_code = status_code
def vendor_passthru(ident, method, topic, data=None, driver_passthru=False):
"""Call a vendor passthru API extension.
@ -719,7 +728,7 @@ def vendor_passthru(ident, method, topic, data=None, driver_passthru=False):
return_value = return_value.encode('utf-8')
return_value = io.BytesIO(return_value)
return atypes.PassthruResponse(return_value, status_code=status_code)
return PassthruResponse(return_value, status_code=status_code)
def check_for_invalid_fields(fields, object_fields):

View File

@ -1,222 +0,0 @@
#
# Copyright 2015 Rackspace, Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import functools
from http import client as http_client
import inspect
import json
import sys
import traceback
from oslo_config import cfg
from oslo_log import log
import pecan
from webob import static
from ironic.api import args as api_args
from ironic.api import functions
from ironic.api import types as atypes
LOG = log.getLogger(__name__)
class JSonRenderer(object):
@staticmethod
def __init__(path, extra_vars):
pass
@staticmethod
def render(template_path, namespace):
if 'faultcode' in namespace:
return encode_error(None, namespace)
result = encode_result(
namespace['result'],
namespace['datatype']
)
return result
pecan.templating._builtin_renderers['wsmejson'] = JSonRenderer
pecan_json_decorate = pecan.expose(
template='wsmejson:',
content_type='application/json',
generic=False)
def expose(*args, **kwargs):
sig = functions.signature(*args, **kwargs)
def decorate(f):
sig(f)
funcdef = functions.FunctionDefinition.get(f)
funcdef.resolve_types(atypes.registry)
@functools.wraps(f)
def callfunction(self, *args, **kwargs):
return_type = funcdef.return_type
try:
args, kwargs = api_args.get_args(
funcdef, args, kwargs, pecan.request.params,
pecan.request.body, pecan.request.content_type
)
result = f(self, *args, **kwargs)
# NOTE: Support setting of status_code with default 201
pecan.response.status = funcdef.status_code
if isinstance(result, atypes.PassthruResponse):
pecan.response.status = result.status_code
# NOTE(lucasagomes): If the return code is 204
# (No Response) we have to make sure that we are not
# returning anything in the body response and the
# content-length is 0
if result.status_code == 204:
return_type = None
if callable(getattr(result.obj, 'read', None)):
# Stream the files-like data directly to the response
pecan.response.app_iter = static.FileIter(result.obj)
return_type = None
result = None
else:
result = result.obj
except Exception:
try:
exception_info = sys.exc_info()
orig_exception = exception_info[1]
orig_code = getattr(orig_exception, 'code', None)
data = format_exception(
exception_info,
cfg.CONF.debug_tracebacks_in_api
)
finally:
del exception_info
if orig_code and orig_code in http_client.responses:
pecan.response.status = orig_code
else:
pecan.response.status = 500
return data
if return_type is None:
pecan.request.pecan['content_type'] = None
pecan.response.content_type = None
return ''
return dict(
datatype=return_type,
result=result
)
pecan_json_decorate(callfunction)
pecan.util._cfg(callfunction)['argspec'] = inspect.getfullargspec(f)
callfunction._wsme_definition = funcdef
return callfunction
return decorate
def tojson(datatype, value):
"""A generic converter from python to jsonify-able datatypes.
"""
if value is None:
return None
if isinstance(datatype, atypes.ArrayType):
return [tojson(datatype.item_type, item) for item in value]
if isinstance(datatype, atypes.DictType):
return dict((
(tojson(datatype.key_type, item[0]),
tojson(datatype.value_type, item[1]))
for item in value.items()
))
if isinstance(value, datetime.datetime):
return value.isoformat()
if atypes.iscomplex(datatype):
d = dict()
for attr in atypes.list_attributes(datatype):
attr_value = getattr(value, attr.key)
if attr_value is not atypes.Unset:
d[attr.name] = tojson(attr.datatype, attr_value)
return d
if isinstance(datatype, atypes.UserType):
return tojson(datatype.basetype, datatype.tobasetype(value))
return value
def encode_result(value, datatype, **options):
jsondata = tojson(datatype, value)
return json.dumps(jsondata)
def encode_error(context, errordetail):
return json.dumps(errordetail)
def format_exception(excinfo, debug=False):
"""Extract informations that can be sent to the client."""
error = excinfo[1]
code = getattr(error, 'code', None)
if code and code in http_client.responses and (400 <= code < 500):
faultstring = (error.faultstring if hasattr(error, 'faultstring')
else str(error))
faultcode = getattr(error, 'faultcode', 'Client')
r = dict(faultcode=faultcode,
faultstring=faultstring)
LOG.debug("Client-side error: %s", r['faultstring'])
r['debuginfo'] = None
return r
else:
faultstring = str(error)
debuginfo = "\n".join(traceback.format_exception(*excinfo))
LOG.error('Server-side error: "%s". Detail: \n%s',
faultstring, debuginfo)
faultcode = getattr(error, 'faultcode', 'Server')
r = dict(faultcode=faultcode, faultstring=faultstring)
if debug:
r['debuginfo'] = debuginfo
else:
r['debuginfo'] = None
return r
class validate(object):
"""Decorator that define the arguments types of a function.
Example::
class MyController(object):
@expose(str)
@validate(datetime.date, datetime.time)
def format(self, d, t):
return d.isoformat() + ' ' + t.isoformat()
"""
def __init__(self, *param_types):
self.param_types = param_types
def __call__(self, func):
argspec = functions.getargspec(func)
fd = functions.FunctionDefinition.get(func)
fd.set_arg_types(argspec, self.param_types)
return func

View File

@ -1,709 +0,0 @@
# coding: utf-8
#
# Copyright 2011-2019 the WSME authors and contributors
# (See https://opendev.org/x/wsme/)
#
# This module is part of WSME and is also released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import datetime
import decimal
import inspect
import re
import weakref
from oslo_log import log
from ironic.common import exception
LOG = log.getLogger(__name__)
pod_types = (int, bytes, str, float, bool)
native_types = pod_types + (datetime.datetime, decimal.Decimal)
_promotable_types = (int, str, bytes)
class ArrayType(object):
def __init__(self, item_type):
if iscomplex(item_type):
self._item_type = weakref.ref(item_type)
else:
self._item_type = item_type
def __hash__(self):
return hash(self.item_type)
def __eq__(self, other):
return isinstance(other, ArrayType) \
and self.item_type == other.item_type
def sample(self):
return [getattr(self.item_type, 'sample', self.item_type)()]
@property
def item_type(self):
if isinstance(self._item_type, weakref.ref):
return self._item_type()
else:
return self._item_type
def validate(self, value):
if value is None:
return
if not isinstance(value, list):
raise ValueError("Wrong type. Expected '[%s]', got '%s'" % (
self.item_type, type(value)
))
return [
validate_value(self.item_type, item)
for item in value
]
class DictType(object):
def __init__(self, key_type, value_type):
if key_type not in (int, bytes, str, float, bool):
raise ValueError("Dictionaries key can only be a pod type")
self.key_type = key_type
if iscomplex(value_type):
self._value_type = weakref.ref(value_type)
else:
self._value_type = value_type
def __hash__(self):
return hash((self.key_type, self.value_type))
def sample(self):
key = getattr(self.key_type, 'sample', self.key_type)()
value = getattr(self.value_type, 'sample', self.value_type)()
return {key: value}
@property
def value_type(self):
if isinstance(self._value_type, weakref.ref):
return self._value_type()
else:
return self._value_type
def validate(self, value):
if not isinstance(value, dict):
raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % (
self.key_type, self.value_type, type(value)
))
return dict((
(
validate_value(self.key_type, key),
validate_value(self.value_type, v)
) for key, v in value.items()
))
class UserType(object):
basetype = None
name = None
def validate(self, value):
return value
def tobasetype(self, value):
return value
def frombasetype(self, value):
return value
def isusertype(class_):
return isinstance(class_, UserType)
class BinaryType(UserType):
"""A user type that use base64 strings to carry binary data.
"""
basetype = bytes
name = 'binary'
def tobasetype(self, value):
if value is None:
return None
return base64.encodebytes(value)
def frombasetype(self, value):
if value is None:
return None
return base64.decodebytes(value)
#: The binary almost-native type
binary = BinaryType()
class IntegerType(UserType):
"""A simple integer type. Can validate a value range.
:param minimum: Possible minimum value
:param maximum: Possible maximum value
Example::
Price = IntegerType(minimum=1)
"""
basetype = int
name = "integer"
def __init__(self, minimum=None, maximum=None):
self.minimum = minimum
self.maximum = maximum
@staticmethod
def frombasetype(value):
return int(value) if value is not None else None
def validate(self, value):
if self.minimum is not None and value < self.minimum:
error = 'Value should be greater or equal to %s' % self.minimum
raise ValueError(error)
if self.maximum is not None and value > self.maximum:
error = 'Value should be lower or equal to %s' % self.maximum
raise ValueError(error)
return value
class StringType(UserType):
"""A simple string type. Can validate a length and a pattern.
:param min_length: Possible minimum length
:param max_length: Possible maximum length
:param pattern: Possible string pattern
Example::
Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$')
"""
basetype = str
name = "string"
def __init__(self, min_length=None, max_length=None, pattern=None):
self.min_length = min_length
self.max_length = max_length
if isinstance(pattern, str):
self.pattern = re.compile(pattern)
else:
self.pattern = pattern
def validate(self, value):
if not isinstance(value, self.basetype):
error = 'Value should be string'
raise ValueError(error)
if self.min_length is not None and len(value) < self.min_length:
error = 'Value should have a minimum character requirement of %s' \
% self.min_length
raise ValueError(error)
if self.max_length is not None and len(value) > self.max_length:
error = 'Value should have a maximum character requirement of %s' \
% self.max_length
raise ValueError(error)
if self.pattern is not None and not self.pattern.search(value):
error = 'Value should match the pattern %s' % self.pattern.pattern
raise ValueError(error)
return value
class Enum(UserType):
"""A simple enumeration type. Can be based on any non-complex type.
:param basetype: The actual data type
:param values: A set of possible values
If nullable, 'None' should be added the values set.
Example::
Gender = Enum(str, 'male', 'female')
Specie = Enum(str, 'cat', 'dog')
"""
def __init__(self, basetype, *values, **kw):
self.basetype = basetype
self.values = set(values)
name = kw.pop('name', None)
if name is None:
name = "Enum(%s)" % ', '.join((str(v) for v in values))
self.name = name
def validate(self, value):
if value not in self.values:
raise ValueError("Value should be one of: %s" %
', '.join(map(str, self.values)))
return value
def tobasetype(self, value):
return value
def frombasetype(self, value):
return value
class UnsetType(object):
def __bool__(self):
return False
def __repr__(self):
return 'Unset'
Unset = UnsetType()
def validate_value(datatype, value):
if value in (Unset, None) or datatype is None:
return value
# Try to promote the data type to one of our complex types.
if isinstance(datatype, list):
datatype = ArrayType(datatype[0])
elif isinstance(datatype, dict):
datatype = DictType(*list(datatype.items())[0])
# If the datatype has its own validator, use that.
if hasattr(datatype, 'validate'):
return datatype.validate(value)
# Do type promotion/conversion and data validation for builtin
# types.
v_type = type(value)
if datatype == int:
if v_type in _promotable_types:
try:
# Try to turn the value into an int
value = datatype(value)
except ValueError:
# An error is raised at the end of the function
# when the types don't match.
pass
elif datatype is float and v_type in _promotable_types:
try:
value = float(value)
except ValueError:
# An error is raised at the end of the function
# when the types don't match.
pass
elif datatype is str and isinstance(value, bytes):
value = value.decode()
elif datatype is bytes and isinstance(value, str):
value = value.encode()
if not isinstance(value, datatype):
raise ValueError(
"Wrong type. Expected '%s', got '%s'" % (
datatype, v_type
))
return value
def iscomplex(datatype):
return inspect.isclass(datatype) \
and '_wsme_attributes' in datatype.__dict__
class wsproperty(property):
"""A specialised :class:`property` to define typed-property on complex types.
Example::
class MyComplexType(Base):
def get_aint(self):
return self._aint
def set_aint(self, value):
assert avalue < 10 # Dummy input validation
self._aint = value
aint = wsproperty(int, get_aint, set_aint, mandatory=True)
"""
def __init__(self, datatype, fget, fset=None,
mandatory=False, doc=None, name=None):
property.__init__(self, fget, fset)
#: The property name in the parent python class
self.key = None
#: The attribute name on the public of the api.
#: Defaults to :attr:`key`
self.name = name
#: property data type
self.datatype = datatype
#: True if the property is mandatory
self.mandatory = mandatory
class wsattr(object):
"""Complex type attribute definition.
Example::
class MyComplexType(ctypes.Base):
optionalvalue = int
mandatoryvalue = wsattr(int, mandatory=True)
named_value = wsattr(int, name='named.value')
After inspection, the non-wsattr attributes will be replaced, and
the above class will be equivalent to::
class MyComplexType(ctypes.Base):
optionalvalue = wsattr(int)
mandatoryvalue = wsattr(int, mandatory=True)
"""
def __init__(self, datatype, mandatory=False, name=None, default=Unset,
readonly=False):
#: The attribute name in the parent python class.
#: Set by :func:`inspect_class`
self.key = None # will be set by class inspection
#: The attribute name on the public of the api.
#: Defaults to :attr:`key`
self.name = name
self._datatype = (datatype,)
#: True if the attribute is mandatory
self.mandatory = mandatory
#: Default value. The attribute will return this instead
#: of :data:`Unset` if no value has been set.
self.default = default
#: If True value cannot be set from json/xml input data
self.readonly = readonly
self.complextype = None
def _get_dataholder(self, instance):
dataholder = getattr(instance, '_wsme_dataholder', None)
if dataholder is None:
dataholder = instance._wsme_DataHolderClass()
instance._wsme_dataholder = dataholder
return dataholder
def __get__(self, instance, owner):
if instance is None:
return self
return getattr(
self._get_dataholder(instance),
self.key,
self.default
)
def __set__(self, instance, value):
try:
value = validate_value(self.datatype, value)
except (ValueError, TypeError) as e:
raise exception.InvalidInput(self.name, value, str(e))
dataholder = self._get_dataholder(instance)
if value is Unset:
if hasattr(dataholder, self.key):
delattr(dataholder, self.key)
else:
setattr(dataholder, self.key, value)
def __delete__(self, instance):
self.__set__(instance, Unset)
def _get_datatype(self):
if isinstance(self._datatype, tuple):
self._datatype = \
self.complextype().__registry__.resolve_type(self._datatype[0])
if isinstance(self._datatype, weakref.ref):
return self._datatype()
if isinstance(self._datatype, list):
return [
item() if isinstance(item, weakref.ref) else item
for item in self._datatype
]
return self._datatype
def _set_datatype(self, datatype):
self._datatype = datatype
#: attribute data type. Can be either an actual type,
#: or a type name, in which case the actual type will be
#: determined when needed (generally just before scanning the api).
datatype = property(_get_datatype, _set_datatype)
def iswsattr(attr):
if inspect.isfunction(attr) or inspect.ismethod(attr):
return False
if isinstance(attr, property) and not isinstance(attr, wsproperty):
return False
return True
def sort_attributes(class_, attributes):
"""Sort a class attributes list.
3 mechanisms are attempted :
#. Look for a _wsme_attr_order attribute on the class. This allow
to define an arbitrary order of the attributes (useful for
generated types).
#. Access the object source code to find the declaration order.
#. Sort by alphabetically
"""
if not len(attributes):
return
attrs = dict((a.key, a) for a in attributes)
if hasattr(class_, '_wsme_attr_order'):
names_order = class_._wsme_attr_order
else:
names = attrs.keys()
names_order = []
try:
lines = []
for cls in inspect.getmro(class_):
if cls is object:
continue
lines[len(lines):] = inspect.getsourcelines(cls)[0]
for line in lines:
line = line.strip().replace(" ", "")
if '=' in line:
aname = line[:line.index('=')]
if aname in names and aname not in names_order:
names_order.append(aname)
if len(names_order) < len(names):
names_order.extend((
name for name in names if name not in names_order))
assert len(names_order) == len(names)
except (TypeError, IOError):
names_order = list(names)
names_order.sort()
attributes[:] = [attrs[name] for name in names_order]
def inspect_class(class_):
"""Extract a list of (name, wsattr|wsproperty) for the given class"""
attributes = []
for name, attr in inspect.getmembers(class_, iswsattr):
if name.startswith('_'):
continue
if inspect.isroutine(attr):
continue
if isinstance(attr, (wsattr, wsproperty)):
attrdef = attr
else:
if (attr not in native_types
and (inspect.isclass(attr) or isinstance(attr, (list, dict)))):
register_type(attr)
attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr)
attrdef.key = name
if attrdef.name is None:
attrdef.name = name
attrdef.complextype = weakref.ref(class_)
attributes.append(attrdef)
setattr(class_, name, attrdef)
sort_attributes(class_, attributes)
return attributes
def list_attributes(class_):
"""Returns a list of a complex type attributes."""
if not iscomplex(class_):
raise TypeError("%s is not a registered type")
return class_._wsme_attributes
def make_dataholder(class_):
# the slots are computed outside the class scope to avoid
# 'attr' to pullute the class namespace, which leads to weird
# things if one of the slots is named 'attr'.
slots = [attr.key for attr in class_._wsme_attributes]
class DataHolder(object):
__slots__ = slots
DataHolder.__name__ = class_.__name__ + 'DataHolder'
return DataHolder
class Registry(object):
def __init__(self):
self._complex_types = []
self.array_types = set()
self.dict_types = set()
@property
def complex_types(self):
return [t() for t in self._complex_types if t()]
def register(self, class_):
"""Make sure a type is registered.
It is automatically called by :class:`expose() <expose.expose>`
and :class:`validate() <expose.validate>`.
Unless you want to control when the class inspection is done there
is no need to call it.
"""
if class_ is None or \
class_ in native_types or \
isinstance(class_, UserType) or iscomplex(class_) or \
isinstance(class_, ArrayType) or isinstance(class_, DictType):
return class_
if isinstance(class_, list):
if len(class_) != 1:
raise ValueError("Cannot register type %s" % repr(class_))
dt = ArrayType(class_[0])
self.register(dt.item_type)
self.array_types.add(dt)
return dt
if isinstance(class_, dict):
if len(class_) != 1:
raise ValueError("Cannot register type %s" % repr(class_))
dt = DictType(*list(class_.items())[0])
self.register(dt.value_type)
self.dict_types.add(dt)
return dt
class_._wsme_attributes = None
class_._wsme_attributes = inspect_class(class_)
class_._wsme_DataHolderClass = make_dataholder(class_)
class_.__registry__ = self
self._complex_types.append(weakref.ref(class_))
return class_
def reregister(self, class_):
"""Register a type which may already have been registered.
"""
self._unregister(class_)
return self.register(class_)
def _unregister(self, class_):
"""Remove a previously registered type.
"""
# Clear the existing attribute reference so it is rebuilt if
# the class is registered again later.
if hasattr(class_, '_wsme_attributes'):
del class_._wsme_attributes
# FIXME(dhellmann): This method does not recurse through the
# types like register() does. Should it?
if isinstance(class_, list):
at = ArrayType(class_[0])
try:
self.array_types.remove(at)
except KeyError:
pass
elif isinstance(class_, dict):
key_type, value_type = list(class_.items())[0]
self.dict_types = set(
dt for dt in self.dict_types
if (dt.key_type, dt.value_type) != (key_type, value_type)
)
# We can't use remove() here because the items in
# _complex_types are weakref objects pointing to the classes,
# so we can't compare with them directly.
self._complex_types = [
ct for ct in self._complex_types
if ct() is not class_
]
def lookup(self, typename):
LOG.debug('Lookup %s', typename)
modname = None
if '.' in typename:
modname, typename = typename.rsplit('.', 1)
for ct in self._complex_types:
ct = ct()
if ct is not None and typename == ct.__name__ and (
modname is None or modname == ct.__module__):
return ct
def resolve_type(self, type_):
if isinstance(type_, str):
return self.lookup(type_)
if isinstance(type_, list):
type_ = ArrayType(type_[0])
if isinstance(type_, dict):
type_ = DictType(list(type_.keys())[0], list(type_.values())[0])
if isinstance(type_, ArrayType):
type_ = ArrayType(self.resolve_type(type_.item_type))
self.array_types.add(type_)
elif isinstance(type_, DictType):
type_ = DictType(
type_.key_type,
self.resolve_type(type_.value_type)
)
self.dict_types.add(type_)
else:
type_ = self.register(type_)
return type_
# Default type registry
registry = Registry()
def register_type(class_):
return registry.register(class_)
class BaseMeta(type):
def __new__(cls, name, bases, dct):
if bases and bases[0] is not object and '__registry__' not in dct:
dct['__registry__'] = registry
return type.__new__(cls, name, bases, dct)
def __init__(cls, name, bases, dct):
if bases and bases[0] is not object and cls.__registry__:
cls.__registry__.register(cls)
class Base(metaclass=BaseMeta):
"""Base type for complex types"""
def __init__(self, **kw):
for key, value in kw.items():
if hasattr(self, key):
setattr(self, key, value)
class PassthruResponse(object):
"""Object to hold the "response" from a passthru call"""
def __init__(self, obj, status_code=None):
#: Store the result object from the view
self.obj = obj
#: Store an optional status_code
self.status_code = status_code

View File

@ -1,315 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from http import client as http_client
from importlib import machinery
import inspect
import json
import os
import sys
from unittest import mock
from oslo_utils import uuidutils
import pecan.rest
import pecan.testing
from ironic.api.controllers import root
from ironic.api.controllers import v1
from ironic.api import expose
from ironic.api import types as atypes
from ironic.common import exception
from ironic.tests import base as test_base
from ironic.tests.unit.api import base as test_api_base
class TestExposedAPIMethodsCheckPolicy(test_base.TestCase):
"""Ensure that all exposed HTTP endpoints call authorize."""
def setUp(self):
super(TestExposedAPIMethodsCheckPolicy, self).setUp()
self.original_method = sys.modules['ironic.api.expose'].expose
self.exposed_methods = set()
def expose_and_track(*args, **kwargs):
def wrap(f):
if f not in self.exposed_methods:
self.exposed_methods.add(f)
e = self.original_method(*args, **kwargs)
return e(f)
return wrap
p = mock.patch('ironic.api.expose.expose', expose_and_track)
p.start()
self.addCleanup(p.stop)
def _test(self, module):
module_path = os.path.abspath(sys.modules[module].__file__)
machinery.SourceFileLoader(uuidutils.generate_uuid(),
module_path).load_module()
expected_calls = [
'api_utils.check_node_policy_and_retrieve',
'api_utils.check_list_policy',
'api_utils.check_multiple_node_policies_and_retrieve',
'self._get_node_and_topic',
'api_utils.check_port_policy_and_retrieve',
'api_utils.check_port_list_policy',
'self._authorize_patch_and_get_node',
]
for func in self.exposed_methods:
src = inspect.getsource(func)
self.assertTrue(
any(call in src for call in expected_calls)
or ('policy.authorize' in src
and 'context.to_policy_values' in src),
'no policy check found in in exposed method %s' % func)
def test_chassis_api_policy(self):
self._test('ironic.api.controllers.v1.chassis')
def test_driver_api_policy(self):
self._test('ironic.api.controllers.v1.driver')
def test_node_api_policy(self):
self._test('ironic.api.controllers.v1.node')
def test_port_api_policy(self):
self._test('ironic.api.controllers.v1.port')
def test_portgroup_api_policy(self):
self._test('ironic.api.controllers.v1.portgroup')
def test_ramdisk_api_policy(self):
self._test('ironic.api.controllers.v1.ramdisk')
def test_conductor_api_policy(self):
self._test('ironic.api.controllers.v1.conductor')
class UnderscoreStr(atypes.UserType):
basetype = str
name = "custom string"
def tobasetype(self, value):
return '__' + value
class Obj(atypes.Base):
id = int
name = str
unset_me = str
class NestedObj(atypes.Base):
o = Obj
class TestJsonRenderer(test_base.TestCase):
def setUp(self):
super(TestJsonRenderer, self).setUp()
self.renderer = expose.JSonRenderer('/', None)
def test_render_error(self):
error_dict = {
'faultcode': 500,
'faultstring': 'ouch'
}
self.assertEqual(
error_dict,
json.loads(self.renderer.render('/', error_dict))
)
def test_render_exception(self):
error_dict = {
'faultcode': 'Server',
'faultstring': 'ouch',
'debuginfo': None
}
try:
raise Exception('ouch')
except Exception:
excinfo = sys.exc_info()
self.assertEqual(
json.dumps(error_dict),
self.renderer.render('/', expose.format_exception(excinfo))
)
def test_render_http_exception(self):
error_dict = {
'faultcode': '403',
'faultstring': 'Not authorized',
'debuginfo': None
}
try:
e = exception.NotAuthorized()
e.code = 403
except exception.IronicException:
excinfo = sys.exc_info()
self.assertEqual(
json.dumps(error_dict),
self.renderer.render('/', expose.format_exception(excinfo))
)
def test_render_int(self):
self.assertEqual(
'42',
self.renderer.render('/', {
'result': 42,
'datatype': int
})
)
def test_render_none(self):
self.assertEqual(
'null',
self.renderer.render('/', {
'result': None,
'datatype': str
})
)
def test_render_str(self):
self.assertEqual(
'"a string"',
self.renderer.render('/', {
'result': 'a string',
'datatype': str
})
)
def test_render_datetime(self):
self.assertEqual(
'"2020-04-14T10:35:10.586431"',
self.renderer.render('/', {
'result': datetime.datetime(2020, 4, 14, 10, 35, 10, 586431),
'datatype': datetime.datetime
})
)
def test_render_array(self):
self.assertEqual(
json.dumps(['one', 'two', 'three']),
self.renderer.render('/', {
'result': ['one', 'two', 'three'],
'datatype': atypes.ArrayType(str)
})
)
def test_render_dict(self):
self.assertEqual(
json.dumps({'one': 'a', 'two': 'b', 'three': 'c'}),
self.renderer.render('/', {
'result': {'one': 'a', 'two': 'b', 'three': 'c'},
'datatype': atypes.DictType(str, str)
})
)
def test_complex_type(self):
o = Obj()
o.id = 1
o.name = 'one'
o.unset_me = atypes.Unset
n = NestedObj()
n.o = o
self.assertEqual(
json.dumps({'o': {'id': 1, 'name': 'one'}}),
self.renderer.render('/', {
'result': n,
'datatype': NestedObj
})
)
def test_user_type(self):
self.assertEqual(
'"__foo"',
self.renderer.render('/', {
'result': 'foo',
'datatype': UnderscoreStr()
})
)
class MyThingController(pecan.rest.RestController):
_custom_actions = {
'no_content': ['GET'],
'response_content': ['GET'],
'ouch': ['GET'],
}
@expose.expose(int, str, int)
def get(self, name, number):
return {name: number}
@expose.expose(str)
def no_content(self):
return atypes.PassthruResponse('nothing', status_code=204)
@expose.expose(str)
def response_content(self):
return atypes.PassthruResponse('nothing', status_code=200)
@expose.expose(str)
def ouch(self):
raise Exception('ouch')
class MyV1Controller(v1.Controller):
things = MyThingController()
class MyRootController(root.RootController):
v1 = MyV1Controller()
class TestExpose(test_api_base.BaseApiTest):
block_execute = False
root_controller = '%s.%s' % (MyRootController.__module__,
MyRootController.__name__)
def test_expose(self):
self.assertEqual(
{'foo': 1},
self.get_json('/things/', name='foo', number=1)
)
def test_response_204(self):
response = self.get_json('/things/no_content', expect_errors=True)
self.assertEqual(http_client.NO_CONTENT, response.status_int)
self.assertIsNone(response.content_type)
self.assertEqual(b'', response.normal_body)
def test_response_content(self):
response = self.get_json('/things/response_content',
expect_errors=True)
self.assertEqual(http_client.OK, response.status_int)
self.assertEqual(b'"nothing"', response.normal_body)
self.assertEqual('application/json', response.content_type)
def test_exception(self):
response = self.get_json('/things/ouch',
expect_errors=True)
error_message = json.loads(response.json['error_message'])
self.assertEqual(http_client.INTERNAL_SERVER_ERROR,
response.status_int)
self.assertEqual('application/json', response.content_type)
self.assertEqual('Server', error_message['faultcode'])
self.assertEqual('ouch', error_message['faultstring'])

View File

@ -17,7 +17,6 @@ from unittest import mock
from oslo_utils import uuidutils
from ironic.api.controllers.v1 import notification_utils as notif_utils
from ironic.api import types as atypes
from ironic.objects import fields
from ironic.objects import notification
from ironic.tests import base as tests_base
@ -97,17 +96,6 @@ class APINotifyTestCase(tests_base.TestCase):
self.assertEqual('******', node.driver_info['password'])
self.assertEqual('fake-value', node.driver_info['some_value'])
def test_notification_uuid_unset(self):
node = obj_utils.get_test_node(self.context)
test_level = fields.NotificationLevel.INFO
test_status = fields.NotificationStatus.SUCCESS
notif_utils._emit_api_notification(self.context, node, 'create',
test_level, test_status,
chassis_uuid=atypes.Unset)
init_kwargs = self.node_notify_mock.call_args[1]
payload = init_kwargs['payload']
self.assertIsNone(payload.chassis_uuid)
def test_chassis_notification(self):
chassis = obj_utils.get_test_chassis(self.context,
extra={'foo': 'boo'},

View File

@ -1,291 +0,0 @@
# coding: utf-8
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from http import client as http_client
from unittest import mock
from pecan import rest
from ironic.api.controllers.v1 import types
from ironic.api import expose
from ironic.api import types as atypes
from ironic.common import exception
from ironic.common import utils
from ironic.tests import base
from ironic.tests.unit.api import base as api_base
class TestMacAddressType(base.TestCase):
def test_valid_mac_addr(self):
test_mac = 'aa:bb:cc:11:22:33'
with mock.patch.object(utils, 'validate_and_normalize_mac') as m_mock:
types.MacAddressType.validate(test_mac)
m_mock.assert_called_once_with(test_mac)
def test_invalid_mac_addr(self):
self.assertRaises(exception.InvalidMAC,
types.MacAddressType.validate, 'invalid-mac')
class TestUuidType(base.TestCase):
def test_valid_uuid(self):
test_uuid = '1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e'
self.assertEqual(test_uuid, types.UuidType.validate(test_uuid))
def test_invalid_uuid(self):
self.assertRaises(exception.InvalidUUID,
types.UuidType.validate, 'invalid-uuid')
@mock.patch("ironic.api.request")
class TestNameType(base.TestCase):
def test_valid_name(self, mock_pecan_req):
mock_pecan_req.version.minor = 10
test_name = 'hal-9000'
self.assertEqual(test_name, types.NameType.validate(test_name))
def test_invalid_name(self, mock_pecan_req):
mock_pecan_req.version.minor = 10
self.assertRaises(exception.InvalidName,
types.NameType.validate, '-this is not valid-')
@mock.patch("ironic.api.request")
class TestUuidOrNameType(base.TestCase):
def test_valid_uuid(self, mock_pecan_req):
mock_pecan_req.version.minor = 10
test_uuid = '1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e'
self.assertTrue(types.UuidOrNameType.validate(test_uuid))
def test_valid_name(self, mock_pecan_req):
mock_pecan_req.version.minor = 10
test_name = 'dc16-database5'
self.assertTrue(types.UuidOrNameType.validate(test_name))
def test_invalid_uuid_or_name(self, mock_pecan_req):
mock_pecan_req.version.minor = 10
self.assertRaises(exception.InvalidUuidOrName,
types.UuidOrNameType.validate, 'inval#uuid%or*name')
class MyBaseType(object):
"""Helper class, patched by objects of type MyPatchType"""
mandatory = atypes.wsattr(str, mandatory=True)
class MyPatchType(types.JsonPatchType):
"""Helper class for TestJsonPatchType tests."""
_api_base = MyBaseType
_extra_non_removable_attrs = {'/non_removable'}
@staticmethod
def internal_attrs():
return ['/internal']
class MyTest(rest.RestController):
"""Helper class for TestJsonPatchType tests."""
@expose.validate([MyPatchType])
@expose.expose([str], body=[MyPatchType])
def patch(self, patch):
return patch
class MyRoot(rest.RestController):
test = MyTest()
class TestJsonPatchType(api_base.BaseApiTest):
root_controller = ('ironic.tests.unit.api.controllers.v1.'
'test_types.MyRoot')
def setUp(self):
super(TestJsonPatchType, self).setUp()
def _patch_json(self, params, expect_errors=False):
return self.app.patch_json('/test', params=params,
headers={'Accept': 'application/json'},
expect_errors=expect_errors)
def test_valid_patches(self):
valid_patches = [{'path': '/extra/foo', 'op': 'remove'},
{'path': '/extra/foo', 'op': 'add', 'value': 'bar'},
{'path': '/str', 'op': 'replace', 'value': 'bar'},
{'path': '/bool', 'op': 'add', 'value': True},
{'path': '/int', 'op': 'add', 'value': 1},
{'path': '/float', 'op': 'add', 'value': 0.123},
{'path': '/list', 'op': 'add', 'value': [1, 2]},
{'path': '/none', 'op': 'add', 'value': None},
{'path': '/empty_dict', 'op': 'add', 'value': {}},
{'path': '/empty_list', 'op': 'add', 'value': []},
{'path': '/dict', 'op': 'add',
'value': {'cat': 'meow'}}]
ret = self._patch_json(valid_patches, False)
self.assertEqual(http_client.OK, ret.status_int)
self.assertCountEqual(valid_patches, ret.json)
def test_cannot_update_internal_attr(self):
patch = [{'path': '/internal', 'op': 'replace', 'value': 'foo'}]
ret = self._patch_json(patch, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
def test_cannot_update_internal_dict_attr(self):
patch = [{'path': '/internal/test', 'op': 'replace',
'value': 'foo'}]
ret = self._patch_json(patch, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
def test_mandatory_attr(self):
patch = [{'op': 'replace', 'path': '/mandatory', 'value': 'foo'}]
ret = self._patch_json(patch, False)
self.assertEqual(http_client.OK, ret.status_int)
self.assertEqual(patch, ret.json)
def test_cannot_remove_mandatory_attr(self):
patch = [{'op': 'remove', 'path': '/mandatory'}]
ret = self._patch_json(patch, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
def test_cannot_remove_extra_non_removable_attr(self):
patch = [{'op': 'remove', 'path': '/non_removable'}]
ret = self._patch_json(patch, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
def test_missing_required_fields_path(self):
missing_path = [{'op': 'remove'}]
ret = self._patch_json(missing_path, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
def test_missing_required_fields_op(self):
missing_op = [{'path': '/foo'}]
ret = self._patch_json(missing_op, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
def test_invalid_op(self):
patch = [{'path': '/foo', 'op': 'invalid'}]
ret = self._patch_json(patch, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
def test_invalid_path(self):
patch = [{'path': 'invalid-path', 'op': 'remove'}]
ret = self._patch_json(patch, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
def test_cannot_add_with_no_value(self):
patch = [{'path': '/extra/foo', 'op': 'add'}]
ret = self._patch_json(patch, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
def test_cannot_replace_with_no_value(self):
patch = [{'path': '/foo', 'op': 'replace'}]
ret = self._patch_json(patch, True)
self.assertEqual(http_client.BAD_REQUEST, ret.status_int)
self.assertTrue(ret.json['error_message'])
class TestBooleanType(base.TestCase):
def test_valid_true_values(self):
v = types.BooleanType()
self.assertTrue(v.validate("true"))
self.assertTrue(v.validate("TRUE"))
self.assertTrue(v.validate("True"))
self.assertTrue(v.validate("t"))
self.assertTrue(v.validate("1"))
self.assertTrue(v.validate("y"))
self.assertTrue(v.validate("yes"))
self.assertTrue(v.validate("on"))
def test_valid_false_values(self):
v = types.BooleanType()
self.assertFalse(v.validate("false"))
self.assertFalse(v.validate("FALSE"))
self.assertFalse(v.validate("False"))
self.assertFalse(v.validate("f"))
self.assertFalse(v.validate("0"))
self.assertFalse(v.validate("n"))
self.assertFalse(v.validate("no"))
self.assertFalse(v.validate("off"))
def test_invalid_value(self):
v = types.BooleanType()
self.assertRaises(exception.Invalid, v.validate, "invalid-value")
self.assertRaises(exception.Invalid, v.validate, "01")
class TestJsonType(base.TestCase):
def test_valid_values(self):
vt = types.jsontype
value = vt.validate("hello")
self.assertEqual("hello", value)
value = vt.validate(10)
self.assertEqual(10, value)
value = vt.validate(0.123)
self.assertEqual(0.123, value)
value = vt.validate(True)
self.assertTrue(value)
value = vt.validate([1, 2, 3])
self.assertEqual([1, 2, 3], value)
value = vt.validate({'foo': 'bar'})
self.assertEqual({'foo': 'bar'}, value)
value = vt.validate(None)
self.assertIsNone(value)
def test_invalid_values(self):
vt = types.jsontype
self.assertRaises(exception.Invalid, vt.validate, object())
def test_apimultitype_tostring(self):
vts = str(types.jsontype)
self.assertIn(str(str), vts)
self.assertIn(str(int), vts)
self.assertIn(str(float), vts)
self.assertIn(str(types.BooleanType), vts)
self.assertIn(str(list), vts)
self.assertIn(str(dict), vts)
self.assertIn(str(None), vts)
class TestListType(base.TestCase):
def test_list_type(self):
v = types.ListType()
self.assertEqual(['foo', 'bar'], v.validate('foo,bar'))
self.assertNotEqual(['bar', 'foo'], v.validate('foo,bar'))
self.assertEqual(['cat', 'meow'], v.validate("cat , meow"))
self.assertEqual(['spongebob', 'squarepants'],
v.validate("SpongeBob,SquarePants"))
self.assertEqual(['foo', 'bar'], v.validate("foo, ,,bar"))
self.assertEqual(['foo', 'bar'], v.validate("foo,foo,foo,bar"))
self.assertIsInstance(v.validate('foo,bar'), list)

View File

@ -25,7 +25,6 @@ from oslo_utils import uuidutils
from ironic import api
from ironic.api.controllers.v1 import node as api_node
from ironic.api.controllers.v1 import utils
from ironic.api import types as atypes
from ironic.common import exception
from ironic.common import policy
from ironic.common import states
@ -943,7 +942,7 @@ class TestVendorPassthru(base.TestCase):
passthru_mock.assert_called_once_with(
'fake-context', 'fake-ident', 'squarepants', 'POST',
'fake-data', 'fake-topic')
self.assertIsInstance(response, atypes.PassthruResponse)
self.assertIsInstance(response, utils.PassthruResponse)
self.assertEqual('SpongeBob', response.obj)
sc = http_client.ACCEPTED if async_call else http_client.OK
self.assertEqual(sc, response.status_code)
@ -979,7 +978,7 @@ class TestVendorPassthru(base.TestCase):
self.assertIsInstance(response.obj, io.BytesIO)
self.assertEqual(expct_return_value, response.obj.read())
# Assert response message is none
self.assertIsInstance(response, atypes.PassthruResponse)
self.assertIsInstance(response, utils.PassthruResponse)
self.assertEqual(http_client.OK, response.status_code)
def test_vendor_passthru_attach(self):

View File

@ -1,506 +0,0 @@
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import decimal
import io
from webob import multidict
from ironic.api import args
from ironic.api.controllers.v1 import types
from ironic.api import functions
from ironic.api import types as atypes
from ironic.common import exception
from ironic.tests import base as test_base
class Obj(atypes.Base):
id = atypes.wsattr(int, mandatory=True)
name = str
readonly_field = atypes.wsattr(str, readonly=True)
default_field = atypes.wsattr(str, default='foo')
unset_me = str
class NestedObj(atypes.Base):
o = Obj
class TestArgs(test_base.TestCase):
def test_fromjson_array(self):
atype = atypes.ArrayType(int)
self.assertEqual(
[0, 1, 1234, None],
args.fromjson_array(atype, [0, '1', '1_234', None])
)
self.assertRaises(ValueError, args.fromjson_array,
atype, ['one', 'two', 'three'])
self.assertRaises(ValueError, args.fromjson_array,
atype, 'one')
def test_fromjson_dict(self):
dtype = atypes.DictType(str, int)
self.assertEqual({
'zero': 0,
'one': 1,
'etc': 1234,
'none': None
}, args.fromjson_dict(dtype, {
'zero': 0,
'one': '1',
'etc': '1_234',
'none': None
}))
self.assertRaises(ValueError, args.fromjson_dict,
dtype, [])
self.assertRaises(ValueError, args.fromjson_dict,
dtype, {'one': 'one'})
def test_fromjson_bool(self):
for b in (1, 2, True, 'true', 't', 'yes', 'y', 'on', '1'):
self.assertTrue(args.fromjson_bool(b))
for b in (0, False, 'false', 'f', 'no', 'n', 'off', '0'):
self.assertFalse(args.fromjson_bool(b))
for b in ('yup', 'yeet', 'NOPE', 3.14):
self.assertRaises(ValueError, args.fromjson_bool, b)
def test_fromjson(self):
# parse None
self.assertIsNone(args.fromjson(None, None))
# parse array
atype = atypes.ArrayType(int)
self.assertEqual(
[0, 1, 1234, None],
args.fromjson(atype, [0, '1', '1_234', None])
)
# parse dict
dtype = atypes.DictType(str, int)
self.assertEqual({
'zero': 0,
'one': 1,
'etc': 1234,
'none': None
}, args.fromjson(dtype, {
'zero': 0,
'one': '1',
'etc': '1_234',
'none': None
}))
# parse bytes
self.assertEqual(
b'asdf',
args.fromjson(bytes, b'asdf')
)
self.assertEqual(
b'asdf',
args.fromjson(bytes, 'asdf')
)
self.assertEqual(
b'33',
args.fromjson(bytes, 33)
)
self.assertEqual(
b'3.14',
args.fromjson(bytes, 3.14)
)
# parse str
self.assertEqual(
'asdf',
args.fromjson(str, b'asdf')
)
self.assertEqual(
'asdf',
args.fromjson(str, 'asdf')
)
# parse int/float
self.assertEqual(
3,
args.fromjson(int, '3')
)
self.assertEqual(
3,
args.fromjson(int, 3)
)
self.assertEqual(
3.14,
args.fromjson(float, 3.14)
)
# parse bool
self.assertFalse(args.fromjson(bool, 'no'))
self.assertTrue(args.fromjson(bool, 'yes'))
# parse decimal
self.assertEqual(
decimal.Decimal(3.14),
args.fromjson(decimal.Decimal, 3.14)
)
# parse datetime
expected = datetime.datetime(2015, 8, 13, 11, 38, 9, 496475)
self.assertEqual(
expected,
args.fromjson(datetime.datetime, '2015-08-13T11:38:09.496475')
)
# parse complex
n = args.fromjson(NestedObj, {'o': {
'id': 1234,
'name': 'an object'
}})
self.assertIsInstance(n.o, Obj)
self.assertEqual(1234, n.o.id)
self.assertEqual('an object', n.o.name)
self.assertEqual('foo', n.o.default_field)
# parse usertype
self.assertEqual(
['0', '1', '2', 'three'],
args.fromjson(types.listtype, '0,1, 2, three')
)
def test_fromjson_complex(self):
n = args.fromjson_complex(NestedObj, {'o': {
'id': 1234,
'name': 'an object'
}})
self.assertIsInstance(n.o, Obj)
self.assertEqual(1234, n.o.id)
self.assertEqual('an object', n.o.name)
self.assertEqual('foo', n.o.default_field)
e = self.assertRaises(exception.UnknownAttribute,
args.fromjson_complex,
Obj, {'ooo': {}})
self.assertEqual({'ooo'}, e.attributes)
e = self.assertRaises(exception.InvalidInput, args.fromjson_complex,
Obj,
{'name': 'an object'})
self.assertEqual('id', e.fieldname)
self.assertEqual('Mandatory field missing.', e.msg)
e = self.assertRaises(exception.InvalidInput, args.fromjson_complex,
Obj,
{'id': 1234, 'readonly_field': 'foo'})
self.assertEqual('readonly_field', e.fieldname)
self.assertEqual('Cannot set read only field.', e.msg)
def test_parse(self):
# source as bytes
s = b'{"o": {"id": 1234, "name": "an object"}}'
# test bodyarg=True
n = args.parse(s, {"o": NestedObj}, True)['o']
self.assertEqual(1234, n.o.id)
self.assertEqual('an object', n.o.name)
# source as file
s = io.StringIO('{"o": {"id": 1234, "name": "an object"}}')
# test bodyarg=False
n = args.parse(s, {"o": Obj}, False)['o']
self.assertEqual(1234, n.id)
self.assertEqual('an object', n.name)
# fromjson ValueError
s = '{"o": ["id", "name"]}'
self.assertRaises(exception.InvalidInput, args.parse,
s, {"o": atypes.DictType(str, str)}, False)
s = '["id", "name"]'
self.assertRaises(exception.InvalidInput, args.parse,
s, {"o": atypes.DictType(str, str)}, True)
# fromjson UnknownAttribute
s = '{"o": {"foo": "bar", "id": 1234, "name": "an object"}}'
self.assertRaises(exception.UnknownAttribute, args.parse,
s, {"o": NestedObj}, True)
self.assertRaises(exception.UnknownAttribute, args.parse,
s, {"o": Obj}, False)
# invalid json
s = '{Sunn O)))}'
self.assertRaises(exception.ClientSideError, args.parse,
s, {"o": Obj}, False)
# extra args
s = '{"foo": "bar", "o": {"id": 1234, "name": "an object"}}'
self.assertRaises(exception.UnknownArgument, args.parse,
s, {"o": Obj}, False)
def test_from_param(self):
# datetime param
expected = datetime.datetime(2015, 8, 13, 11, 38, 9, 496475)
self.assertEqual(
expected,
args.from_param(datetime.datetime, '2015-08-13T11:38:09.496475')
)
self.assertIsNone(args.from_param(datetime.datetime, None))
# usertype param
self.assertEqual(
['0', '1', '2', 'three'],
args.from_param(types.listtype, '0,1, 2, three')
)
# array param
atype = atypes.ArrayType(int)
self.assertEqual(
[0, 1, 1234, None],
args.from_param(atype, [0, '1', '1_234', None])
)
self.assertIsNone(args.from_param(atype, None))
# string param
self.assertEqual('foo', args.from_param(str, 'foo'))
self.assertIsNone(args.from_param(str, None))
# string param with from_params
hit_paths = set()
params = multidict.MultiDict(
foo='bar',
)
self.assertEqual(
'bar',
args.from_params(str, params, 'foo', hit_paths)
)
self.assertEqual({'foo'}, hit_paths)
def test_array_from_params(self):
hit_paths = set()
datatype = atypes.ArrayType(str)
params = multidict.MultiDict(
foo='bar',
one='two'
)
self.assertEqual(
['bar'],
args.from_params(datatype, params, 'foo', hit_paths)
)
self.assertEqual({'foo'}, hit_paths)
self.assertEqual(
['two'],
args.array_from_params(datatype, params, 'one', hit_paths)
)
self.assertEqual({'foo', 'one'}, hit_paths)
def test_usertype_from_params(self):
hit_paths = set()
datatype = types.listtype
params = multidict.MultiDict(
foo='0,1, 2, three',
)
self.assertEqual(
['0', '1', '2', 'three'],
args.usertype_from_params(datatype, params, 'foo', hit_paths)
)
self.assertEqual(
['0', '1', '2', 'three'],
args.from_params(datatype, params, 'foo', hit_paths)
)
self.assertEqual(
atypes.Unset,
args.usertype_from_params(datatype, params, 'bar', hit_paths)
)
def test_args_from_args(self):
fromargs = ['one', 2, [0, '1', '2_34']]
fromkwargs = {'foo': '1, 2, 3'}
@functions.signature(str, str, int, atypes.ArrayType(int),
types.listtype)
def myfunc(self, first, second, third, foo):
pass
funcdef = functions.FunctionDefinition.get(myfunc)
newargs, newkwargs = args.args_from_args(funcdef, fromargs, fromkwargs)
self.assertEqual(['one', 2, [0, 1, 234]], newargs)
self.assertEqual({'foo': ['1', '2', '3']}, newkwargs)
def test_args_from_params(self):
@functions.signature(str, str, int, atypes.ArrayType(int),
types.listtype)
def myfunc(self, first, second, third, foo):
pass
funcdef = functions.FunctionDefinition.get(myfunc)
params = multidict.MultiDict(
foo='0,1, 2, three',
third='1',
second='2'
)
self.assertEqual(
([], {'foo': ['0', '1', '2', 'three'], 'second': 2, 'third': [1]}),
args.args_from_params(funcdef, params)
)
# unexpected param
params = multidict.MultiDict(bar='baz')
self.assertRaises(exception.UnknownArgument, args.args_from_params,
funcdef, params)
# no params plus a body
params = multidict.MultiDict(__body__='')
self.assertEqual(
([], {}),
args.args_from_params(funcdef, params)
)
def test_args_from_body(self):
@functions.signature(str, body=NestedObj)
def myfunc(self, nested):
pass
funcdef = functions.FunctionDefinition.get(myfunc)
mimetype = 'application/json'
body = b'{"o": {"id": 1234, "name": "an object"}}'
newargs, newkwargs = args.args_from_body(funcdef, body, mimetype)
self.assertEqual(1234, newkwargs['nested'].o.id)
self.assertEqual('an object', newkwargs['nested'].o.name)
self.assertEqual(
((), {}),
args.args_from_body(funcdef, None, mimetype)
)
self.assertRaises(exception.ClientSideError, args.args_from_body,
funcdef, body, 'application/x-corba')
self.assertEqual(
((), {}),
args.args_from_body(funcdef, body,
'application/x-www-form-urlencoded')
)
def test_combine_args(self):
@functions.signature(str, str, int)
def myfunc(self, first, second,):
pass
funcdef = functions.FunctionDefinition.get(myfunc)
# empty
self.assertEqual(
([], {}),
args.combine_args(
funcdef, (
([], {}),
([], {}),
)
)
)
# combine kwargs
self.assertEqual(
([], {'first': 'one', 'second': 'two'}),
args.combine_args(
funcdef, (
([], {}),
([], {'first': 'one', 'second': 'two'}),
)
)
)
# combine mixed args
self.assertEqual(
([], {'first': 'one', 'second': 'two'}),
args.combine_args(
funcdef, (
(['one'], {}),
([], {'second': 'two'}),
)
)
)
# override kwargs
self.assertEqual(
([], {'first': 'two'}),
args.combine_args(
funcdef, (
([], {'first': 'one'}),
([], {'first': 'two'}),
),
allow_override=True
)
)
# override args
self.assertEqual(
([], {'first': 'two', 'second': 'three'}),
args.combine_args(
funcdef, (
(['one', 'three'], {}),
(['two'], {}),
),
allow_override=True
)
)
# can't override args
self.assertRaises(exception.ClientSideError, args.combine_args,
funcdef,
((['one'], {}), (['two'], {})))
# can't override kwargs
self.assertRaises(exception.ClientSideError, args.combine_args,
funcdef,
(([], {'first': 'one'}), ([], {'first': 'two'})))
def test_get_args(self):
@functions.signature(str, str, int, atypes.ArrayType(int),
types.listtype, body=NestedObj)
def myfunc(self, first, second, third, foo, nested):
pass
funcdef = functions.FunctionDefinition.get(myfunc)
params = multidict.MultiDict(
foo='0,1, 2, three',
second='2'
)
mimetype = 'application/json'
body = b'{"o": {"id": 1234, "name": "an object"}}'
fromargs = ['one']
fromkwargs = {'third': '1'}
newargs, newkwargs = args.get_args(funcdef, fromargs, fromkwargs,
params, body, mimetype)
self.assertEqual([], newargs)
n = newkwargs.pop('nested')
self.assertEqual({
'first': 'one',
'foo': ['0', '1', '2', 'three'],
'second': 2,
'third': [1]},
newkwargs
)
self.assertEqual(1234, n.o.id)
self.assertEqual('an object', n.o.name)
# check_arguments missing mandatory argument 'second'
params = multidict.MultiDict(
foo='0,1, 2, three',
)
self.assertRaises(exception.MissingArgument, args.get_args,
funcdef, fromargs, fromkwargs,
params, body, mimetype)

View File

@ -22,7 +22,6 @@ from ironic import api
from ironic.api.controllers import root
from ironic.api.controllers import v1
from ironic.api import method
from ironic.api import types as atypes
from ironic.common import args
from ironic.tests.unit.api import base as test_api_base
@ -48,7 +47,7 @@ class MyThingController(pecan.rest.RestController):
@method.expose()
def response_content(self):
resp = atypes.PassthruResponse('nothing', status_code=200)
resp = v1.utils.PassthruResponse('nothing', status_code=200)
api.response.status_code = resp.status_code
return resp.obj

View File

@ -1,566 +0,0 @@
# coding: utf-8
#
# Copyright 2011-2019 the WSME authors and contributors
# (See https://opendev.org/x/wsme/)
#
# This module is part of WSME and is also released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from ironic.api import types
from ironic.common import exception as exc
from ironic.tests import base as test_base
def gen_class():
d = {}
exec('''class tmp(object): pass''', d)
return d['tmp']
class TestTypes(test_base.TestCase):
def setUp(self):
super(TestTypes, self).setUp()
types.registry = types.Registry()
def test_default_usertype(self):
class MyType(types.UserType):
basetype = str
My = MyType()
assert My.validate('a') == 'a'
assert My.tobasetype('a') == 'a'
assert My.frombasetype('a') == 'a'
def test_unset(self):
u = types.Unset
assert not u
def test_flat_type(self):
class Flat(object):
aint = int
abytes = bytes
atext = str
afloat = float
types.register_type(Flat)
assert len(Flat._wsme_attributes) == 4
attrs = Flat._wsme_attributes
print(attrs)
assert attrs[0].key == 'aint'
assert attrs[0].name == 'aint'
assert isinstance(attrs[0], types.wsattr)
assert attrs[0].datatype == int
assert attrs[0].mandatory is False
assert attrs[1].key == 'abytes'
assert attrs[1].name == 'abytes'
assert attrs[2].key == 'atext'
assert attrs[2].name == 'atext'
assert attrs[3].key == 'afloat'
assert attrs[3].name == 'afloat'
def test_private_attr(self):
class WithPrivateAttrs(object):
_private = 12
types.register_type(WithPrivateAttrs)
assert len(WithPrivateAttrs._wsme_attributes) == 0
def test_attribute_order(self):
class ForcedOrder(object):
_wsme_attr_order = ('a2', 'a1', 'a3')
a1 = int
a2 = int
a3 = int
types.register_type(ForcedOrder)
print(ForcedOrder._wsme_attributes)
assert ForcedOrder._wsme_attributes[0].key == 'a2'
assert ForcedOrder._wsme_attributes[1].key == 'a1'
assert ForcedOrder._wsme_attributes[2].key == 'a3'
c = gen_class()
print(c)
types.register_type(c)
del c._wsme_attributes
c.a2 = int
c.a1 = int
c.a3 = int
types.register_type(c)
assert c._wsme_attributes[0].key == 'a1', c._wsme_attributes[0].key
assert c._wsme_attributes[1].key == 'a2'
assert c._wsme_attributes[2].key == 'a3'
def test_wsproperty(self):
class WithWSProp(object):
def __init__(self):
self._aint = 0
def get_aint(self):
return self._aint
def set_aint(self, value):
self._aint = value
aint = types.wsproperty(int, get_aint, set_aint, mandatory=True)
types.register_type(WithWSProp)
print(WithWSProp._wsme_attributes)
assert len(WithWSProp._wsme_attributes) == 1
a = WithWSProp._wsme_attributes[0]
assert a.key == 'aint'
assert a.datatype == int
assert a.mandatory
o = WithWSProp()
o.aint = 12
assert o.aint == 12
def test_nested(self):
class Inner(object):
aint = int
class Outer(object):
inner = Inner
types.register_type(Outer)
assert hasattr(Inner, '_wsme_attributes')
assert len(Inner._wsme_attributes) == 1
def test_inspect_with_inheritance(self):
class Parent(object):
parent_attribute = int
class Child(Parent):
child_attribute = int
types.register_type(Parent)
types.register_type(Child)
assert len(Child._wsme_attributes) == 2
def test_selfreftype(self):
class SelfRefType(object):
pass
SelfRefType.parent = SelfRefType
types.register_type(SelfRefType)
def test_inspect_with_property(self):
class AType(object):
@property
def test(self):
return 'test'
types.register_type(AType)
assert len(AType._wsme_attributes) == 0
assert AType().test == 'test'
def test_enum(self):
aenum = types.Enum(str, 'v1', 'v2')
assert aenum.basetype is str
class AType(object):
a = aenum
types.register_type(AType)
assert AType.a.datatype is aenum
obj = AType()
obj.a = 'v1'
assert obj.a == 'v1', repr(obj.a)
self.assertRaisesRegexp(exc.InvalidInput,
"Invalid input for field/attribute a. \
Value: 'v3'. Value should be one of: v., v.",
setattr,
obj,
'a',
'v3')
def test_attribute_validation(self):
class AType(object):
alist = [int]
aint = int
types.register_type(AType)
obj = AType()
obj.alist = [1, 2, 3]
assert obj.alist == [1, 2, 3]
obj.aint = 5
assert obj.aint == 5
self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', 12)
self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', [2, 'a'])
def test_attribute_validation_minimum(self):
class ATypeInt(object):
attr = types.IntegerType(minimum=1, maximum=5)
types.register_type(ATypeInt)
obj = ATypeInt()
obj.attr = 2
# comparison between 'zero' value and intger minimum (1) raises a
# TypeError which must be wrapped into an InvalidInput exception
self.assertRaises(exc.InvalidInput, setattr, obj, 'attr', 'zero')
def test_text_attribute_conversion(self):
class SType(object):
atext = str
abytes = bytes
types.register_type(SType)
obj = SType()
obj.atext = b'somebytes'
assert obj.atext == 'somebytes'
assert isinstance(obj.atext, str)
obj.abytes = 'sometext'
assert obj.abytes == b'sometext'
assert isinstance(obj.abytes, bytes)
def test_named_attribute(self):
class ABCDType(object):
a_list = types.wsattr([int], name='a.list')
astr = str
types.register_type(ABCDType)
assert len(ABCDType._wsme_attributes) == 2
attrs = ABCDType._wsme_attributes
assert attrs[0].key == 'a_list', attrs[0].key
assert attrs[0].name == 'a.list', attrs[0].name
assert attrs[1].key == 'astr', attrs[1].key
assert attrs[1].name == 'astr', attrs[1].name
def test_wsattr_del(self):
class MyType(object):
a = types.wsattr(int)
types.register_type(MyType)
value = MyType()
value.a = 5
assert value.a == 5
del value.a
assert value.a is types.Unset
def test_validate_dict(self):
assert types.validate_value({int: str}, {1: '1', 5: '5'})
self.assertRaises(ValueError, types.validate_value,
{int: str}, [])
assert types.validate_value({int: str}, {'1': '1', 5: '5'})
self.assertRaises(ValueError, types.validate_value,
{int: str}, {1: 1, 5: '5'})
def test_validate_list_valid(self):
assert types.validate_value([int], [1, 2])
assert types.validate_value([int], ['5'])
def test_validate_list_empty(self):
assert types.validate_value([int], []) == []
def test_validate_list_none(self):
v = types.ArrayType(int)
assert v.validate(None) is None
def test_validate_list_invalid_member(self):
self.assertRaises(ValueError, types.validate_value, [int],
['not-a-number'])
def test_validate_list_invalid_type(self):
self.assertRaises(ValueError, types.validate_value, [int], 1)
def test_validate_float(self):
self.assertEqual(types.validate_value(float, 1), 1.0)
self.assertEqual(types.validate_value(float, '1'), 1.0)
self.assertEqual(types.validate_value(float, 1.1), 1.1)
self.assertRaises(ValueError, types.validate_value, float, [])
self.assertRaises(ValueError, types.validate_value, float,
'not-a-float')
def test_validate_int(self):
self.assertEqual(types.validate_value(int, 1), 1)
self.assertEqual(types.validate_value(int, '1'), 1)
self.assertRaises(ValueError, types.validate_value, int, 1.1)
def test_validate_integer_type(self):
v = types.IntegerType(minimum=1, maximum=10)
v.validate(1)
v.validate(5)
v.validate(10)
self.assertRaises(ValueError, v.validate, 0)
self.assertRaises(ValueError, v.validate, 11)
def test_validate_string_type(self):
v = types.StringType(min_length=1, max_length=10,
pattern='^[a-zA-Z0-9]*$')
v.validate('1')
v.validate('12345')
v.validate('1234567890')
self.assertRaises(ValueError, v.validate, '')
self.assertRaises(ValueError, v.validate, '12345678901')
# Test a pattern validation
v.validate('a')
v.validate('A')
self.assertRaises(ValueError, v.validate, '_')
def test_validate_string_type_precompile(self):
precompile = re.compile('^[a-zA-Z0-9]*$')
v = types.StringType(min_length=1, max_length=10,
pattern=precompile)
# Test a pattern validation
v.validate('a')
v.validate('A')
self.assertRaises(ValueError, v.validate, '_')
def test_validate_string_type_pattern_exception_message(self):
regex = '^[a-zA-Z0-9]*$'
v = types.StringType(pattern=regex)
try:
v.validate('_')
self.assertFail()
except ValueError as e:
self.assertIn(regex, str(e))
def test_register_invalid_array(self):
self.assertRaises(ValueError, types.register_type, [])
self.assertRaises(ValueError, types.register_type, [int, str])
self.assertRaises(AttributeError, types.register_type, [1])
def test_register_invalid_dict(self):
self.assertRaises(ValueError, types.register_type, {})
self.assertRaises(ValueError, types.register_type,
{int: str, str: int})
self.assertRaises(ValueError, types.register_type,
{types.Unset: str})
def test_list_attribute_no_auto_register(self):
class MyType(object):
aint = int
assert not hasattr(MyType, '_wsme_attributes')
self.assertRaises(TypeError, types.list_attributes, MyType)
assert not hasattr(MyType, '_wsme_attributes')
def test_list_of_complextypes(self):
class A(object):
bs = types.wsattr(['B'])
class B(object):
i = int
types.register_type(A)
types.register_type(B)
assert A.bs.datatype.item_type is B
def test_cross_referenced_types(self):
class A(object):
b = types.wsattr('B')
class B(object):
a = A
types.register_type(A)
types.register_type(B)
assert A.b.datatype is B
def test_base(self):
class B1(types.Base):
b2 = types.wsattr('B2')
class B2(types.Base):
b2 = types.wsattr('B2')
assert B1.b2.datatype is B2, repr(B1.b2.datatype)
assert B2.b2.datatype is B2
def test_base_init(self):
class C1(types.Base):
s = str
c = C1(s='test')
assert c.s == 'test'
def test_array_eq(self):
ell = [types.ArrayType(str)]
assert types.ArrayType(str) in ell
def test_array_sample(self):
s = types.ArrayType(str).sample()
assert isinstance(s, list)
assert s
assert s[0] == ''
def test_dict_sample(self):
s = types.DictType(str, str).sample()
assert isinstance(s, dict)
assert s
assert s == {'': ''}
def test_binary_to_base(self):
import base64
assert types.binary.tobasetype(None) is None
expected = base64.encodebytes(b'abcdef')
assert types.binary.tobasetype(b'abcdef') == expected
def test_binary_from_base(self):
import base64
assert types.binary.frombasetype(None) is None
encoded = base64.encodebytes(b'abcdef')
assert types.binary.frombasetype(encoded) == b'abcdef'
def test_wsattr_weakref_datatype(self):
# If the datatype inside the wsattr ends up a weakref, it
# should be converted to the real type when accessed again by
# the property getter.
import weakref
a = types.wsattr(int)
a.datatype = weakref.ref(int)
assert a.datatype is int
def test_wsattr_list_datatype(self):
# If the datatype inside the wsattr ends up a list of weakrefs
# to types, it should be converted to the real types when
# accessed again by the property getter.
import weakref
a = types.wsattr(int)
a.datatype = [weakref.ref(int)]
assert isinstance(a.datatype, list)
assert a.datatype[0] is int
def test_unregister(self):
class TempType(object):
pass
types.registry.register(TempType)
v = types.registry.lookup('TempType')
self.assertIs(v, TempType)
types.registry._unregister(TempType)
after = types.registry.lookup('TempType')
self.assertIsNone(after)
def test_unregister_twice(self):
class TempType(object):
pass
types.registry.register(TempType)
v = types.registry.lookup('TempType')
self.assertIs(v, TempType)
types.registry._unregister(TempType)
# Second call should not raise an exception
types.registry._unregister(TempType)
after = types.registry.lookup('TempType')
self.assertIsNone(after)
def test_unregister_array_type(self):
class TempType(object):
pass
t = [TempType]
types.registry.register(t)
self.assertNotEqual(types.registry.array_types, set())
types.registry._unregister(t)
self.assertEqual(types.registry.array_types, set())
def test_unregister_array_type_twice(self):
class TempType(object):
pass
t = [TempType]
types.registry.register(t)
self.assertNotEqual(types.registry.array_types, set())
types.registry._unregister(t)
# Second call should not raise an exception
types.registry._unregister(t)
self.assertEqual(types.registry.array_types, set())
def test_unregister_dict_type(self):
class TempType(object):
pass
t = {str: TempType}
types.registry.register(t)
self.assertNotEqual(types.registry.dict_types, set())
types.registry._unregister(t)
self.assertEqual(types.registry.dict_types, set())
def test_unregister_dict_type_twice(self):
class TempType(object):
pass
t = {str: TempType}
types.registry.register(t)
self.assertNotEqual(types.registry.dict_types, set())
types.registry._unregister(t)
# Second call should not raise an exception
types.registry._unregister(t)
self.assertEqual(types.registry.dict_types, set())
def test_reregister(self):
class TempType(object):
pass
types.registry.register(TempType)
v = types.registry.lookup('TempType')
self.assertIs(v, TempType)
types.registry.reregister(TempType)
after = types.registry.lookup('TempType')
self.assertIs(after, TempType)
def test_reregister_and_add_attr(self):
class TempType(object):
pass
types.registry.register(TempType)
attrs = types.list_attributes(TempType)
self.assertEqual(attrs, [])
TempType.one = str
types.registry.reregister(TempType)
after = types.list_attributes(TempType)
self.assertNotEqual(after, [])
def test_non_registered_complex_type(self):
class TempType(types.Base):
__registry__ = None
self.assertFalse(types.iscomplex(TempType))
types.registry.register(TempType)
self.assertTrue(types.iscomplex(TempType))