Browse Source

Merge "Expunge the internal version of WSME"

changes/79/763279/2
Zuul 4 days ago
committed by Gerrit Code Review
parent
commit
585f90212a
16 changed files with 15 additions and 3389 deletions
  1. +0
    -381
      ironic/api/args.py
  2. +0
    -53
      ironic/api/controllers/base.py
  3. +0
    -29
      ironic/api/controllers/v1/collection.py
  4. +1
    -4
      ironic/api/controllers/v1/notification_utils.py
  5. +0
    -31
      ironic/api/controllers/v1/state.py
  6. +0
    -263
      ironic/api/controllers/v1/types.py
  7. +11
    -2
      ironic/api/controllers/v1/utils.py
  8. +0
    -222
      ironic/api/expose.py
  9. +0
    -709
      ironic/api/types.py
  10. +0
    -315
      ironic/tests/unit/api/controllers/v1/test_expose.py
  11. +0
    -12
      ironic/tests/unit/api/controllers/v1/test_notification_utils.py
  12. +0
    -291
      ironic/tests/unit/api/controllers/v1/test_types.py
  13. +2
    -3
      ironic/tests/unit/api/controllers/v1/test_utils.py
  14. +0
    -506
      ironic/tests/unit/api/test_args.py
  15. +1
    -2
      ironic/tests/unit/api/test_method.py
  16. +0
    -566
      ironic/tests/unit/api/test_types.py

+ 0
- 381
ironic/api/args.py View File

@@ -1,381 +0,0 @@
# Copyright 2011-2019 the WSME authors and contributors
# (See https://opendev.org/x/wsme/)
#
# This module is part of WSME and is also released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import decimal
import json
import logging

from dateutil import parser as dateparser

from ironic.api import types as atypes
from ironic.common import exception

LOG = logging.getLogger(__name__)


CONTENT_TYPE = 'application/json'
ACCEPT_CONTENT_TYPES = [
CONTENT_TYPE,
'text/javascript',
'application/javascript'
]
ENUM_TRUE = ('true', 't', 'yes', 'y', 'on', '1')
ENUM_FALSE = ('false', 'f', 'no', 'n', 'off', '0')


def fromjson_array(datatype, value):
if not isinstance(value, list):
raise ValueError("Value not a valid list: %s" % value)
return [fromjson(datatype.item_type, item) for item in value]


def fromjson_dict(datatype, value):
if not isinstance(value, dict):
raise ValueError("Value not a valid dict: %s" % value)
return dict((
(fromjson(datatype.key_type, item[0]),
fromjson(datatype.value_type, item[1]))
for item in value.items()))


def fromjson_bool(value):
if isinstance(value, (int, bool)):
return bool(value)
if value in ENUM_TRUE:
return True
if value in ENUM_FALSE:
return False
raise ValueError("Value not an unambiguous boolean: %s" % value)


def fromjson(datatype, value):
"""A generic converter from json base types to python datatype.

"""
if value is None:
return None

if isinstance(datatype, atypes.ArrayType):
return fromjson_array(datatype, value)

if isinstance(datatype, atypes.DictType):
return fromjson_dict(datatype, value)

if datatype is bytes:
if isinstance(value, (str, int, float)):
return str(value).encode('utf8')
return value

if datatype is str:
if isinstance(value, bytes):
return value.decode('utf-8')
return value

if datatype in (int, float):
return datatype(value)

if datatype is bool:
return fromjson_bool(value)

if datatype is decimal.Decimal:
return decimal.Decimal(value)

if datatype is datetime.datetime:
return dateparser.parse(value)

if atypes.iscomplex(datatype):
return fromjson_complex(datatype, value)

if atypes.isusertype(datatype):
return datatype.frombasetype(fromjson(datatype.basetype, value))

return value


def fromjson_complex(datatype, value):
obj = datatype()
attributes = atypes.list_attributes(datatype)

# Here we check that all the attributes in the value are also defined
# in our type definition, otherwise we raise an Error.
v_keys = set(value.keys())
a_keys = set(adef.name for adef in attributes)
if not v_keys <= a_keys:
raise exception.UnknownAttribute(None, v_keys - a_keys)

for attrdef in attributes:
if attrdef.name in value:
try:
val_fromjson = fromjson(attrdef.datatype,
value[attrdef.name])
except exception.UnknownAttribute as e:
e.add_fieldname(attrdef.name)
raise
if getattr(attrdef, 'readonly', False):
raise exception.InvalidInput(attrdef.name, val_fromjson,
"Cannot set read only field.")
setattr(obj, attrdef.key, val_fromjson)
elif attrdef.mandatory:
raise exception.InvalidInput(attrdef.name, None,
"Mandatory field missing.")

return atypes.validate_value(datatype, obj)


def parse(s, datatypes, bodyarg, encoding='utf8'):
jload = json.load
if not hasattr(s, 'read'):
if isinstance(s, bytes):
s = s.decode(encoding)
jload = json.loads
try:
jdata = jload(s)
except ValueError:
raise exception.ClientSideError("Request is not in valid JSON format")
if bodyarg:
argname = list(datatypes.keys())[0]
try:
kw = {argname: fromjson(datatypes[argname], jdata)}
except ValueError as e:
raise exception.InvalidInput(argname, jdata, e.args[0])
except exception.UnknownAttribute as e:
# We only know the fieldname at this level, not in the
# called function. We fill in this information here.
e.add_fieldname(argname)
raise
else:
kw = {}
extra_args = []
if not isinstance(jdata, dict):
raise exception.ClientSideError("Request must be a JSON dict")
for key in jdata:
if key not in datatypes:
extra_args.append(key)
else:
try:
kw[key] = fromjson(datatypes[key], jdata[key])
except ValueError as e:
raise exception.InvalidInput(key, jdata[key], e.args[0])
except exception.UnknownAttribute as e:
# We only know the fieldname at this level, not in the
# called function. We fill in this information here.
e.add_fieldname(key)
raise
if extra_args:
raise exception.UnknownArgument(', '.join(extra_args))
return kw


def from_param(datatype, value):
if datatype is datetime.datetime:
return dateparser.parse(value) if value else None

if isinstance(datatype, atypes.UserType):
return datatype.frombasetype(
from_param(datatype.basetype, value))

if isinstance(datatype, atypes.ArrayType):
if value is None:
return value
return [
from_param(datatype.item_type, item)
for item in value
]

return datatype(value) if value is not None else None


def from_params(datatype, params, path, hit_paths):
if isinstance(datatype, atypes.ArrayType):
return array_from_params(datatype, params, path, hit_paths)

if isinstance(datatype, atypes.UserType):
return usertype_from_params(datatype, params, path, hit_paths)

if path in params:
assert not isinstance(datatype, atypes.DictType), \
'DictType unsupported'
assert not atypes.iscomplex(datatype) or datatype is atypes.File, \
'complex type unsupported'
hit_paths.add(path)
return from_param(datatype, params[path])
return atypes.Unset


def array_from_params(datatype, params, path, hit_paths):
if hasattr(params, 'getall'):
# webob multidict
def getall(params, path):
return params.getall(path)
elif hasattr(params, 'getlist'):
# werkzeug multidict
def getall(params, path): # noqa
return params.getlist(path)
if path in params:
hit_paths.add(path)
return [
from_param(datatype.item_type, value)
for value in getall(params, path)]

return atypes.Unset


def usertype_from_params(datatype, params, path, hit_paths):
if path in params:
hit_paths.add(path)
value = from_param(datatype.basetype, params[path])
if value is not atypes.Unset:
return datatype.frombasetype(value)
return atypes.Unset


def args_from_args(funcdef, args, kwargs):
newargs = []
for argdef, arg in zip(funcdef.arguments[:len(args)], args):
try:
newargs.append(from_param(argdef.datatype, arg))
except Exception as e:
if isinstance(argdef.datatype, atypes.UserType):
datatype_name = argdef.datatype.name
elif isinstance(argdef.datatype, type):
datatype_name = argdef.datatype.__name__
else:
datatype_name = argdef.datatype.__class__.__name__
raise exception.InvalidInput(
argdef.name,
arg,
"unable to convert to %(datatype)s. Error: %(error)s" % {
'datatype': datatype_name, 'error': e})
newkwargs = {}
for argname, value in kwargs.items():
newkwargs[argname] = from_param(
funcdef.get_arg(argname).datatype, value
)
return newargs, newkwargs


def args_from_params(funcdef, params):
kw = {}
hit_paths = set()
for argdef in funcdef.arguments:
value = from_params(
argdef.datatype, params, argdef.name, hit_paths)
if value is not atypes.Unset:
kw[argdef.name] = value
paths = set(params.keys())
unknown_paths = paths - hit_paths
if '__body__' in unknown_paths:
unknown_paths.remove('__body__')
if not funcdef.ignore_extra_args and unknown_paths:
raise exception.UnknownArgument(', '.join(unknown_paths))
return [], kw


def args_from_body(funcdef, body, mimetype):
if funcdef.body_type is not None:
datatypes = {funcdef.arguments[-1].name: funcdef.body_type}
else:
datatypes = dict(((a.name, a.datatype) for a in funcdef.arguments))

if not body:
return (), {}

if mimetype == "application/x-www-form-urlencoded":
# the parameters should have been parsed in params
return (), {}
elif mimetype not in ACCEPT_CONTENT_TYPES:
raise exception.ClientSideError("Unknown mimetype: %s" % mimetype,
status_code=415)

try:
kw = parse(
body, datatypes, bodyarg=funcdef.body_type is not None
)
except exception.UnknownArgument:
if not funcdef.ignore_extra_args:
raise
kw = {}

return (), kw


def combine_args(funcdef, akw, allow_override=False):
newargs, newkwargs = [], {}
for args, kwargs in akw:
for i, arg in enumerate(args):
n = funcdef.arguments[i].name
if not allow_override and n in newkwargs:
raise exception.ClientSideError(
"Parameter %s was given several times" % n)
newkwargs[n] = arg
for name, value in kwargs.items():
n = str(name)
if not allow_override and n in newkwargs:
raise exception.ClientSideError(
"Parameter %s was given several times" % n)
newkwargs[n] = value
return newargs, newkwargs


def get_args(funcdef, args, kwargs, params, body, mimetype):
"""Combine arguments from multiple sources

Combine arguments from :
* the host framework args and kwargs
* the request params
* the request body

Note that the host framework args and kwargs can be overridden
by arguments from params of body

"""
# get the body from params if not given directly
if not body and '__body__' in params:
body = params['__body__']

# extract args from the host args and kwargs
from_args = args_from_args(funcdef, args, kwargs)

# extract args from the request parameters
from_params = args_from_params(funcdef, params)

# extract args from the request body
from_body = args_from_body(funcdef, body, mimetype)

# combine params and body arguments
from_params_and_body = combine_args(
funcdef,
(from_params, from_body)
)

args, kwargs = combine_args(
funcdef,
(from_args, from_params_and_body),
allow_override=True
)
check_arguments(funcdef, args, kwargs)
return args, kwargs


def check_arguments(funcdef, args, kw):
"""Check if some arguments are missing"""
assert len(args) == 0
for arg in funcdef.arguments:
if arg.mandatory and arg.name not in kw:
raise exception.MissingArgument(arg.name)

+ 0
- 53
ironic/api/controllers/base.py View File

@@ -12,66 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import functools

from webob import exc

from ironic.api import types as atypes
from ironic.common.i18n import _


class AsDictMixin(object):
"""Mixin class adding an as_dict() method."""

def as_dict(self):
"""Render this object as a dict of its fields."""
def _attr_as_pod(attr):
"""Return an attribute as a Plain Old Data (POD) type."""
if isinstance(attr, list):
return [_attr_as_pod(item) for item in attr]
# Recursively evaluate objects that support as_dict().
try:
return attr.as_dict()
except AttributeError:
return attr

return dict((k, _attr_as_pod(getattr(self, k)))
for k in self.fields
if hasattr(self, k)
and getattr(self, k) != atypes.Unset)


class Base(AsDictMixin):
"""Base type for complex types"""
def __init__(self, **kw):
for key, value in kw.items():
if hasattr(self, key):
setattr(self, key, value)

def unset_fields_except(self, except_list=None):
"""Unset fields so they don't appear in the message body.

:param except_list: A list of fields that won't be touched.

"""
if except_list is None:
except_list = []

for k in self.as_dict():
if k not in except_list:
setattr(self, k, atypes.Unset)


class APIBase(Base):

created_at = atypes.wsattr(datetime.datetime, readonly=True)
"""The time in UTC at which the object is created"""

updated_at = atypes.wsattr(datetime.datetime, readonly=True)
"""The time in UTC at which the object is updated"""


@functools.total_ordering
class Version(object):
"""API Version object."""


+ 0
- 29
ironic/api/controllers/v1/collection.py View File

@@ -14,9 +14,7 @@
# under the License.

from ironic import api
from ironic.api.controllers import base
from ironic.api.controllers import link
from ironic.api import types as atypes


def has_next(collection, limit):
@@ -87,30 +85,3 @@ def get_next(collection, limit, url=None, key_field='uuid', **kwargs):

return link.make_link('next', api.request.public_url,
url, next_args)['href']


class Collection(base.Base):

next = str
"""A link to retrieve the next subset of the collection"""

@property
def collection(self):
return getattr(self, self._type)

@classmethod
def get_key_field(cls):
return 'uuid'

def has_next(self, limit):
"""Return whether collection has more items."""
return has_next(self.collection, limit)

def get_next(self, limit, url=None, **kwargs):
"""Return a link to the next subset of the collection."""
resource_url = url or self._type
the_next = get_next(self.collection, limit, url=resource_url,
key_field=self.get_key_field(), **kwargs)
if the_next is None:
return atypes.Unset
return the_next

+ 1
- 4
ironic/api/controllers/v1/notification_utils.py View File

@@ -18,7 +18,6 @@ from oslo_messaging import exceptions as oslo_msg_exc
from oslo_utils import excutils
from oslo_versionedobjects import exception as oslo_vo_exc

from ironic.api import types as atypes
from ironic.common import exception
from ironic.common.i18n import _
from ironic.objects import allocation as allocation_objects
@@ -71,9 +70,7 @@ def _emit_api_notification(context, obj, action, level, status, **kwargs):
:param kwargs: kwargs to use when creating the notification payload.
"""
resource = obj.__class__.__name__.lower()
# value atypes.Unset can be passed from API representation of resource
extra_args = {k: (v if v != atypes.Unset else None)
for k, v in kwargs.items()}
extra_args = kwargs
try:
try:
if action == 'maintenance_set':


+ 0
- 31
ironic/api/controllers/v1/state.py View File

@@ -1,31 +0,0 @@
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from ironic.api.controllers import base


class State(base.APIBase):

current = str
"""The current state"""

target = str
"""The user modified desired state"""

available = [str]
"""A list of available states it is able to transition to"""

links = None
"""A list containing a self link and associated state links"""

+ 0
- 263
ironic/api/controllers/v1/types.py View File

@@ -1,263 +0,0 @@
# coding: utf-8
#
# Copyright 2013 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import inspect
import json

from oslo_log import log
from oslo_utils import strutils
from oslo_utils import uuidutils

from ironic.api.controllers import base
from ironic.api.controllers.v1 import utils as v1_utils
from ironic.api import types as atypes
from ironic.common import exception
from ironic.common.i18n import _
from ironic.common import utils


LOG = log.getLogger(__name__)


class MacAddressType(atypes.UserType):
"""A simple MAC address type."""

basetype = str
name = 'macaddress'

@staticmethod
def validate(value):
return utils.validate_and_normalize_mac(value)

@staticmethod
def frombasetype(value):
if value is None:
return None
return MacAddressType.validate(value)


class UuidOrNameType(atypes.UserType):
"""A simple UUID or logical name type."""

basetype = str
name = 'uuid_or_name'

@staticmethod
def validate(value):
if not (uuidutils.is_uuid_like(value)
or v1_utils.is_valid_logical_name(value)):
raise exception.InvalidUuidOrName(name=value)
return value

@staticmethod
def frombasetype(value):
if value is None:
return None
return UuidOrNameType.validate(value)


class NameType(atypes.UserType):
"""A simple logical name type."""

basetype = str
name = 'name'

@staticmethod
def validate(value):
if not v1_utils.is_valid_logical_name(value):
raise exception.InvalidName(name=value)
return value

@staticmethod
def frombasetype(value):
if value is None:
return None
return NameType.validate(value)


class UuidType(atypes.UserType):
"""A simple UUID type."""

basetype = str
name = 'uuid'

@staticmethod
def validate(value):
if not uuidutils.is_uuid_like(value):
raise exception.InvalidUUID(uuid=value)
return value

@staticmethod
def frombasetype(value):
if value is None:
return None
return UuidType.validate(value)


class BooleanType(atypes.UserType):
"""A simple boolean type."""

basetype = str
name = 'boolean'

@staticmethod
def validate(value):
try:
return strutils.bool_from_string(value, strict=True)
except ValueError as e:
# raise Invalid to return 400 (BadRequest) in the API
raise exception.Invalid(str(e))

@staticmethod
def frombasetype(value):
if value is None:
return None
return BooleanType.validate(value)


class JsonType(atypes.UserType):
"""A simple JSON type."""

basetype = str
name = 'json'

def __str__(self):
# These are the json serializable native types
return ' | '.join(map(str, (str, int, float,
BooleanType, list, dict, None)))

@staticmethod
def validate(value):
try:
json.dumps(value)
except TypeError:
raise exception.Invalid(_('%s is not JSON serializable') % value)
else:
return value

@staticmethod
def frombasetype(value):
return JsonType.validate(value)


class ListType(atypes.UserType):
"""A simple list type."""

basetype = str
name = 'list'

@staticmethod
def validate(value):
"""Validate and convert the input to a ListType.

:param value: A comma separated string of values
:returns: A list of unique values (lower-cased), maintaining the
same order
"""
items = []
for v in str(value).split(','):
v_norm = v.strip().lower()
if v_norm and v_norm not in items:
items.append(v_norm)
return items

@staticmethod
def frombasetype(value):
if value is None:
return None
return ListType.validate(value)


macaddress = MacAddressType()
uuid_or_name = UuidOrNameType()
name = NameType()
uuid = UuidType()
boolean = BooleanType()
listtype = ListType()
# Can't call it 'json' because that's the name of the stdlib module
jsontype = JsonType()


class JsonPatchType(base.Base):
"""A complex type that represents a single json-patch operation."""

path = atypes.wsattr(atypes.StringType(pattern='^(/[\\w-]+)+$'),
mandatory=True)
op = atypes.wsattr(atypes.Enum(str, 'add', 'replace', 'remove'),
mandatory=True)
value = atypes.wsattr(jsontype, default=atypes.Unset)

# The class of the objects being patched. Override this in subclasses.
# Should probably be a subclass of ironic.api.controllers.base.APIBase.
_api_base = None

# Attributes that are not required for construction, but which may not be
# removed if set. Override in subclasses if needed.
_extra_non_removable_attrs = set()

# Set of non-removable attributes, calculated lazily.
_non_removable_attrs = None

@staticmethod
def internal_attrs():
"""Returns a list of internal attributes.

Internal attributes can't be added, replaced or removed. This
method may be overwritten by derived class.

"""
return ['/created_at', '/id', '/links', '/updated_at', '/uuid']

@classmethod
def non_removable_attrs(cls):
"""Returns a set of names of attributes that may not be removed.

Attributes whose 'mandatory' property is True are automatically added
to this set. To add additional attributes to the set, override the
field _extra_non_removable_attrs in subclasses, with a set of the form
{'/foo', '/bar'}.
"""
if cls._non_removable_attrs is None:
cls._non_removable_attrs = cls._extra_non_removable_attrs.copy()
if cls._api_base:
fields = inspect.getmembers(cls._api_base,
lambda a: not inspect.isroutine(a))
for name, field in fields:
if getattr(field, 'mandatory', False):
cls._non_removable_attrs.add('/%s' % name)
return cls._non_removable_attrs

@staticmethod
def validate(patch):
_path = '/' + patch.path.split('/')[1]
if _path in patch.internal_attrs():
msg = _("'%s' is an internal attribute and can not be updated")
raise exception.ClientSideError(msg % patch.path)

if patch.path in patch.non_removable_attrs() and patch.op == 'remove':
msg = _("'%s' is a mandatory attribute and can not be removed")
raise exception.ClientSideError(msg % patch.path)

if patch.op != 'remove':
if patch.value is atypes.Unset:
msg = _("'add' and 'replace' operations need a value")
raise exception.ClientSideError(msg)

ret = {'path': patch.path, 'op': patch.op}
if patch.value is not atypes.Unset:
ret['value'] = patch.value
return ret

+ 11
- 2
ironic/api/controllers/v1/utils.py View File

@@ -30,7 +30,6 @@ from pecan import rest
from ironic import api
from ironic.api.controllers import link
from ironic.api.controllers.v1 import versions
from ironic.api import types as atypes
from ironic.common import args
from ironic.common import exception
from ironic.common import faults
@@ -677,6 +676,16 @@ def is_valid_logical_name(name):
return utils.is_valid_logical_name(name)


class PassthruResponse(object):
"""Object to hold the "response" from a passthru call"""
def __init__(self, obj, status_code=None):
#: Store the result object from the view
self.obj = obj

#: Store an optional status_code
self.status_code = status_code


def vendor_passthru(ident, method, topic, data=None, driver_passthru=False):
"""Call a vendor passthru API extension.

@@ -719,7 +728,7 @@ def vendor_passthru(ident, method, topic, data=None, driver_passthru=False):
return_value = return_value.encode('utf-8')
return_value = io.BytesIO(return_value)

return atypes.PassthruResponse(return_value, status_code=status_code)
return PassthruResponse(return_value, status_code=status_code)


def check_for_invalid_fields(fields, object_fields):


+ 0
- 222
ironic/api/expose.py View File

@@ -1,222 +0,0 @@
#
# Copyright 2015 Rackspace, Inc
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
import functools
from http import client as http_client
import inspect
import json
import sys
import traceback

from oslo_config import cfg
from oslo_log import log
import pecan
from webob import static

from ironic.api import args as api_args
from ironic.api import functions
from ironic.api import types as atypes

LOG = log.getLogger(__name__)


class JSonRenderer(object):
@staticmethod
def __init__(path, extra_vars):
pass

@staticmethod
def render(template_path, namespace):
if 'faultcode' in namespace:
return encode_error(None, namespace)
result = encode_result(
namespace['result'],
namespace['datatype']
)
return result


pecan.templating._builtin_renderers['wsmejson'] = JSonRenderer

pecan_json_decorate = pecan.expose(
template='wsmejson:',
content_type='application/json',
generic=False)


def expose(*args, **kwargs):
sig = functions.signature(*args, **kwargs)

def decorate(f):
sig(f)
funcdef = functions.FunctionDefinition.get(f)
funcdef.resolve_types(atypes.registry)

@functools.wraps(f)
def callfunction(self, *args, **kwargs):
return_type = funcdef.return_type

try:
args, kwargs = api_args.get_args(
funcdef, args, kwargs, pecan.request.params,
pecan.request.body, pecan.request.content_type
)
result = f(self, *args, **kwargs)

# NOTE: Support setting of status_code with default 201
pecan.response.status = funcdef.status_code
if isinstance(result, atypes.PassthruResponse):
pecan.response.status = result.status_code

# NOTE(lucasagomes): If the return code is 204
# (No Response) we have to make sure that we are not
# returning anything in the body response and the
# content-length is 0
if result.status_code == 204:
return_type = None

if callable(getattr(result.obj, 'read', None)):
# Stream the files-like data directly to the response
pecan.response.app_iter = static.FileIter(result.obj)
return_type = None
result = None
else:
result = result.obj

except Exception:
try:
exception_info = sys.exc_info()
orig_exception = exception_info[1]
orig_code = getattr(orig_exception, 'code', None)
data = format_exception(
exception_info,
cfg.CONF.debug_tracebacks_in_api
)
finally:
del exception_info

if orig_code and orig_code in http_client.responses:
pecan.response.status = orig_code
else:
pecan.response.status = 500

return data

if return_type is None:
pecan.request.pecan['content_type'] = None
pecan.response.content_type = None
return ''

return dict(
datatype=return_type,
result=result
)

pecan_json_decorate(callfunction)
pecan.util._cfg(callfunction)['argspec'] = inspect.getfullargspec(f)
callfunction._wsme_definition = funcdef
return callfunction

return decorate


def tojson(datatype, value):
"""A generic converter from python to jsonify-able datatypes.

"""
if value is None:
return None
if isinstance(datatype, atypes.ArrayType):
return [tojson(datatype.item_type, item) for item in value]
if isinstance(datatype, atypes.DictType):
return dict((
(tojson(datatype.key_type, item[0]),
tojson(datatype.value_type, item[1]))
for item in value.items()
))
if isinstance(value, datetime.datetime):
return value.isoformat()
if atypes.iscomplex(datatype):
d = dict()
for attr in atypes.list_attributes(datatype):
attr_value = getattr(value, attr.key)
if attr_value is not atypes.Unset:
d[attr.name] = tojson(attr.datatype, attr_value)
return d
if isinstance(datatype, atypes.UserType):
return tojson(datatype.basetype, datatype.tobasetype(value))
return value


def encode_result(value, datatype, **options):
jsondata = tojson(datatype, value)
return json.dumps(jsondata)


def encode_error(context, errordetail):
return json.dumps(errordetail)


def format_exception(excinfo, debug=False):
"""Extract informations that can be sent to the client."""
error = excinfo[1]
code = getattr(error, 'code', None)
if code and code in http_client.responses and (400 <= code < 500):
faultstring = (error.faultstring if hasattr(error, 'faultstring')
else str(error))
faultcode = getattr(error, 'faultcode', 'Client')
r = dict(faultcode=faultcode,
faultstring=faultstring)
LOG.debug("Client-side error: %s", r['faultstring'])
r['debuginfo'] = None
return r
else:
faultstring = str(error)
debuginfo = "\n".join(traceback.format_exception(*excinfo))

LOG.error('Server-side error: "%s". Detail: \n%s',
faultstring, debuginfo)

faultcode = getattr(error, 'faultcode', 'Server')
r = dict(faultcode=faultcode, faultstring=faultstring)
if debug:
r['debuginfo'] = debuginfo
else:
r['debuginfo'] = None
return r


class validate(object):
"""Decorator that define the arguments types of a function.


Example::

class MyController(object):
@expose(str)
@validate(datetime.date, datetime.time)
def format(self, d, t):
return d.isoformat() + ' ' + t.isoformat()
"""
def __init__(self, *param_types):
self.param_types = param_types

def __call__(self, func):
argspec = functions.getargspec(func)
fd = functions.FunctionDefinition.get(func)
fd.set_arg_types(argspec, self.param_types)
return func

+ 0
- 709
ironic/api/types.py View File

@@ -1,709 +0,0 @@
# coding: utf-8
#
# Copyright 2011-2019 the WSME authors and contributors
# (See https://opendev.org/x/wsme/)
#
# This module is part of WSME and is also released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.


import base64
import datetime
import decimal
import inspect
import re
import weakref

from oslo_log import log

from ironic.common import exception


LOG = log.getLogger(__name__)


pod_types = (int, bytes, str, float, bool)
native_types = pod_types + (datetime.datetime, decimal.Decimal)
_promotable_types = (int, str, bytes)


class ArrayType(object):
def __init__(self, item_type):
if iscomplex(item_type):
self._item_type = weakref.ref(item_type)
else:
self._item_type = item_type

def __hash__(self):
return hash(self.item_type)

def __eq__(self, other):
return isinstance(other, ArrayType) \
and self.item_type == other.item_type

def sample(self):
return [getattr(self.item_type, 'sample', self.item_type)()]

@property
def item_type(self):
if isinstance(self._item_type, weakref.ref):
return self._item_type()
else:
return self._item_type

def validate(self, value):
if value is None:
return
if not isinstance(value, list):
raise ValueError("Wrong type. Expected '[%s]', got '%s'" % (
self.item_type, type(value)
))
return [
validate_value(self.item_type, item)
for item in value
]


class DictType(object):
def __init__(self, key_type, value_type):
if key_type not in (int, bytes, str, float, bool):
raise ValueError("Dictionaries key can only be a pod type")
self.key_type = key_type
if iscomplex(value_type):
self._value_type = weakref.ref(value_type)
else:
self._value_type = value_type

def __hash__(self):
return hash((self.key_type, self.value_type))

def sample(self):
key = getattr(self.key_type, 'sample', self.key_type)()
value = getattr(self.value_type, 'sample', self.value_type)()
return {key: value}

@property
def value_type(self):
if isinstance(self._value_type, weakref.ref):
return self._value_type()
else:
return self._value_type

def validate(self, value):
if not isinstance(value, dict):
raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % (
self.key_type, self.value_type, type(value)
))
return dict((
(
validate_value(self.key_type, key),
validate_value(self.value_type, v)
) for key, v in value.items()
))


class UserType(object):
basetype = None
name = None

def validate(self, value):
return value

def tobasetype(self, value):
return value

def frombasetype(self, value):
return value


def isusertype(class_):
return isinstance(class_, UserType)


class BinaryType(UserType):
"""A user type that use base64 strings to carry binary data.

"""
basetype = bytes
name = 'binary'

def tobasetype(self, value):
if value is None:
return None
return base64.encodebytes(value)

def frombasetype(self, value):
if value is None:
return None
return base64.decodebytes(value)


#: The binary almost-native type
binary = BinaryType()


class IntegerType(UserType):
"""A simple integer type. Can validate a value range.

:param minimum: Possible minimum value
:param maximum: Possible maximum value

Example::

Price = IntegerType(minimum=1)

"""
basetype = int
name = "integer"

def __init__(self, minimum=None, maximum=None):
self.minimum = minimum
self.maximum = maximum

@staticmethod
def frombasetype(value):
return int(value) if value is not None else None

def validate(self, value):
if self.minimum is not None and value < self.minimum:
error = 'Value should be greater or equal to %s' % self.minimum
raise ValueError(error)

if self.maximum is not None and value > self.maximum:
error = 'Value should be lower or equal to %s' % self.maximum
raise ValueError(error)

return value


class StringType(UserType):
"""A simple string type. Can validate a length and a pattern.

:param min_length: Possible minimum length
:param max_length: Possible maximum length
:param pattern: Possible string pattern

Example::

Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$')

"""
basetype = str
name = "string"

def __init__(self, min_length=None, max_length=None, pattern=None):
self.min_length = min_length
self.max_length = max_length
if isinstance(pattern, str):
self.pattern = re.compile(pattern)
else:
self.pattern = pattern

def validate(self, value):
if not isinstance(value, self.basetype):
error = 'Value should be string'
raise ValueError(error)

if self.min_length is not None and len(value) < self.min_length:
error = 'Value should have a minimum character requirement of %s' \
% self.min_length
raise ValueError(error)

if self.max_length is not None and len(value) > self.max_length:
error = 'Value should have a maximum character requirement of %s' \
% self.max_length
raise ValueError(error)

if self.pattern is not None and not self.pattern.search(value):
error = 'Value should match the pattern %s' % self.pattern.pattern
raise ValueError(error)

return value


class Enum(UserType):
"""A simple enumeration type. Can be based on any non-complex type.

:param basetype: The actual data type
:param values: A set of possible values

If nullable, 'None' should be added the values set.

Example::

Gender = Enum(str, 'male', 'female')
Specie = Enum(str, 'cat', 'dog')

"""
def __init__(self, basetype, *values, **kw):
self.basetype = basetype
self.values = set(values)
name = kw.pop('name', None)
if name is None:
name = "Enum(%s)" % ', '.join((str(v) for v in values))
self.name = name

def validate(self, value):
if value not in self.values:
raise ValueError("Value should be one of: %s" %
', '.join(map(str, self.values)))
return value

def tobasetype(self, value):
return value

def frombasetype(self, value):
return value


class UnsetType(object):
def __bool__(self):
return False

def __repr__(self):
return 'Unset'


Unset = UnsetType()


def validate_value(datatype, value):
if value in (Unset, None) or datatype is None:
return value

# Try to promote the data type to one of our complex types.
if isinstance(datatype, list):
datatype = ArrayType(datatype[0])
elif isinstance(datatype, dict):
datatype = DictType(*list(datatype.items())[0])

# If the datatype has its own validator, use that.
if hasattr(datatype, 'validate'):
return datatype.validate(value)

# Do type promotion/conversion and data validation for builtin
# types.
v_type = type(value)
if datatype == int:
if v_type in _promotable_types:
try:
# Try to turn the value into an int
value = datatype(value)
except ValueError:
# An error is raised at the end of the function
# when the types don't match.
pass
elif datatype is float and v_type in _promotable_types:
try:
value = float(value)
except ValueError:
# An error is raised at the end of the function
# when the types don't match.
pass
elif datatype is str and isinstance(value, bytes):
value = value.decode()
elif datatype is bytes and isinstance(value, str):
value = value.encode()

if not isinstance(value, datatype):
raise ValueError(
"Wrong type. Expected '%s', got '%s'" % (
datatype, v_type
))
return value


def iscomplex(datatype):
return inspect.isclass(datatype) \
and '_wsme_attributes' in datatype.__dict__


class wsproperty(property):
"""A specialised :class:`property` to define typed-property on complex types.

Example::

class MyComplexType(Base):
def get_aint(self):
return self._aint

def set_aint(self, value):
assert avalue < 10 # Dummy input validation
self._aint = value

aint = wsproperty(int, get_aint, set_aint, mandatory=True)

"""
def __init__(self, datatype, fget, fset=None,
mandatory=False, doc=None, name=None):
property.__init__(self, fget, fset)
#: The property name in the parent python class
self.key = None
#: The attribute name on the public of the api.
#: Defaults to :attr:`key`
self.name = name
#: property data type
self.datatype = datatype
#: True if the property is mandatory
self.mandatory = mandatory


class wsattr(object):
"""Complex type attribute definition.

Example::

class MyComplexType(ctypes.Base):
optionalvalue = int
mandatoryvalue = wsattr(int, mandatory=True)
named_value = wsattr(int, name='named.value')

After inspection, the non-wsattr attributes will be replaced, and
the above class will be equivalent to::

class MyComplexType(ctypes.Base):
optionalvalue = wsattr(int)
mandatoryvalue = wsattr(int, mandatory=True)

"""
def __init__(self, datatype, mandatory=False, name=None, default=Unset,
readonly=False):
#: The attribute name in the parent python class.
#: Set by :func:`inspect_class`
self.key = None # will be set by class inspection
#: The attribute name on the public of the api.
#: Defaults to :attr:`key`
self.name = name
self._datatype = (datatype,)
#: True if the attribute is mandatory
self.mandatory = mandatory
#: Default value. The attribute will return this instead
#: of :data:`Unset` if no value has been set.
self.default = default
#: If True value cannot be set from json/xml input data
self.readonly = readonly

self.complextype = None

def _get_dataholder(self, instance):
dataholder = getattr(instance, '_wsme_dataholder', None)
if dataholder is None:
dataholder = instance._wsme_DataHolderClass()
instance._wsme_dataholder = dataholder
return dataholder

def __get__(self, instance, owner):
if instance is None:
return self
return getattr(
self._get_dataholder(instance),
self.key,
self.default
)

def __set__(self, instance, value):
try:
value = validate_value(self.datatype, value)
except (ValueError, TypeError) as e:
raise exception.InvalidInput(self.name, value, str(e))
dataholder = self._get_dataholder(instance)
if value is Unset:
if hasattr(dataholder, self.key):
delattr(dataholder, self.key)
else:
setattr(dataholder, self.key, value)

def __delete__(self, instance):
self.__set__(instance, Unset)

def _get_datatype(self):
if isinstance(self._datatype, tuple):
self._datatype = \
self.complextype().__registry__.resolve_type(self._datatype[0])
if isinstance(self._datatype, weakref.ref):
return self._datatype()
if isinstance(self._datatype, list):
return [
item() if isinstance(item, weakref.ref) else item
for item in self._datatype
]
return self._datatype

def _set_datatype(self, datatype):
self._datatype = datatype

#: attribute data type. Can be either an actual type,
#: or a type name, in which case the actual type will be
#: determined when needed (generally just before scanning the api).
datatype = property(_get_datatype, _set_datatype)


def iswsattr(attr):
if inspect.isfunction(attr) or inspect.ismethod(attr):
return False
if isinstance(attr, property) and not isinstance(attr, wsproperty):
return False
return True


def sort_attributes(class_, attributes):
"""Sort a class attributes list.

3 mechanisms are attempted :

#. Look for a _wsme_attr_order attribute on the class. This allow
to define an arbitrary order of the attributes (useful for
generated types).

#. Access the object source code to find the declaration order.

#. Sort by alphabetically

"""

if not len(attributes):
return

attrs = dict((a.key, a) for a in attributes)

if hasattr(class_, '_wsme_attr_order'):
names_order = class_._wsme_attr_order
else:
names = attrs.keys()
names_order = []
try:
lines = []
for cls in inspect.getmro(class_):
if cls is object:
continue
lines[len(lines):] = inspect.getsourcelines(cls)[0]
for line in lines:
line = line.strip().replace(" ", "")
if '=' in line:
aname = line[:line.index('=')]
if aname in names and aname not in names_order:
names_order.append(aname)
if len(names_order) < len(names):
names_order.extend((
name for name in names if name not in names_order))
assert len(names_order) == len(names)
except (TypeError, IOError):
names_order = list(names)
names_order.sort()

attributes[:] = [attrs[name] for name in names_order]


def inspect_class(class_):
"""Extract a list of (name, wsattr|wsproperty) for the given class"""
attributes = []
for name, attr in inspect.getmembers(class_, iswsattr):
if name.startswith('_'):
continue
if inspect.isroutine(attr):
continue

if isinstance(attr, (wsattr, wsproperty)):
attrdef = attr
else:
if (attr not in native_types
and (inspect.isclass(attr) or isinstance(attr, (list, dict)))):
register_type(attr)
attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr)

attrdef.key = name
if attrdef.name is None:
attrdef.name = name
attrdef.complextype = weakref.ref(class_)
attributes.append(attrdef)
setattr(class_, name, attrdef)

sort_attributes(class_, attributes)
return attributes


def list_attributes(class_):
"""Returns a list of a complex type attributes."""
if not iscomplex(class_):
raise TypeError("%s is not a registered type")
return class_._wsme_attributes


def make_dataholder(class_):
# the slots are computed outside the class scope to avoid
# 'attr' to pullute the class namespace, which leads to weird
# things if one of the slots is named 'attr'.
slots = [attr.key for attr in class_._wsme_attributes]

class DataHolder(object):
__slots__ = slots

DataHolder.__name__ = class_.__name__ + 'DataHolder'
return DataHolder


class Registry(object):
def __init__(self):
self._complex_types = []
self.array_types = set()
self.dict_types = set()

@property
def complex_types(self):
return [t() for t in self._complex_types if t()]

def register(self, class_):
"""Make sure a type is registered.

It is automatically called by :class:`expose() <expose.expose>`
and :class:`validate() <expose.validate>`.
Unless you want to control when the class inspection is done there
is no need to call it.

"""
if class_ is None or \
class_ in native_types or \
isinstance(class_, UserType) or iscomplex(class_) or \
isinstance(class_, ArrayType) or isinstance(class_, DictType):
return class_

if isinstance(class_, list):
if len(class_) != 1:
raise ValueError("Cannot register type %s" % repr(class_))
dt = ArrayType(class_[0])
self.register(dt.item_type)
self.array_types.add(dt)
return dt

if isinstance(class_, dict):
if len(class_) != 1:
raise ValueError("Cannot register type %s" % repr(class_))
dt = DictType(*list(class_.items())[0])
self.register(dt.value_type)
self.dict_types.add(dt)
return dt

class_._wsme_attributes = None
class_._wsme_attributes = inspect_class(class_)
class_._wsme_DataHolderClass = make_dataholder(class_)

class_.__registry__ = self
self._complex_types.append(weakref.ref(class_))
return class_

def reregister(self, class_):
"""Register a type which may already have been registered.

"""
self._unregister(class_)
return self.register(class_)

def _unregister(self, class_):
"""Remove a previously registered type.

"""
# Clear the existing attribute reference so it is rebuilt if
# the class is registered again later.
if hasattr(class_, '_wsme_attributes'):
del class_._wsme_attributes
# FIXME(dhellmann): This method does not recurse through the
# types like register() does. Should it?
if isinstance(class_, list):
at = ArrayType(class_[0])
try:
self.array_types.remove(at)
except KeyError:
pass
elif isinstance(class_, dict):
key_type, value_type = list(class_.items())[0]
self.dict_types = set(
dt for dt in self.dict_types
if (dt.key_type, dt.value_type) != (key_type, value_type)
)
# We can't use remove() here because the items in
# _complex_types are weakref objects pointing to the classes,
# so we can't compare with them directly.
self._complex_types = [
ct for ct in self._complex_types
if ct() is not class_
]

def lookup(self, typename):
LOG.debug('Lookup %s', typename)
modname = None
if '.' in typename:
modname, typename = typename.rsplit('.', 1)
for ct in self._complex_types:
ct = ct()
if ct is not None and typename == ct.__name__ and (
modname is None or modname == ct.__module__):
return ct

def resolve_type(self, type_):
if isinstance(type_, str):
return self.lookup(type_)
if isinstance(type_, list):
type_ = ArrayType(type_[0])
if isinstance(type_, dict):
type_ = DictType(list(type_.keys())[0], list(type_.values())[0])
if isinstance(type_, ArrayType):
type_ = ArrayType(self.resolve_type(type_.item_type))
self.array_types.add(type_)
elif isinstance(type_, DictType):
type_ = DictType(
type_.key_type,
self.resolve_type(type_.value_type)
)
self.dict_types.add(type_)
else:
type_ = self.register(type_)
return type_


# Default type registry
registry = Registry()


def register_type(class_):
return registry.register(class_)


class BaseMeta(type):
def __new__(cls, name, bases, dct):
if bases and bases[0] is not object and '__registry__' not in dct:
dct['__registry__'] = registry
return type.__new__(cls, name, bases, dct)

def __init__(cls, name, bases, dct):
if bases and bases[0] is not object and cls.__registry__:
cls.__registry__.register(cls)


class Base(metaclass=BaseMeta):
"""Base type for complex types"""
def __init__(self, **kw):
for key, value in kw.items():
if hasattr(self, key):
setattr(self, key, value)


class PassthruResponse(object):
"""Object to hold the "response" from a passthru call"""
def __init__(self, obj, status_code=None):
#: Store the result object from the view
self.obj = obj

#: Store an optional status_code
self.status_code = status_code

+ 0
- 315
ironic/tests/unit/api/controllers/v1/test_expose.py View File

@@ -1,315 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import datetime
from http import client as http_client
from importlib import machinery
import inspect
import json
import os
import sys
from unittest import mock

from oslo_utils import uuidutils
import pecan.rest
import pecan.testing

from ironic.api.controllers import root
from ironic.api.controllers import v1
from ironic.api import expose
from ironic.api import types as atypes
from ironic.common import exception
from ironic.tests import base as test_base
from ironic.tests.unit.api import base as test_api_base


class TestExposedAPIMethodsCheckPolicy(test_base.TestCase):
"""Ensure that all exposed HTTP endpoints call authorize."""

def setUp(self):
super(TestExposedAPIMethodsCheckPolicy, self).setUp()
self.original_method = sys.modules['ironic.api.expose'].expose
self.exposed_methods = set()

def expose_and_track(*args, **kwargs):
def wrap(f):
if f not in self.exposed_methods:
self.exposed_methods.add(f)
e = self.original_method(*args, **kwargs)
return e(f)
return wrap

p = mock.patch('ironic.api.expose.expose', expose_and_track)
p.start()
self.addCleanup(p.stop)

def _test(self, module):
module_path = os.path.abspath(sys.modules[module].__file__)
machinery.SourceFileLoader(uuidutils.generate_uuid(),
module_path).load_module()
expected_calls = [
'api_utils.check_node_policy_and_retrieve',
'api_utils.check_list_policy',
'api_utils.check_multiple_node_policies_and_retrieve',
'self._get_node_and_topic',
'api_utils.check_port_policy_and_retrieve',
'api_utils.check_port_list_policy',
'self._authorize_patch_and_get_node',
]

for func in self.exposed_methods:
src = inspect.getsource(func)
self.assertTrue(
any(call in src for call in expected_calls)
or ('policy.authorize' in src
and 'context.to_policy_values' in src),
'no policy check found in in exposed method %s' % func)

def test_chassis_api_policy(self):
self._test('ironic.api.controllers.v1.chassis')

def test_driver_api_policy(self):
self._test('ironic.api.controllers.v1.driver')

def test_node_api_policy(self):
self._test('ironic.api.controllers.v1.node')

def test_port_api_policy(self):
self._test('ironic.api.controllers.v1.port')

def test_portgroup_api_policy(self):
self._test('ironic.api.controllers.v1.portgroup')

def test_ramdisk_api_policy(self):
self._test('ironic.api.controllers.v1.ramdisk')

def test_conductor_api_policy(self):
self._test('ironic.api.controllers.v1.conductor')


class UnderscoreStr(atypes.UserType):
basetype = str
name = "custom string"

def tobasetype(self, value):
return '__' + value


class Obj(atypes.Base):
id = int
name = str
unset_me = str


class NestedObj(atypes.Base):
o = Obj


class TestJsonRenderer(test_base.TestCase):

def setUp(self):
super(TestJsonRenderer, self).setUp()
self.renderer = expose.JSonRenderer('/', None)

def test_render_error(self):
error_dict = {
'faultcode': 500,
'faultstring': 'ouch'
}
self.assertEqual(
error_dict,
json.loads(self.renderer.render('/', error_dict))
)

def test_render_exception(self):
error_dict = {
'faultcode': 'Server',
'faultstring': 'ouch',
'debuginfo': None
}
try:
raise Exception('ouch')
except Exception:
excinfo = sys.exc_info()
self.assertEqual(
json.dumps(error_dict),
self.renderer.render('/', expose.format_exception(excinfo))
)

def test_render_http_exception(self):
error_dict = {
'faultcode': '403',
'faultstring': 'Not authorized',
'debuginfo': None
}
try:
e = exception.NotAuthorized()
e.code = 403
except exception.IronicException:
excinfo = sys.exc_info()
self.assertEqual(
json.dumps(error_dict),
self.renderer.render('/', expose.format_exception(excinfo))
)

def test_render_int(self):
self.assertEqual(
'42',
self.renderer.render('/', {
'result': 42,
'datatype': int
})
)

def test_render_none(self):
self.assertEqual(
'null',
self.renderer.render('/', {
'result': None,
'datatype': str
})
)

def test_render_str(self):
self.assertEqual(
'"a string"',
self.renderer.render('/', {
'result': 'a string',
'datatype': str
})
)

def test_render_datetime(self):
self.assertEqual(
'"2020-04-14T10:35:10.586431"',
self.renderer.render('/', {
'result': datetime.datetime(2020, 4, 14, 10, 35, 10, 586431),
'datatype': datetime.datetime
})
)

def test_render_array(self):
self.assertEqual(
json.dumps(['one', 'two', 'three']),
self.renderer.render('/', {
'result': ['one', 'two', 'three'],
'datatype': atypes.ArrayType(str)
})
)

def test_render_dict(self):
self.assertEqual(
json.dumps({'one': 'a', 'two': 'b', 'three': 'c'}),
self.renderer.render('/', {
'result': {'one': 'a', 'two': 'b', 'three': 'c'},
'datatype': atypes.DictType(str, str)
})
)

def test_complex_type(self):
o = Obj()
o.id = 1
o.name = 'one'
o.unset_me = atypes.Unset

n = NestedObj()
n.o = o
self.assertEqual(
json.dumps({'o': {'id': 1, 'name': 'one'}}),
self.renderer.render('/', {
'result': n,
'datatype': NestedObj
})
)

def test_user_type(self):
self.assertEqual(
'"__foo"',
self.renderer.render('/', {
'result': 'foo',
'datatype': UnderscoreStr()
})
)


class MyThingController(pecan.rest.RestController):

_custom_actions = {
'no_content': ['GET'],
'response_content': ['GET'],
'ouch': ['GET'],
}

@expose.expose(int, str, int)
def get(self, name, number):
return {name: number}

@expose.expose(str)
def no_content(self):
return atypes.PassthruResponse('nothing', status_code=204)

@expose.expose(str)
def response_content(self):
return atypes.PassthruResponse('nothing', status_code=200)

@expose.expose(str)
def ouch(self):
raise Exception('ouch')


class MyV1Controller(v1.Controller):

things = MyThingController()


class MyRootController(root.RootController):

v1 = MyV1Controller()


class TestExpose(test_api_base.BaseApiTest):

block_execute = False

root_controller = '%s.%s' % (MyRootController.__module__,
MyRootController.__name__)

def test_expose(self):
self.assertEqual(
{'foo': 1},
self.get_json('/things/', name='foo', number=1)
)

def test_response_204(self):
response = self.get_json('/things/no_content', expect_errors=True)
self.assertEqual(http_client.NO_CONTENT, response.status_int)
self.assertIsNone(response.content_type)
self.assertEqual(b'', response.normal_body)

def test_response_content(self):
response = self.get_json('/things/response_content',
expect_errors=True)
self.assertEqual(http_client.OK, response.status_int)
self.assertEqual(b'"nothing"', response.normal_body)
self.assertEqual('application/json', response.content_type)

def test_exception(self):
response = self.get_json('/things/ouch',
expect_errors=True)