diff --git a/ironic/api/args.py b/ironic/api/args.py deleted file mode 100644 index 7addecf8b3..0000000000 --- a/ironic/api/args.py +++ /dev/null @@ -1,381 +0,0 @@ -# Copyright 2011-2019 the WSME authors and contributors -# (See https://opendev.org/x/wsme/) -# -# This module is part of WSME and is also released under -# the MIT License: http://www.opensource.org/licenses/mit-license.php -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import decimal -import json -import logging - -from dateutil import parser as dateparser - -from ironic.api import types as atypes -from ironic.common import exception - -LOG = logging.getLogger(__name__) - - -CONTENT_TYPE = 'application/json' -ACCEPT_CONTENT_TYPES = [ - CONTENT_TYPE, - 'text/javascript', - 'application/javascript' -] -ENUM_TRUE = ('true', 't', 'yes', 'y', 'on', '1') -ENUM_FALSE = ('false', 'f', 'no', 'n', 'off', '0') - - -def fromjson_array(datatype, value): - if not isinstance(value, list): - raise ValueError("Value not a valid list: %s" % value) - return [fromjson(datatype.item_type, item) for item in value] - - -def fromjson_dict(datatype, value): - if not isinstance(value, dict): - raise ValueError("Value not a valid dict: %s" % value) - return dict(( - (fromjson(datatype.key_type, item[0]), - fromjson(datatype.value_type, item[1])) - for item in value.items())) - - -def fromjson_bool(value): - if isinstance(value, (int, bool)): - return bool(value) - if value in ENUM_TRUE: - return True - if value in ENUM_FALSE: - return False - raise ValueError("Value not an unambiguous boolean: %s" % value) - - -def fromjson(datatype, value): - """A generic converter from json base types to python datatype. - - """ - if value is None: - return None - - if isinstance(datatype, atypes.ArrayType): - return fromjson_array(datatype, value) - - if isinstance(datatype, atypes.DictType): - return fromjson_dict(datatype, value) - - if datatype is bytes: - if isinstance(value, (str, int, float)): - return str(value).encode('utf8') - return value - - if datatype is str: - if isinstance(value, bytes): - return value.decode('utf-8') - return value - - if datatype in (int, float): - return datatype(value) - - if datatype is bool: - return fromjson_bool(value) - - if datatype is decimal.Decimal: - return decimal.Decimal(value) - - if datatype is datetime.datetime: - return dateparser.parse(value) - - if atypes.iscomplex(datatype): - return fromjson_complex(datatype, value) - - if atypes.isusertype(datatype): - return datatype.frombasetype(fromjson(datatype.basetype, value)) - - return value - - -def fromjson_complex(datatype, value): - obj = datatype() - attributes = atypes.list_attributes(datatype) - - # Here we check that all the attributes in the value are also defined - # in our type definition, otherwise we raise an Error. - v_keys = set(value.keys()) - a_keys = set(adef.name for adef in attributes) - if not v_keys <= a_keys: - raise exception.UnknownAttribute(None, v_keys - a_keys) - - for attrdef in attributes: - if attrdef.name in value: - try: - val_fromjson = fromjson(attrdef.datatype, - value[attrdef.name]) - except exception.UnknownAttribute as e: - e.add_fieldname(attrdef.name) - raise - if getattr(attrdef, 'readonly', False): - raise exception.InvalidInput(attrdef.name, val_fromjson, - "Cannot set read only field.") - setattr(obj, attrdef.key, val_fromjson) - elif attrdef.mandatory: - raise exception.InvalidInput(attrdef.name, None, - "Mandatory field missing.") - - return atypes.validate_value(datatype, obj) - - -def parse(s, datatypes, bodyarg, encoding='utf8'): - jload = json.load - if not hasattr(s, 'read'): - if isinstance(s, bytes): - s = s.decode(encoding) - jload = json.loads - try: - jdata = jload(s) - except ValueError: - raise exception.ClientSideError("Request is not in valid JSON format") - if bodyarg: - argname = list(datatypes.keys())[0] - try: - kw = {argname: fromjson(datatypes[argname], jdata)} - except ValueError as e: - raise exception.InvalidInput(argname, jdata, e.args[0]) - except exception.UnknownAttribute as e: - # We only know the fieldname at this level, not in the - # called function. We fill in this information here. - e.add_fieldname(argname) - raise - else: - kw = {} - extra_args = [] - if not isinstance(jdata, dict): - raise exception.ClientSideError("Request must be a JSON dict") - for key in jdata: - if key not in datatypes: - extra_args.append(key) - else: - try: - kw[key] = fromjson(datatypes[key], jdata[key]) - except ValueError as e: - raise exception.InvalidInput(key, jdata[key], e.args[0]) - except exception.UnknownAttribute as e: - # We only know the fieldname at this level, not in the - # called function. We fill in this information here. - e.add_fieldname(key) - raise - if extra_args: - raise exception.UnknownArgument(', '.join(extra_args)) - return kw - - -def from_param(datatype, value): - if datatype is datetime.datetime: - return dateparser.parse(value) if value else None - - if isinstance(datatype, atypes.UserType): - return datatype.frombasetype( - from_param(datatype.basetype, value)) - - if isinstance(datatype, atypes.ArrayType): - if value is None: - return value - return [ - from_param(datatype.item_type, item) - for item in value - ] - - return datatype(value) if value is not None else None - - -def from_params(datatype, params, path, hit_paths): - if isinstance(datatype, atypes.ArrayType): - return array_from_params(datatype, params, path, hit_paths) - - if isinstance(datatype, atypes.UserType): - return usertype_from_params(datatype, params, path, hit_paths) - - if path in params: - assert not isinstance(datatype, atypes.DictType), \ - 'DictType unsupported' - assert not atypes.iscomplex(datatype) or datatype is atypes.File, \ - 'complex type unsupported' - hit_paths.add(path) - return from_param(datatype, params[path]) - return atypes.Unset - - -def array_from_params(datatype, params, path, hit_paths): - if hasattr(params, 'getall'): - # webob multidict - def getall(params, path): - return params.getall(path) - elif hasattr(params, 'getlist'): - # werkzeug multidict - def getall(params, path): # noqa - return params.getlist(path) - if path in params: - hit_paths.add(path) - return [ - from_param(datatype.item_type, value) - for value in getall(params, path)] - - return atypes.Unset - - -def usertype_from_params(datatype, params, path, hit_paths): - if path in params: - hit_paths.add(path) - value = from_param(datatype.basetype, params[path]) - if value is not atypes.Unset: - return datatype.frombasetype(value) - return atypes.Unset - - -def args_from_args(funcdef, args, kwargs): - newargs = [] - for argdef, arg in zip(funcdef.arguments[:len(args)], args): - try: - newargs.append(from_param(argdef.datatype, arg)) - except Exception as e: - if isinstance(argdef.datatype, atypes.UserType): - datatype_name = argdef.datatype.name - elif isinstance(argdef.datatype, type): - datatype_name = argdef.datatype.__name__ - else: - datatype_name = argdef.datatype.__class__.__name__ - raise exception.InvalidInput( - argdef.name, - arg, - "unable to convert to %(datatype)s. Error: %(error)s" % { - 'datatype': datatype_name, 'error': e}) - newkwargs = {} - for argname, value in kwargs.items(): - newkwargs[argname] = from_param( - funcdef.get_arg(argname).datatype, value - ) - return newargs, newkwargs - - -def args_from_params(funcdef, params): - kw = {} - hit_paths = set() - for argdef in funcdef.arguments: - value = from_params( - argdef.datatype, params, argdef.name, hit_paths) - if value is not atypes.Unset: - kw[argdef.name] = value - paths = set(params.keys()) - unknown_paths = paths - hit_paths - if '__body__' in unknown_paths: - unknown_paths.remove('__body__') - if not funcdef.ignore_extra_args and unknown_paths: - raise exception.UnknownArgument(', '.join(unknown_paths)) - return [], kw - - -def args_from_body(funcdef, body, mimetype): - if funcdef.body_type is not None: - datatypes = {funcdef.arguments[-1].name: funcdef.body_type} - else: - datatypes = dict(((a.name, a.datatype) for a in funcdef.arguments)) - - if not body: - return (), {} - - if mimetype == "application/x-www-form-urlencoded": - # the parameters should have been parsed in params - return (), {} - elif mimetype not in ACCEPT_CONTENT_TYPES: - raise exception.ClientSideError("Unknown mimetype: %s" % mimetype, - status_code=415) - - try: - kw = parse( - body, datatypes, bodyarg=funcdef.body_type is not None - ) - except exception.UnknownArgument: - if not funcdef.ignore_extra_args: - raise - kw = {} - - return (), kw - - -def combine_args(funcdef, akw, allow_override=False): - newargs, newkwargs = [], {} - for args, kwargs in akw: - for i, arg in enumerate(args): - n = funcdef.arguments[i].name - if not allow_override and n in newkwargs: - raise exception.ClientSideError( - "Parameter %s was given several times" % n) - newkwargs[n] = arg - for name, value in kwargs.items(): - n = str(name) - if not allow_override and n in newkwargs: - raise exception.ClientSideError( - "Parameter %s was given several times" % n) - newkwargs[n] = value - return newargs, newkwargs - - -def get_args(funcdef, args, kwargs, params, body, mimetype): - """Combine arguments from multiple sources - - Combine arguments from : - * the host framework args and kwargs - * the request params - * the request body - - Note that the host framework args and kwargs can be overridden - by arguments from params of body - - """ - # get the body from params if not given directly - if not body and '__body__' in params: - body = params['__body__'] - - # extract args from the host args and kwargs - from_args = args_from_args(funcdef, args, kwargs) - - # extract args from the request parameters - from_params = args_from_params(funcdef, params) - - # extract args from the request body - from_body = args_from_body(funcdef, body, mimetype) - - # combine params and body arguments - from_params_and_body = combine_args( - funcdef, - (from_params, from_body) - ) - - args, kwargs = combine_args( - funcdef, - (from_args, from_params_and_body), - allow_override=True - ) - check_arguments(funcdef, args, kwargs) - return args, kwargs - - -def check_arguments(funcdef, args, kw): - """Check if some arguments are missing""" - assert len(args) == 0 - for arg in funcdef.arguments: - if arg.mandatory and arg.name not in kw: - raise exception.MissingArgument(arg.name) diff --git a/ironic/api/controllers/base.py b/ironic/api/controllers/base.py index eba5cd0a92..3512000de5 100644 --- a/ironic/api/controllers/base.py +++ b/ironic/api/controllers/base.py @@ -12,66 +12,13 @@ # License for the specific language governing permissions and limitations # under the License. -import datetime import functools from webob import exc -from ironic.api import types as atypes from ironic.common.i18n import _ -class AsDictMixin(object): - """Mixin class adding an as_dict() method.""" - - def as_dict(self): - """Render this object as a dict of its fields.""" - def _attr_as_pod(attr): - """Return an attribute as a Plain Old Data (POD) type.""" - if isinstance(attr, list): - return [_attr_as_pod(item) for item in attr] - # Recursively evaluate objects that support as_dict(). - try: - return attr.as_dict() - except AttributeError: - return attr - - return dict((k, _attr_as_pod(getattr(self, k))) - for k in self.fields - if hasattr(self, k) - and getattr(self, k) != atypes.Unset) - - -class Base(AsDictMixin): - """Base type for complex types""" - def __init__(self, **kw): - for key, value in kw.items(): - if hasattr(self, key): - setattr(self, key, value) - - def unset_fields_except(self, except_list=None): - """Unset fields so they don't appear in the message body. - - :param except_list: A list of fields that won't be touched. - - """ - if except_list is None: - except_list = [] - - for k in self.as_dict(): - if k not in except_list: - setattr(self, k, atypes.Unset) - - -class APIBase(Base): - - created_at = atypes.wsattr(datetime.datetime, readonly=True) - """The time in UTC at which the object is created""" - - updated_at = atypes.wsattr(datetime.datetime, readonly=True) - """The time in UTC at which the object is updated""" - - @functools.total_ordering class Version(object): """API Version object.""" diff --git a/ironic/api/controllers/v1/collection.py b/ironic/api/controllers/v1/collection.py index 5d5125c19c..8368069e88 100644 --- a/ironic/api/controllers/v1/collection.py +++ b/ironic/api/controllers/v1/collection.py @@ -14,9 +14,7 @@ # under the License. from ironic import api -from ironic.api.controllers import base from ironic.api.controllers import link -from ironic.api import types as atypes def has_next(collection, limit): @@ -87,30 +85,3 @@ def get_next(collection, limit, url=None, key_field='uuid', **kwargs): return link.make_link('next', api.request.public_url, url, next_args)['href'] - - -class Collection(base.Base): - - next = str - """A link to retrieve the next subset of the collection""" - - @property - def collection(self): - return getattr(self, self._type) - - @classmethod - def get_key_field(cls): - return 'uuid' - - def has_next(self, limit): - """Return whether collection has more items.""" - return has_next(self.collection, limit) - - def get_next(self, limit, url=None, **kwargs): - """Return a link to the next subset of the collection.""" - resource_url = url or self._type - the_next = get_next(self.collection, limit, url=resource_url, - key_field=self.get_key_field(), **kwargs) - if the_next is None: - return atypes.Unset - return the_next diff --git a/ironic/api/controllers/v1/notification_utils.py b/ironic/api/controllers/v1/notification_utils.py index d0fcb2273b..14afbe22e7 100644 --- a/ironic/api/controllers/v1/notification_utils.py +++ b/ironic/api/controllers/v1/notification_utils.py @@ -18,7 +18,6 @@ from oslo_messaging import exceptions as oslo_msg_exc from oslo_utils import excutils from oslo_versionedobjects import exception as oslo_vo_exc -from ironic.api import types as atypes from ironic.common import exception from ironic.common.i18n import _ from ironic.objects import allocation as allocation_objects @@ -71,9 +70,7 @@ def _emit_api_notification(context, obj, action, level, status, **kwargs): :param kwargs: kwargs to use when creating the notification payload. """ resource = obj.__class__.__name__.lower() - # value atypes.Unset can be passed from API representation of resource - extra_args = {k: (v if v != atypes.Unset else None) - for k, v in kwargs.items()} + extra_args = kwargs try: try: if action == 'maintenance_set': diff --git a/ironic/api/controllers/v1/state.py b/ironic/api/controllers/v1/state.py deleted file mode 100644 index f6972ff82c..0000000000 --- a/ironic/api/controllers/v1/state.py +++ /dev/null @@ -1,31 +0,0 @@ -# Copyright 2013 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from ironic.api.controllers import base - - -class State(base.APIBase): - - current = str - """The current state""" - - target = str - """The user modified desired state""" - - available = [str] - """A list of available states it is able to transition to""" - - links = None - """A list containing a self link and associated state links""" diff --git a/ironic/api/controllers/v1/types.py b/ironic/api/controllers/v1/types.py deleted file mode 100644 index d51ac8a563..0000000000 --- a/ironic/api/controllers/v1/types.py +++ /dev/null @@ -1,263 +0,0 @@ -# coding: utf-8 -# -# Copyright 2013 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import inspect -import json - -from oslo_log import log -from oslo_utils import strutils -from oslo_utils import uuidutils - -from ironic.api.controllers import base -from ironic.api.controllers.v1 import utils as v1_utils -from ironic.api import types as atypes -from ironic.common import exception -from ironic.common.i18n import _ -from ironic.common import utils - - -LOG = log.getLogger(__name__) - - -class MacAddressType(atypes.UserType): - """A simple MAC address type.""" - - basetype = str - name = 'macaddress' - - @staticmethod - def validate(value): - return utils.validate_and_normalize_mac(value) - - @staticmethod - def frombasetype(value): - if value is None: - return None - return MacAddressType.validate(value) - - -class UuidOrNameType(atypes.UserType): - """A simple UUID or logical name type.""" - - basetype = str - name = 'uuid_or_name' - - @staticmethod - def validate(value): - if not (uuidutils.is_uuid_like(value) - or v1_utils.is_valid_logical_name(value)): - raise exception.InvalidUuidOrName(name=value) - return value - - @staticmethod - def frombasetype(value): - if value is None: - return None - return UuidOrNameType.validate(value) - - -class NameType(atypes.UserType): - """A simple logical name type.""" - - basetype = str - name = 'name' - - @staticmethod - def validate(value): - if not v1_utils.is_valid_logical_name(value): - raise exception.InvalidName(name=value) - return value - - @staticmethod - def frombasetype(value): - if value is None: - return None - return NameType.validate(value) - - -class UuidType(atypes.UserType): - """A simple UUID type.""" - - basetype = str - name = 'uuid' - - @staticmethod - def validate(value): - if not uuidutils.is_uuid_like(value): - raise exception.InvalidUUID(uuid=value) - return value - - @staticmethod - def frombasetype(value): - if value is None: - return None - return UuidType.validate(value) - - -class BooleanType(atypes.UserType): - """A simple boolean type.""" - - basetype = str - name = 'boolean' - - @staticmethod - def validate(value): - try: - return strutils.bool_from_string(value, strict=True) - except ValueError as e: - # raise Invalid to return 400 (BadRequest) in the API - raise exception.Invalid(str(e)) - - @staticmethod - def frombasetype(value): - if value is None: - return None - return BooleanType.validate(value) - - -class JsonType(atypes.UserType): - """A simple JSON type.""" - - basetype = str - name = 'json' - - def __str__(self): - # These are the json serializable native types - return ' | '.join(map(str, (str, int, float, - BooleanType, list, dict, None))) - - @staticmethod - def validate(value): - try: - json.dumps(value) - except TypeError: - raise exception.Invalid(_('%s is not JSON serializable') % value) - else: - return value - - @staticmethod - def frombasetype(value): - return JsonType.validate(value) - - -class ListType(atypes.UserType): - """A simple list type.""" - - basetype = str - name = 'list' - - @staticmethod - def validate(value): - """Validate and convert the input to a ListType. - - :param value: A comma separated string of values - :returns: A list of unique values (lower-cased), maintaining the - same order - """ - items = [] - for v in str(value).split(','): - v_norm = v.strip().lower() - if v_norm and v_norm not in items: - items.append(v_norm) - return items - - @staticmethod - def frombasetype(value): - if value is None: - return None - return ListType.validate(value) - - -macaddress = MacAddressType() -uuid_or_name = UuidOrNameType() -name = NameType() -uuid = UuidType() -boolean = BooleanType() -listtype = ListType() -# Can't call it 'json' because that's the name of the stdlib module -jsontype = JsonType() - - -class JsonPatchType(base.Base): - """A complex type that represents a single json-patch operation.""" - - path = atypes.wsattr(atypes.StringType(pattern='^(/[\\w-]+)+$'), - mandatory=True) - op = atypes.wsattr(atypes.Enum(str, 'add', 'replace', 'remove'), - mandatory=True) - value = atypes.wsattr(jsontype, default=atypes.Unset) - - # The class of the objects being patched. Override this in subclasses. - # Should probably be a subclass of ironic.api.controllers.base.APIBase. - _api_base = None - - # Attributes that are not required for construction, but which may not be - # removed if set. Override in subclasses if needed. - _extra_non_removable_attrs = set() - - # Set of non-removable attributes, calculated lazily. - _non_removable_attrs = None - - @staticmethod - def internal_attrs(): - """Returns a list of internal attributes. - - Internal attributes can't be added, replaced or removed. This - method may be overwritten by derived class. - - """ - return ['/created_at', '/id', '/links', '/updated_at', '/uuid'] - - @classmethod - def non_removable_attrs(cls): - """Returns a set of names of attributes that may not be removed. - - Attributes whose 'mandatory' property is True are automatically added - to this set. To add additional attributes to the set, override the - field _extra_non_removable_attrs in subclasses, with a set of the form - {'/foo', '/bar'}. - """ - if cls._non_removable_attrs is None: - cls._non_removable_attrs = cls._extra_non_removable_attrs.copy() - if cls._api_base: - fields = inspect.getmembers(cls._api_base, - lambda a: not inspect.isroutine(a)) - for name, field in fields: - if getattr(field, 'mandatory', False): - cls._non_removable_attrs.add('/%s' % name) - return cls._non_removable_attrs - - @staticmethod - def validate(patch): - _path = '/' + patch.path.split('/')[1] - if _path in patch.internal_attrs(): - msg = _("'%s' is an internal attribute and can not be updated") - raise exception.ClientSideError(msg % patch.path) - - if patch.path in patch.non_removable_attrs() and patch.op == 'remove': - msg = _("'%s' is a mandatory attribute and can not be removed") - raise exception.ClientSideError(msg % patch.path) - - if patch.op != 'remove': - if patch.value is atypes.Unset: - msg = _("'add' and 'replace' operations need a value") - raise exception.ClientSideError(msg) - - ret = {'path': patch.path, 'op': patch.op} - if patch.value is not atypes.Unset: - ret['value'] = patch.value - return ret diff --git a/ironic/api/controllers/v1/utils.py b/ironic/api/controllers/v1/utils.py index eebf6c86ed..2ad7cd7b45 100644 --- a/ironic/api/controllers/v1/utils.py +++ b/ironic/api/controllers/v1/utils.py @@ -30,7 +30,6 @@ from pecan import rest from ironic import api from ironic.api.controllers import link from ironic.api.controllers.v1 import versions -from ironic.api import types as atypes from ironic.common import args from ironic.common import exception from ironic.common import faults @@ -677,6 +676,16 @@ def is_valid_logical_name(name): return utils.is_valid_logical_name(name) +class PassthruResponse(object): + """Object to hold the "response" from a passthru call""" + def __init__(self, obj, status_code=None): + #: Store the result object from the view + self.obj = obj + + #: Store an optional status_code + self.status_code = status_code + + def vendor_passthru(ident, method, topic, data=None, driver_passthru=False): """Call a vendor passthru API extension. @@ -719,7 +728,7 @@ def vendor_passthru(ident, method, topic, data=None, driver_passthru=False): return_value = return_value.encode('utf-8') return_value = io.BytesIO(return_value) - return atypes.PassthruResponse(return_value, status_code=status_code) + return PassthruResponse(return_value, status_code=status_code) def check_for_invalid_fields(fields, object_fields): diff --git a/ironic/api/expose.py b/ironic/api/expose.py deleted file mode 100644 index 16eecb1c2a..0000000000 --- a/ironic/api/expose.py +++ /dev/null @@ -1,222 +0,0 @@ -# -# Copyright 2015 Rackspace, Inc -# All Rights Reserved -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import functools -from http import client as http_client -import inspect -import json -import sys -import traceback - -from oslo_config import cfg -from oslo_log import log -import pecan -from webob import static - -from ironic.api import args as api_args -from ironic.api import functions -from ironic.api import types as atypes - -LOG = log.getLogger(__name__) - - -class JSonRenderer(object): - @staticmethod - def __init__(path, extra_vars): - pass - - @staticmethod - def render(template_path, namespace): - if 'faultcode' in namespace: - return encode_error(None, namespace) - result = encode_result( - namespace['result'], - namespace['datatype'] - ) - return result - - -pecan.templating._builtin_renderers['wsmejson'] = JSonRenderer - -pecan_json_decorate = pecan.expose( - template='wsmejson:', - content_type='application/json', - generic=False) - - -def expose(*args, **kwargs): - sig = functions.signature(*args, **kwargs) - - def decorate(f): - sig(f) - funcdef = functions.FunctionDefinition.get(f) - funcdef.resolve_types(atypes.registry) - - @functools.wraps(f) - def callfunction(self, *args, **kwargs): - return_type = funcdef.return_type - - try: - args, kwargs = api_args.get_args( - funcdef, args, kwargs, pecan.request.params, - pecan.request.body, pecan.request.content_type - ) - result = f(self, *args, **kwargs) - - # NOTE: Support setting of status_code with default 201 - pecan.response.status = funcdef.status_code - if isinstance(result, atypes.PassthruResponse): - pecan.response.status = result.status_code - - # NOTE(lucasagomes): If the return code is 204 - # (No Response) we have to make sure that we are not - # returning anything in the body response and the - # content-length is 0 - if result.status_code == 204: - return_type = None - - if callable(getattr(result.obj, 'read', None)): - # Stream the files-like data directly to the response - pecan.response.app_iter = static.FileIter(result.obj) - return_type = None - result = None - else: - result = result.obj - - except Exception: - try: - exception_info = sys.exc_info() - orig_exception = exception_info[1] - orig_code = getattr(orig_exception, 'code', None) - data = format_exception( - exception_info, - cfg.CONF.debug_tracebacks_in_api - ) - finally: - del exception_info - - if orig_code and orig_code in http_client.responses: - pecan.response.status = orig_code - else: - pecan.response.status = 500 - - return data - - if return_type is None: - pecan.request.pecan['content_type'] = None - pecan.response.content_type = None - return '' - - return dict( - datatype=return_type, - result=result - ) - - pecan_json_decorate(callfunction) - pecan.util._cfg(callfunction)['argspec'] = inspect.getfullargspec(f) - callfunction._wsme_definition = funcdef - return callfunction - - return decorate - - -def tojson(datatype, value): - """A generic converter from python to jsonify-able datatypes. - - """ - if value is None: - return None - if isinstance(datatype, atypes.ArrayType): - return [tojson(datatype.item_type, item) for item in value] - if isinstance(datatype, atypes.DictType): - return dict(( - (tojson(datatype.key_type, item[0]), - tojson(datatype.value_type, item[1])) - for item in value.items() - )) - if isinstance(value, datetime.datetime): - return value.isoformat() - if atypes.iscomplex(datatype): - d = dict() - for attr in atypes.list_attributes(datatype): - attr_value = getattr(value, attr.key) - if attr_value is not atypes.Unset: - d[attr.name] = tojson(attr.datatype, attr_value) - return d - if isinstance(datatype, atypes.UserType): - return tojson(datatype.basetype, datatype.tobasetype(value)) - return value - - -def encode_result(value, datatype, **options): - jsondata = tojson(datatype, value) - return json.dumps(jsondata) - - -def encode_error(context, errordetail): - return json.dumps(errordetail) - - -def format_exception(excinfo, debug=False): - """Extract informations that can be sent to the client.""" - error = excinfo[1] - code = getattr(error, 'code', None) - if code and code in http_client.responses and (400 <= code < 500): - faultstring = (error.faultstring if hasattr(error, 'faultstring') - else str(error)) - faultcode = getattr(error, 'faultcode', 'Client') - r = dict(faultcode=faultcode, - faultstring=faultstring) - LOG.debug("Client-side error: %s", r['faultstring']) - r['debuginfo'] = None - return r - else: - faultstring = str(error) - debuginfo = "\n".join(traceback.format_exception(*excinfo)) - - LOG.error('Server-side error: "%s". Detail: \n%s', - faultstring, debuginfo) - - faultcode = getattr(error, 'faultcode', 'Server') - r = dict(faultcode=faultcode, faultstring=faultstring) - if debug: - r['debuginfo'] = debuginfo - else: - r['debuginfo'] = None - return r - - -class validate(object): - """Decorator that define the arguments types of a function. - - - Example:: - - class MyController(object): - @expose(str) - @validate(datetime.date, datetime.time) - def format(self, d, t): - return d.isoformat() + ' ' + t.isoformat() - """ - def __init__(self, *param_types): - self.param_types = param_types - - def __call__(self, func): - argspec = functions.getargspec(func) - fd = functions.FunctionDefinition.get(func) - fd.set_arg_types(argspec, self.param_types) - return func diff --git a/ironic/api/types.py b/ironic/api/types.py deleted file mode 100644 index 55b26ab0eb..0000000000 --- a/ironic/api/types.py +++ /dev/null @@ -1,709 +0,0 @@ -# coding: utf-8 -# -# Copyright 2011-2019 the WSME authors and contributors -# (See https://opendev.org/x/wsme/) -# -# This module is part of WSME and is also released under -# the MIT License: http://www.opensource.org/licenses/mit-license.php -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -import base64 -import datetime -import decimal -import inspect -import re -import weakref - -from oslo_log import log - -from ironic.common import exception - - -LOG = log.getLogger(__name__) - - -pod_types = (int, bytes, str, float, bool) -native_types = pod_types + (datetime.datetime, decimal.Decimal) -_promotable_types = (int, str, bytes) - - -class ArrayType(object): - def __init__(self, item_type): - if iscomplex(item_type): - self._item_type = weakref.ref(item_type) - else: - self._item_type = item_type - - def __hash__(self): - return hash(self.item_type) - - def __eq__(self, other): - return isinstance(other, ArrayType) \ - and self.item_type == other.item_type - - def sample(self): - return [getattr(self.item_type, 'sample', self.item_type)()] - - @property - def item_type(self): - if isinstance(self._item_type, weakref.ref): - return self._item_type() - else: - return self._item_type - - def validate(self, value): - if value is None: - return - if not isinstance(value, list): - raise ValueError("Wrong type. Expected '[%s]', got '%s'" % ( - self.item_type, type(value) - )) - return [ - validate_value(self.item_type, item) - for item in value - ] - - -class DictType(object): - def __init__(self, key_type, value_type): - if key_type not in (int, bytes, str, float, bool): - raise ValueError("Dictionaries key can only be a pod type") - self.key_type = key_type - if iscomplex(value_type): - self._value_type = weakref.ref(value_type) - else: - self._value_type = value_type - - def __hash__(self): - return hash((self.key_type, self.value_type)) - - def sample(self): - key = getattr(self.key_type, 'sample', self.key_type)() - value = getattr(self.value_type, 'sample', self.value_type)() - return {key: value} - - @property - def value_type(self): - if isinstance(self._value_type, weakref.ref): - return self._value_type() - else: - return self._value_type - - def validate(self, value): - if not isinstance(value, dict): - raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % ( - self.key_type, self.value_type, type(value) - )) - return dict(( - ( - validate_value(self.key_type, key), - validate_value(self.value_type, v) - ) for key, v in value.items() - )) - - -class UserType(object): - basetype = None - name = None - - def validate(self, value): - return value - - def tobasetype(self, value): - return value - - def frombasetype(self, value): - return value - - -def isusertype(class_): - return isinstance(class_, UserType) - - -class BinaryType(UserType): - """A user type that use base64 strings to carry binary data. - - """ - basetype = bytes - name = 'binary' - - def tobasetype(self, value): - if value is None: - return None - return base64.encodebytes(value) - - def frombasetype(self, value): - if value is None: - return None - return base64.decodebytes(value) - - -#: The binary almost-native type -binary = BinaryType() - - -class IntegerType(UserType): - """A simple integer type. Can validate a value range. - - :param minimum: Possible minimum value - :param maximum: Possible maximum value - - Example:: - - Price = IntegerType(minimum=1) - - """ - basetype = int - name = "integer" - - def __init__(self, minimum=None, maximum=None): - self.minimum = minimum - self.maximum = maximum - - @staticmethod - def frombasetype(value): - return int(value) if value is not None else None - - def validate(self, value): - if self.minimum is not None and value < self.minimum: - error = 'Value should be greater or equal to %s' % self.minimum - raise ValueError(error) - - if self.maximum is not None and value > self.maximum: - error = 'Value should be lower or equal to %s' % self.maximum - raise ValueError(error) - - return value - - -class StringType(UserType): - """A simple string type. Can validate a length and a pattern. - - :param min_length: Possible minimum length - :param max_length: Possible maximum length - :param pattern: Possible string pattern - - Example:: - - Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$') - - """ - basetype = str - name = "string" - - def __init__(self, min_length=None, max_length=None, pattern=None): - self.min_length = min_length - self.max_length = max_length - if isinstance(pattern, str): - self.pattern = re.compile(pattern) - else: - self.pattern = pattern - - def validate(self, value): - if not isinstance(value, self.basetype): - error = 'Value should be string' - raise ValueError(error) - - if self.min_length is not None and len(value) < self.min_length: - error = 'Value should have a minimum character requirement of %s' \ - % self.min_length - raise ValueError(error) - - if self.max_length is not None and len(value) > self.max_length: - error = 'Value should have a maximum character requirement of %s' \ - % self.max_length - raise ValueError(error) - - if self.pattern is not None and not self.pattern.search(value): - error = 'Value should match the pattern %s' % self.pattern.pattern - raise ValueError(error) - - return value - - -class Enum(UserType): - """A simple enumeration type. Can be based on any non-complex type. - - :param basetype: The actual data type - :param values: A set of possible values - - If nullable, 'None' should be added the values set. - - Example:: - - Gender = Enum(str, 'male', 'female') - Specie = Enum(str, 'cat', 'dog') - - """ - def __init__(self, basetype, *values, **kw): - self.basetype = basetype - self.values = set(values) - name = kw.pop('name', None) - if name is None: - name = "Enum(%s)" % ', '.join((str(v) for v in values)) - self.name = name - - def validate(self, value): - if value not in self.values: - raise ValueError("Value should be one of: %s" % - ', '.join(map(str, self.values))) - return value - - def tobasetype(self, value): - return value - - def frombasetype(self, value): - return value - - -class UnsetType(object): - def __bool__(self): - return False - - def __repr__(self): - return 'Unset' - - -Unset = UnsetType() - - -def validate_value(datatype, value): - if value in (Unset, None) or datatype is None: - return value - - # Try to promote the data type to one of our complex types. - if isinstance(datatype, list): - datatype = ArrayType(datatype[0]) - elif isinstance(datatype, dict): - datatype = DictType(*list(datatype.items())[0]) - - # If the datatype has its own validator, use that. - if hasattr(datatype, 'validate'): - return datatype.validate(value) - - # Do type promotion/conversion and data validation for builtin - # types. - v_type = type(value) - if datatype == int: - if v_type in _promotable_types: - try: - # Try to turn the value into an int - value = datatype(value) - except ValueError: - # An error is raised at the end of the function - # when the types don't match. - pass - elif datatype is float and v_type in _promotable_types: - try: - value = float(value) - except ValueError: - # An error is raised at the end of the function - # when the types don't match. - pass - elif datatype is str and isinstance(value, bytes): - value = value.decode() - elif datatype is bytes and isinstance(value, str): - value = value.encode() - - if not isinstance(value, datatype): - raise ValueError( - "Wrong type. Expected '%s', got '%s'" % ( - datatype, v_type - )) - return value - - -def iscomplex(datatype): - return inspect.isclass(datatype) \ - and '_wsme_attributes' in datatype.__dict__ - - -class wsproperty(property): - """A specialised :class:`property` to define typed-property on complex types. - - Example:: - - class MyComplexType(Base): - def get_aint(self): - return self._aint - - def set_aint(self, value): - assert avalue < 10 # Dummy input validation - self._aint = value - - aint = wsproperty(int, get_aint, set_aint, mandatory=True) - - """ - def __init__(self, datatype, fget, fset=None, - mandatory=False, doc=None, name=None): - property.__init__(self, fget, fset) - #: The property name in the parent python class - self.key = None - #: The attribute name on the public of the api. - #: Defaults to :attr:`key` - self.name = name - #: property data type - self.datatype = datatype - #: True if the property is mandatory - self.mandatory = mandatory - - -class wsattr(object): - """Complex type attribute definition. - - Example:: - - class MyComplexType(ctypes.Base): - optionalvalue = int - mandatoryvalue = wsattr(int, mandatory=True) - named_value = wsattr(int, name='named.value') - - After inspection, the non-wsattr attributes will be replaced, and - the above class will be equivalent to:: - - class MyComplexType(ctypes.Base): - optionalvalue = wsattr(int) - mandatoryvalue = wsattr(int, mandatory=True) - - """ - def __init__(self, datatype, mandatory=False, name=None, default=Unset, - readonly=False): - #: The attribute name in the parent python class. - #: Set by :func:`inspect_class` - self.key = None # will be set by class inspection - #: The attribute name on the public of the api. - #: Defaults to :attr:`key` - self.name = name - self._datatype = (datatype,) - #: True if the attribute is mandatory - self.mandatory = mandatory - #: Default value. The attribute will return this instead - #: of :data:`Unset` if no value has been set. - self.default = default - #: If True value cannot be set from json/xml input data - self.readonly = readonly - - self.complextype = None - - def _get_dataholder(self, instance): - dataholder = getattr(instance, '_wsme_dataholder', None) - if dataholder is None: - dataholder = instance._wsme_DataHolderClass() - instance._wsme_dataholder = dataholder - return dataholder - - def __get__(self, instance, owner): - if instance is None: - return self - return getattr( - self._get_dataholder(instance), - self.key, - self.default - ) - - def __set__(self, instance, value): - try: - value = validate_value(self.datatype, value) - except (ValueError, TypeError) as e: - raise exception.InvalidInput(self.name, value, str(e)) - dataholder = self._get_dataholder(instance) - if value is Unset: - if hasattr(dataholder, self.key): - delattr(dataholder, self.key) - else: - setattr(dataholder, self.key, value) - - def __delete__(self, instance): - self.__set__(instance, Unset) - - def _get_datatype(self): - if isinstance(self._datatype, tuple): - self._datatype = \ - self.complextype().__registry__.resolve_type(self._datatype[0]) - if isinstance(self._datatype, weakref.ref): - return self._datatype() - if isinstance(self._datatype, list): - return [ - item() if isinstance(item, weakref.ref) else item - for item in self._datatype - ] - return self._datatype - - def _set_datatype(self, datatype): - self._datatype = datatype - - #: attribute data type. Can be either an actual type, - #: or a type name, in which case the actual type will be - #: determined when needed (generally just before scanning the api). - datatype = property(_get_datatype, _set_datatype) - - -def iswsattr(attr): - if inspect.isfunction(attr) or inspect.ismethod(attr): - return False - if isinstance(attr, property) and not isinstance(attr, wsproperty): - return False - return True - - -def sort_attributes(class_, attributes): - """Sort a class attributes list. - - 3 mechanisms are attempted : - - #. Look for a _wsme_attr_order attribute on the class. This allow - to define an arbitrary order of the attributes (useful for - generated types). - - #. Access the object source code to find the declaration order. - - #. Sort by alphabetically - - """ - - if not len(attributes): - return - - attrs = dict((a.key, a) for a in attributes) - - if hasattr(class_, '_wsme_attr_order'): - names_order = class_._wsme_attr_order - else: - names = attrs.keys() - names_order = [] - try: - lines = [] - for cls in inspect.getmro(class_): - if cls is object: - continue - lines[len(lines):] = inspect.getsourcelines(cls)[0] - for line in lines: - line = line.strip().replace(" ", "") - if '=' in line: - aname = line[:line.index('=')] - if aname in names and aname not in names_order: - names_order.append(aname) - if len(names_order) < len(names): - names_order.extend(( - name for name in names if name not in names_order)) - assert len(names_order) == len(names) - except (TypeError, IOError): - names_order = list(names) - names_order.sort() - - attributes[:] = [attrs[name] for name in names_order] - - -def inspect_class(class_): - """Extract a list of (name, wsattr|wsproperty) for the given class""" - attributes = [] - for name, attr in inspect.getmembers(class_, iswsattr): - if name.startswith('_'): - continue - if inspect.isroutine(attr): - continue - - if isinstance(attr, (wsattr, wsproperty)): - attrdef = attr - else: - if (attr not in native_types - and (inspect.isclass(attr) or isinstance(attr, (list, dict)))): - register_type(attr) - attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr) - - attrdef.key = name - if attrdef.name is None: - attrdef.name = name - attrdef.complextype = weakref.ref(class_) - attributes.append(attrdef) - setattr(class_, name, attrdef) - - sort_attributes(class_, attributes) - return attributes - - -def list_attributes(class_): - """Returns a list of a complex type attributes.""" - if not iscomplex(class_): - raise TypeError("%s is not a registered type") - return class_._wsme_attributes - - -def make_dataholder(class_): - # the slots are computed outside the class scope to avoid - # 'attr' to pullute the class namespace, which leads to weird - # things if one of the slots is named 'attr'. - slots = [attr.key for attr in class_._wsme_attributes] - - class DataHolder(object): - __slots__ = slots - - DataHolder.__name__ = class_.__name__ + 'DataHolder' - return DataHolder - - -class Registry(object): - def __init__(self): - self._complex_types = [] - self.array_types = set() - self.dict_types = set() - - @property - def complex_types(self): - return [t() for t in self._complex_types if t()] - - def register(self, class_): - """Make sure a type is registered. - - It is automatically called by :class:`expose() ` - and :class:`validate() `. - Unless you want to control when the class inspection is done there - is no need to call it. - - """ - if class_ is None or \ - class_ in native_types or \ - isinstance(class_, UserType) or iscomplex(class_) or \ - isinstance(class_, ArrayType) or isinstance(class_, DictType): - return class_ - - if isinstance(class_, list): - if len(class_) != 1: - raise ValueError("Cannot register type %s" % repr(class_)) - dt = ArrayType(class_[0]) - self.register(dt.item_type) - self.array_types.add(dt) - return dt - - if isinstance(class_, dict): - if len(class_) != 1: - raise ValueError("Cannot register type %s" % repr(class_)) - dt = DictType(*list(class_.items())[0]) - self.register(dt.value_type) - self.dict_types.add(dt) - return dt - - class_._wsme_attributes = None - class_._wsme_attributes = inspect_class(class_) - class_._wsme_DataHolderClass = make_dataholder(class_) - - class_.__registry__ = self - self._complex_types.append(weakref.ref(class_)) - return class_ - - def reregister(self, class_): - """Register a type which may already have been registered. - - """ - self._unregister(class_) - return self.register(class_) - - def _unregister(self, class_): - """Remove a previously registered type. - - """ - # Clear the existing attribute reference so it is rebuilt if - # the class is registered again later. - if hasattr(class_, '_wsme_attributes'): - del class_._wsme_attributes - # FIXME(dhellmann): This method does not recurse through the - # types like register() does. Should it? - if isinstance(class_, list): - at = ArrayType(class_[0]) - try: - self.array_types.remove(at) - except KeyError: - pass - elif isinstance(class_, dict): - key_type, value_type = list(class_.items())[0] - self.dict_types = set( - dt for dt in self.dict_types - if (dt.key_type, dt.value_type) != (key_type, value_type) - ) - # We can't use remove() here because the items in - # _complex_types are weakref objects pointing to the classes, - # so we can't compare with them directly. - self._complex_types = [ - ct for ct in self._complex_types - if ct() is not class_ - ] - - def lookup(self, typename): - LOG.debug('Lookup %s', typename) - modname = None - if '.' in typename: - modname, typename = typename.rsplit('.', 1) - for ct in self._complex_types: - ct = ct() - if ct is not None and typename == ct.__name__ and ( - modname is None or modname == ct.__module__): - return ct - - def resolve_type(self, type_): - if isinstance(type_, str): - return self.lookup(type_) - if isinstance(type_, list): - type_ = ArrayType(type_[0]) - if isinstance(type_, dict): - type_ = DictType(list(type_.keys())[0], list(type_.values())[0]) - if isinstance(type_, ArrayType): - type_ = ArrayType(self.resolve_type(type_.item_type)) - self.array_types.add(type_) - elif isinstance(type_, DictType): - type_ = DictType( - type_.key_type, - self.resolve_type(type_.value_type) - ) - self.dict_types.add(type_) - else: - type_ = self.register(type_) - return type_ - - -# Default type registry -registry = Registry() - - -def register_type(class_): - return registry.register(class_) - - -class BaseMeta(type): - def __new__(cls, name, bases, dct): - if bases and bases[0] is not object and '__registry__' not in dct: - dct['__registry__'] = registry - return type.__new__(cls, name, bases, dct) - - def __init__(cls, name, bases, dct): - if bases and bases[0] is not object and cls.__registry__: - cls.__registry__.register(cls) - - -class Base(metaclass=BaseMeta): - """Base type for complex types""" - def __init__(self, **kw): - for key, value in kw.items(): - if hasattr(self, key): - setattr(self, key, value) - - -class PassthruResponse(object): - """Object to hold the "response" from a passthru call""" - def __init__(self, obj, status_code=None): - #: Store the result object from the view - self.obj = obj - - #: Store an optional status_code - self.status_code = status_code diff --git a/ironic/tests/unit/api/controllers/v1/test_expose.py b/ironic/tests/unit/api/controllers/v1/test_expose.py deleted file mode 100644 index bc8e9fbe72..0000000000 --- a/ironic/tests/unit/api/controllers/v1/test_expose.py +++ /dev/null @@ -1,315 +0,0 @@ -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -from http import client as http_client -from importlib import machinery -import inspect -import json -import os -import sys -from unittest import mock - -from oslo_utils import uuidutils -import pecan.rest -import pecan.testing - -from ironic.api.controllers import root -from ironic.api.controllers import v1 -from ironic.api import expose -from ironic.api import types as atypes -from ironic.common import exception -from ironic.tests import base as test_base -from ironic.tests.unit.api import base as test_api_base - - -class TestExposedAPIMethodsCheckPolicy(test_base.TestCase): - """Ensure that all exposed HTTP endpoints call authorize.""" - - def setUp(self): - super(TestExposedAPIMethodsCheckPolicy, self).setUp() - self.original_method = sys.modules['ironic.api.expose'].expose - self.exposed_methods = set() - - def expose_and_track(*args, **kwargs): - def wrap(f): - if f not in self.exposed_methods: - self.exposed_methods.add(f) - e = self.original_method(*args, **kwargs) - return e(f) - return wrap - - p = mock.patch('ironic.api.expose.expose', expose_and_track) - p.start() - self.addCleanup(p.stop) - - def _test(self, module): - module_path = os.path.abspath(sys.modules[module].__file__) - machinery.SourceFileLoader(uuidutils.generate_uuid(), - module_path).load_module() - expected_calls = [ - 'api_utils.check_node_policy_and_retrieve', - 'api_utils.check_list_policy', - 'api_utils.check_multiple_node_policies_and_retrieve', - 'self._get_node_and_topic', - 'api_utils.check_port_policy_and_retrieve', - 'api_utils.check_port_list_policy', - 'self._authorize_patch_and_get_node', - ] - - for func in self.exposed_methods: - src = inspect.getsource(func) - self.assertTrue( - any(call in src for call in expected_calls) - or ('policy.authorize' in src - and 'context.to_policy_values' in src), - 'no policy check found in in exposed method %s' % func) - - def test_chassis_api_policy(self): - self._test('ironic.api.controllers.v1.chassis') - - def test_driver_api_policy(self): - self._test('ironic.api.controllers.v1.driver') - - def test_node_api_policy(self): - self._test('ironic.api.controllers.v1.node') - - def test_port_api_policy(self): - self._test('ironic.api.controllers.v1.port') - - def test_portgroup_api_policy(self): - self._test('ironic.api.controllers.v1.portgroup') - - def test_ramdisk_api_policy(self): - self._test('ironic.api.controllers.v1.ramdisk') - - def test_conductor_api_policy(self): - self._test('ironic.api.controllers.v1.conductor') - - -class UnderscoreStr(atypes.UserType): - basetype = str - name = "custom string" - - def tobasetype(self, value): - return '__' + value - - -class Obj(atypes.Base): - id = int - name = str - unset_me = str - - -class NestedObj(atypes.Base): - o = Obj - - -class TestJsonRenderer(test_base.TestCase): - - def setUp(self): - super(TestJsonRenderer, self).setUp() - self.renderer = expose.JSonRenderer('/', None) - - def test_render_error(self): - error_dict = { - 'faultcode': 500, - 'faultstring': 'ouch' - } - self.assertEqual( - error_dict, - json.loads(self.renderer.render('/', error_dict)) - ) - - def test_render_exception(self): - error_dict = { - 'faultcode': 'Server', - 'faultstring': 'ouch', - 'debuginfo': None - } - try: - raise Exception('ouch') - except Exception: - excinfo = sys.exc_info() - self.assertEqual( - json.dumps(error_dict), - self.renderer.render('/', expose.format_exception(excinfo)) - ) - - def test_render_http_exception(self): - error_dict = { - 'faultcode': '403', - 'faultstring': 'Not authorized', - 'debuginfo': None - } - try: - e = exception.NotAuthorized() - e.code = 403 - except exception.IronicException: - excinfo = sys.exc_info() - self.assertEqual( - json.dumps(error_dict), - self.renderer.render('/', expose.format_exception(excinfo)) - ) - - def test_render_int(self): - self.assertEqual( - '42', - self.renderer.render('/', { - 'result': 42, - 'datatype': int - }) - ) - - def test_render_none(self): - self.assertEqual( - 'null', - self.renderer.render('/', { - 'result': None, - 'datatype': str - }) - ) - - def test_render_str(self): - self.assertEqual( - '"a string"', - self.renderer.render('/', { - 'result': 'a string', - 'datatype': str - }) - ) - - def test_render_datetime(self): - self.assertEqual( - '"2020-04-14T10:35:10.586431"', - self.renderer.render('/', { - 'result': datetime.datetime(2020, 4, 14, 10, 35, 10, 586431), - 'datatype': datetime.datetime - }) - ) - - def test_render_array(self): - self.assertEqual( - json.dumps(['one', 'two', 'three']), - self.renderer.render('/', { - 'result': ['one', 'two', 'three'], - 'datatype': atypes.ArrayType(str) - }) - ) - - def test_render_dict(self): - self.assertEqual( - json.dumps({'one': 'a', 'two': 'b', 'three': 'c'}), - self.renderer.render('/', { - 'result': {'one': 'a', 'two': 'b', 'three': 'c'}, - 'datatype': atypes.DictType(str, str) - }) - ) - - def test_complex_type(self): - o = Obj() - o.id = 1 - o.name = 'one' - o.unset_me = atypes.Unset - - n = NestedObj() - n.o = o - self.assertEqual( - json.dumps({'o': {'id': 1, 'name': 'one'}}), - self.renderer.render('/', { - 'result': n, - 'datatype': NestedObj - }) - ) - - def test_user_type(self): - self.assertEqual( - '"__foo"', - self.renderer.render('/', { - 'result': 'foo', - 'datatype': UnderscoreStr() - }) - ) - - -class MyThingController(pecan.rest.RestController): - - _custom_actions = { - 'no_content': ['GET'], - 'response_content': ['GET'], - 'ouch': ['GET'], - } - - @expose.expose(int, str, int) - def get(self, name, number): - return {name: number} - - @expose.expose(str) - def no_content(self): - return atypes.PassthruResponse('nothing', status_code=204) - - @expose.expose(str) - def response_content(self): - return atypes.PassthruResponse('nothing', status_code=200) - - @expose.expose(str) - def ouch(self): - raise Exception('ouch') - - -class MyV1Controller(v1.Controller): - - things = MyThingController() - - -class MyRootController(root.RootController): - - v1 = MyV1Controller() - - -class TestExpose(test_api_base.BaseApiTest): - - block_execute = False - - root_controller = '%s.%s' % (MyRootController.__module__, - MyRootController.__name__) - - def test_expose(self): - self.assertEqual( - {'foo': 1}, - self.get_json('/things/', name='foo', number=1) - ) - - def test_response_204(self): - response = self.get_json('/things/no_content', expect_errors=True) - self.assertEqual(http_client.NO_CONTENT, response.status_int) - self.assertIsNone(response.content_type) - self.assertEqual(b'', response.normal_body) - - def test_response_content(self): - response = self.get_json('/things/response_content', - expect_errors=True) - self.assertEqual(http_client.OK, response.status_int) - self.assertEqual(b'"nothing"', response.normal_body) - self.assertEqual('application/json', response.content_type) - - def test_exception(self): - response = self.get_json('/things/ouch', - expect_errors=True) - error_message = json.loads(response.json['error_message']) - self.assertEqual(http_client.INTERNAL_SERVER_ERROR, - response.status_int) - self.assertEqual('application/json', response.content_type) - self.assertEqual('Server', error_message['faultcode']) - self.assertEqual('ouch', error_message['faultstring']) diff --git a/ironic/tests/unit/api/controllers/v1/test_notification_utils.py b/ironic/tests/unit/api/controllers/v1/test_notification_utils.py index d7e3e7e828..3f5b96064a 100644 --- a/ironic/tests/unit/api/controllers/v1/test_notification_utils.py +++ b/ironic/tests/unit/api/controllers/v1/test_notification_utils.py @@ -17,7 +17,6 @@ from unittest import mock from oslo_utils import uuidutils from ironic.api.controllers.v1 import notification_utils as notif_utils -from ironic.api import types as atypes from ironic.objects import fields from ironic.objects import notification from ironic.tests import base as tests_base @@ -97,17 +96,6 @@ class APINotifyTestCase(tests_base.TestCase): self.assertEqual('******', node.driver_info['password']) self.assertEqual('fake-value', node.driver_info['some_value']) - def test_notification_uuid_unset(self): - node = obj_utils.get_test_node(self.context) - test_level = fields.NotificationLevel.INFO - test_status = fields.NotificationStatus.SUCCESS - notif_utils._emit_api_notification(self.context, node, 'create', - test_level, test_status, - chassis_uuid=atypes.Unset) - init_kwargs = self.node_notify_mock.call_args[1] - payload = init_kwargs['payload'] - self.assertIsNone(payload.chassis_uuid) - def test_chassis_notification(self): chassis = obj_utils.get_test_chassis(self.context, extra={'foo': 'boo'}, diff --git a/ironic/tests/unit/api/controllers/v1/test_types.py b/ironic/tests/unit/api/controllers/v1/test_types.py deleted file mode 100644 index dcfde80aea..0000000000 --- a/ironic/tests/unit/api/controllers/v1/test_types.py +++ /dev/null @@ -1,291 +0,0 @@ -# coding: utf-8 -# -# Copyright 2013 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from http import client as http_client -from unittest import mock - -from pecan import rest - -from ironic.api.controllers.v1 import types -from ironic.api import expose -from ironic.api import types as atypes -from ironic.common import exception -from ironic.common import utils -from ironic.tests import base -from ironic.tests.unit.api import base as api_base - - -class TestMacAddressType(base.TestCase): - - def test_valid_mac_addr(self): - test_mac = 'aa:bb:cc:11:22:33' - with mock.patch.object(utils, 'validate_and_normalize_mac') as m_mock: - types.MacAddressType.validate(test_mac) - m_mock.assert_called_once_with(test_mac) - - def test_invalid_mac_addr(self): - self.assertRaises(exception.InvalidMAC, - types.MacAddressType.validate, 'invalid-mac') - - -class TestUuidType(base.TestCase): - - def test_valid_uuid(self): - test_uuid = '1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e' - self.assertEqual(test_uuid, types.UuidType.validate(test_uuid)) - - def test_invalid_uuid(self): - self.assertRaises(exception.InvalidUUID, - types.UuidType.validate, 'invalid-uuid') - - -@mock.patch("ironic.api.request") -class TestNameType(base.TestCase): - - def test_valid_name(self, mock_pecan_req): - mock_pecan_req.version.minor = 10 - test_name = 'hal-9000' - self.assertEqual(test_name, types.NameType.validate(test_name)) - - def test_invalid_name(self, mock_pecan_req): - mock_pecan_req.version.minor = 10 - self.assertRaises(exception.InvalidName, - types.NameType.validate, '-this is not valid-') - - -@mock.patch("ironic.api.request") -class TestUuidOrNameType(base.TestCase): - - def test_valid_uuid(self, mock_pecan_req): - mock_pecan_req.version.minor = 10 - test_uuid = '1a1a1a1a-2b2b-3c3c-4d4d-5e5e5e5e5e5e' - self.assertTrue(types.UuidOrNameType.validate(test_uuid)) - - def test_valid_name(self, mock_pecan_req): - mock_pecan_req.version.minor = 10 - test_name = 'dc16-database5' - self.assertTrue(types.UuidOrNameType.validate(test_name)) - - def test_invalid_uuid_or_name(self, mock_pecan_req): - mock_pecan_req.version.minor = 10 - self.assertRaises(exception.InvalidUuidOrName, - types.UuidOrNameType.validate, 'inval#uuid%or*name') - - -class MyBaseType(object): - """Helper class, patched by objects of type MyPatchType""" - mandatory = atypes.wsattr(str, mandatory=True) - - -class MyPatchType(types.JsonPatchType): - """Helper class for TestJsonPatchType tests.""" - _api_base = MyBaseType - _extra_non_removable_attrs = {'/non_removable'} - - @staticmethod - def internal_attrs(): - return ['/internal'] - - -class MyTest(rest.RestController): - """Helper class for TestJsonPatchType tests.""" - - @expose.validate([MyPatchType]) - @expose.expose([str], body=[MyPatchType]) - def patch(self, patch): - return patch - - -class MyRoot(rest.RestController): - test = MyTest() - - -class TestJsonPatchType(api_base.BaseApiTest): - - root_controller = ('ironic.tests.unit.api.controllers.v1.' - 'test_types.MyRoot') - - def setUp(self): - super(TestJsonPatchType, self).setUp() - - def _patch_json(self, params, expect_errors=False): - return self.app.patch_json('/test', params=params, - headers={'Accept': 'application/json'}, - expect_errors=expect_errors) - - def test_valid_patches(self): - valid_patches = [{'path': '/extra/foo', 'op': 'remove'}, - {'path': '/extra/foo', 'op': 'add', 'value': 'bar'}, - {'path': '/str', 'op': 'replace', 'value': 'bar'}, - {'path': '/bool', 'op': 'add', 'value': True}, - {'path': '/int', 'op': 'add', 'value': 1}, - {'path': '/float', 'op': 'add', 'value': 0.123}, - {'path': '/list', 'op': 'add', 'value': [1, 2]}, - {'path': '/none', 'op': 'add', 'value': None}, - {'path': '/empty_dict', 'op': 'add', 'value': {}}, - {'path': '/empty_list', 'op': 'add', 'value': []}, - {'path': '/dict', 'op': 'add', - 'value': {'cat': 'meow'}}] - ret = self._patch_json(valid_patches, False) - self.assertEqual(http_client.OK, ret.status_int) - self.assertCountEqual(valid_patches, ret.json) - - def test_cannot_update_internal_attr(self): - patch = [{'path': '/internal', 'op': 'replace', 'value': 'foo'}] - ret = self._patch_json(patch, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - def test_cannot_update_internal_dict_attr(self): - patch = [{'path': '/internal/test', 'op': 'replace', - 'value': 'foo'}] - ret = self._patch_json(patch, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - def test_mandatory_attr(self): - patch = [{'op': 'replace', 'path': '/mandatory', 'value': 'foo'}] - ret = self._patch_json(patch, False) - self.assertEqual(http_client.OK, ret.status_int) - self.assertEqual(patch, ret.json) - - def test_cannot_remove_mandatory_attr(self): - patch = [{'op': 'remove', 'path': '/mandatory'}] - ret = self._patch_json(patch, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - def test_cannot_remove_extra_non_removable_attr(self): - patch = [{'op': 'remove', 'path': '/non_removable'}] - ret = self._patch_json(patch, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - def test_missing_required_fields_path(self): - missing_path = [{'op': 'remove'}] - ret = self._patch_json(missing_path, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - def test_missing_required_fields_op(self): - missing_op = [{'path': '/foo'}] - ret = self._patch_json(missing_op, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - def test_invalid_op(self): - patch = [{'path': '/foo', 'op': 'invalid'}] - ret = self._patch_json(patch, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - def test_invalid_path(self): - patch = [{'path': 'invalid-path', 'op': 'remove'}] - ret = self._patch_json(patch, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - def test_cannot_add_with_no_value(self): - patch = [{'path': '/extra/foo', 'op': 'add'}] - ret = self._patch_json(patch, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - def test_cannot_replace_with_no_value(self): - patch = [{'path': '/foo', 'op': 'replace'}] - ret = self._patch_json(patch, True) - self.assertEqual(http_client.BAD_REQUEST, ret.status_int) - self.assertTrue(ret.json['error_message']) - - -class TestBooleanType(base.TestCase): - - def test_valid_true_values(self): - v = types.BooleanType() - self.assertTrue(v.validate("true")) - self.assertTrue(v.validate("TRUE")) - self.assertTrue(v.validate("True")) - self.assertTrue(v.validate("t")) - self.assertTrue(v.validate("1")) - self.assertTrue(v.validate("y")) - self.assertTrue(v.validate("yes")) - self.assertTrue(v.validate("on")) - - def test_valid_false_values(self): - v = types.BooleanType() - self.assertFalse(v.validate("false")) - self.assertFalse(v.validate("FALSE")) - self.assertFalse(v.validate("False")) - self.assertFalse(v.validate("f")) - self.assertFalse(v.validate("0")) - self.assertFalse(v.validate("n")) - self.assertFalse(v.validate("no")) - self.assertFalse(v.validate("off")) - - def test_invalid_value(self): - v = types.BooleanType() - self.assertRaises(exception.Invalid, v.validate, "invalid-value") - self.assertRaises(exception.Invalid, v.validate, "01") - - -class TestJsonType(base.TestCase): - - def test_valid_values(self): - vt = types.jsontype - value = vt.validate("hello") - self.assertEqual("hello", value) - value = vt.validate(10) - self.assertEqual(10, value) - value = vt.validate(0.123) - self.assertEqual(0.123, value) - value = vt.validate(True) - self.assertTrue(value) - value = vt.validate([1, 2, 3]) - self.assertEqual([1, 2, 3], value) - value = vt.validate({'foo': 'bar'}) - self.assertEqual({'foo': 'bar'}, value) - value = vt.validate(None) - self.assertIsNone(value) - - def test_invalid_values(self): - vt = types.jsontype - self.assertRaises(exception.Invalid, vt.validate, object()) - - def test_apimultitype_tostring(self): - vts = str(types.jsontype) - self.assertIn(str(str), vts) - self.assertIn(str(int), vts) - self.assertIn(str(float), vts) - self.assertIn(str(types.BooleanType), vts) - self.assertIn(str(list), vts) - self.assertIn(str(dict), vts) - self.assertIn(str(None), vts) - - -class TestListType(base.TestCase): - - def test_list_type(self): - v = types.ListType() - self.assertEqual(['foo', 'bar'], v.validate('foo,bar')) - self.assertNotEqual(['bar', 'foo'], v.validate('foo,bar')) - - self.assertEqual(['cat', 'meow'], v.validate("cat , meow")) - self.assertEqual(['spongebob', 'squarepants'], - v.validate("SpongeBob,SquarePants")) - self.assertEqual(['foo', 'bar'], v.validate("foo, ,,bar")) - self.assertEqual(['foo', 'bar'], v.validate("foo,foo,foo,bar")) - self.assertIsInstance(v.validate('foo,bar'), list) diff --git a/ironic/tests/unit/api/controllers/v1/test_utils.py b/ironic/tests/unit/api/controllers/v1/test_utils.py index 662f9755fc..df51e594e8 100644 --- a/ironic/tests/unit/api/controllers/v1/test_utils.py +++ b/ironic/tests/unit/api/controllers/v1/test_utils.py @@ -25,7 +25,6 @@ from oslo_utils import uuidutils from ironic import api from ironic.api.controllers.v1 import node as api_node from ironic.api.controllers.v1 import utils -from ironic.api import types as atypes from ironic.common import exception from ironic.common import policy from ironic.common import states @@ -943,7 +942,7 @@ class TestVendorPassthru(base.TestCase): passthru_mock.assert_called_once_with( 'fake-context', 'fake-ident', 'squarepants', 'POST', 'fake-data', 'fake-topic') - self.assertIsInstance(response, atypes.PassthruResponse) + self.assertIsInstance(response, utils.PassthruResponse) self.assertEqual('SpongeBob', response.obj) sc = http_client.ACCEPTED if async_call else http_client.OK self.assertEqual(sc, response.status_code) @@ -979,7 +978,7 @@ class TestVendorPassthru(base.TestCase): self.assertIsInstance(response.obj, io.BytesIO) self.assertEqual(expct_return_value, response.obj.read()) # Assert response message is none - self.assertIsInstance(response, atypes.PassthruResponse) + self.assertIsInstance(response, utils.PassthruResponse) self.assertEqual(http_client.OK, response.status_code) def test_vendor_passthru_attach(self): diff --git a/ironic/tests/unit/api/test_args.py b/ironic/tests/unit/api/test_args.py deleted file mode 100644 index 549c2efe12..0000000000 --- a/ironic/tests/unit/api/test_args.py +++ /dev/null @@ -1,506 +0,0 @@ -# Copyright 2020 Red Hat, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import datetime -import decimal -import io - -from webob import multidict - -from ironic.api import args -from ironic.api.controllers.v1 import types -from ironic.api import functions -from ironic.api import types as atypes -from ironic.common import exception -from ironic.tests import base as test_base - - -class Obj(atypes.Base): - - id = atypes.wsattr(int, mandatory=True) - name = str - readonly_field = atypes.wsattr(str, readonly=True) - default_field = atypes.wsattr(str, default='foo') - unset_me = str - - -class NestedObj(atypes.Base): - o = Obj - - -class TestArgs(test_base.TestCase): - - def test_fromjson_array(self): - atype = atypes.ArrayType(int) - self.assertEqual( - [0, 1, 1234, None], - args.fromjson_array(atype, [0, '1', '1_234', None]) - ) - self.assertRaises(ValueError, args.fromjson_array, - atype, ['one', 'two', 'three']) - self.assertRaises(ValueError, args.fromjson_array, - atype, 'one') - - def test_fromjson_dict(self): - dtype = atypes.DictType(str, int) - self.assertEqual({ - 'zero': 0, - 'one': 1, - 'etc': 1234, - 'none': None - }, args.fromjson_dict(dtype, { - 'zero': 0, - 'one': '1', - 'etc': '1_234', - 'none': None - })) - - self.assertRaises(ValueError, args.fromjson_dict, - dtype, []) - self.assertRaises(ValueError, args.fromjson_dict, - dtype, {'one': 'one'}) - - def test_fromjson_bool(self): - for b in (1, 2, True, 'true', 't', 'yes', 'y', 'on', '1'): - self.assertTrue(args.fromjson_bool(b)) - for b in (0, False, 'false', 'f', 'no', 'n', 'off', '0'): - self.assertFalse(args.fromjson_bool(b)) - for b in ('yup', 'yeet', 'NOPE', 3.14): - self.assertRaises(ValueError, args.fromjson_bool, b) - - def test_fromjson(self): - # parse None - self.assertIsNone(args.fromjson(None, None)) - - # parse array - atype = atypes.ArrayType(int) - self.assertEqual( - [0, 1, 1234, None], - args.fromjson(atype, [0, '1', '1_234', None]) - ) - - # parse dict - dtype = atypes.DictType(str, int) - self.assertEqual({ - 'zero': 0, - 'one': 1, - 'etc': 1234, - 'none': None - }, args.fromjson(dtype, { - 'zero': 0, - 'one': '1', - 'etc': '1_234', - 'none': None - })) - - # parse bytes - self.assertEqual( - b'asdf', - args.fromjson(bytes, b'asdf') - ) - self.assertEqual( - b'asdf', - args.fromjson(bytes, 'asdf') - ) - self.assertEqual( - b'33', - args.fromjson(bytes, 33) - ) - self.assertEqual( - b'3.14', - args.fromjson(bytes, 3.14) - ) - - # parse str - self.assertEqual( - 'asdf', - args.fromjson(str, b'asdf') - ) - self.assertEqual( - 'asdf', - args.fromjson(str, 'asdf') - ) - - # parse int/float - self.assertEqual( - 3, - args.fromjson(int, '3') - ) - self.assertEqual( - 3, - args.fromjson(int, 3) - ) - self.assertEqual( - 3.14, - args.fromjson(float, 3.14) - ) - - # parse bool - self.assertFalse(args.fromjson(bool, 'no')) - self.assertTrue(args.fromjson(bool, 'yes')) - - # parse decimal - self.assertEqual( - decimal.Decimal(3.14), - args.fromjson(decimal.Decimal, 3.14) - ) - - # parse datetime - expected = datetime.datetime(2015, 8, 13, 11, 38, 9, 496475) - self.assertEqual( - expected, - args.fromjson(datetime.datetime, '2015-08-13T11:38:09.496475') - ) - - # parse complex - n = args.fromjson(NestedObj, {'o': { - 'id': 1234, - 'name': 'an object' - }}) - self.assertIsInstance(n.o, Obj) - self.assertEqual(1234, n.o.id) - self.assertEqual('an object', n.o.name) - self.assertEqual('foo', n.o.default_field) - - # parse usertype - self.assertEqual( - ['0', '1', '2', 'three'], - args.fromjson(types.listtype, '0,1, 2, three') - ) - - def test_fromjson_complex(self): - n = args.fromjson_complex(NestedObj, {'o': { - 'id': 1234, - 'name': 'an object' - }}) - self.assertIsInstance(n.o, Obj) - self.assertEqual(1234, n.o.id) - self.assertEqual('an object', n.o.name) - self.assertEqual('foo', n.o.default_field) - - e = self.assertRaises(exception.UnknownAttribute, - args.fromjson_complex, - Obj, {'ooo': {}}) - self.assertEqual({'ooo'}, e.attributes) - - e = self.assertRaises(exception.InvalidInput, args.fromjson_complex, - Obj, - {'name': 'an object'}) - self.assertEqual('id', e.fieldname) - self.assertEqual('Mandatory field missing.', e.msg) - - e = self.assertRaises(exception.InvalidInput, args.fromjson_complex, - Obj, - {'id': 1234, 'readonly_field': 'foo'}) - self.assertEqual('readonly_field', e.fieldname) - self.assertEqual('Cannot set read only field.', e.msg) - - def test_parse(self): - # source as bytes - s = b'{"o": {"id": 1234, "name": "an object"}}' - - # test bodyarg=True - n = args.parse(s, {"o": NestedObj}, True)['o'] - self.assertEqual(1234, n.o.id) - self.assertEqual('an object', n.o.name) - - # source as file - s = io.StringIO('{"o": {"id": 1234, "name": "an object"}}') - - # test bodyarg=False - n = args.parse(s, {"o": Obj}, False)['o'] - self.assertEqual(1234, n.id) - self.assertEqual('an object', n.name) - - # fromjson ValueError - s = '{"o": ["id", "name"]}' - self.assertRaises(exception.InvalidInput, args.parse, - s, {"o": atypes.DictType(str, str)}, False) - s = '["id", "name"]' - self.assertRaises(exception.InvalidInput, args.parse, - s, {"o": atypes.DictType(str, str)}, True) - - # fromjson UnknownAttribute - s = '{"o": {"foo": "bar", "id": 1234, "name": "an object"}}' - self.assertRaises(exception.UnknownAttribute, args.parse, - s, {"o": NestedObj}, True) - self.assertRaises(exception.UnknownAttribute, args.parse, - s, {"o": Obj}, False) - - # invalid json - s = '{Sunn O)))}' - self.assertRaises(exception.ClientSideError, args.parse, - s, {"o": Obj}, False) - - # extra args - s = '{"foo": "bar", "o": {"id": 1234, "name": "an object"}}' - self.assertRaises(exception.UnknownArgument, args.parse, - s, {"o": Obj}, False) - - def test_from_param(self): - # datetime param - expected = datetime.datetime(2015, 8, 13, 11, 38, 9, 496475) - self.assertEqual( - expected, - args.from_param(datetime.datetime, '2015-08-13T11:38:09.496475') - ) - self.assertIsNone(args.from_param(datetime.datetime, None)) - - # usertype param - self.assertEqual( - ['0', '1', '2', 'three'], - args.from_param(types.listtype, '0,1, 2, three') - ) - - # array param - atype = atypes.ArrayType(int) - self.assertEqual( - [0, 1, 1234, None], - args.from_param(atype, [0, '1', '1_234', None]) - ) - self.assertIsNone(args.from_param(atype, None)) - - # string param - self.assertEqual('foo', args.from_param(str, 'foo')) - self.assertIsNone(args.from_param(str, None)) - - # string param with from_params - hit_paths = set() - params = multidict.MultiDict( - foo='bar', - ) - self.assertEqual( - 'bar', - args.from_params(str, params, 'foo', hit_paths) - ) - self.assertEqual({'foo'}, hit_paths) - - def test_array_from_params(self): - hit_paths = set() - datatype = atypes.ArrayType(str) - params = multidict.MultiDict( - foo='bar', - one='two' - ) - self.assertEqual( - ['bar'], - args.from_params(datatype, params, 'foo', hit_paths) - ) - self.assertEqual({'foo'}, hit_paths) - self.assertEqual( - ['two'], - args.array_from_params(datatype, params, 'one', hit_paths) - ) - self.assertEqual({'foo', 'one'}, hit_paths) - - def test_usertype_from_params(self): - hit_paths = set() - datatype = types.listtype - params = multidict.MultiDict( - foo='0,1, 2, three', - ) - self.assertEqual( - ['0', '1', '2', 'three'], - args.usertype_from_params(datatype, params, 'foo', hit_paths) - ) - self.assertEqual( - ['0', '1', '2', 'three'], - args.from_params(datatype, params, 'foo', hit_paths) - ) - self.assertEqual( - atypes.Unset, - args.usertype_from_params(datatype, params, 'bar', hit_paths) - ) - - def test_args_from_args(self): - - fromargs = ['one', 2, [0, '1', '2_34']] - fromkwargs = {'foo': '1, 2, 3'} - - @functions.signature(str, str, int, atypes.ArrayType(int), - types.listtype) - def myfunc(self, first, second, third, foo): - pass - funcdef = functions.FunctionDefinition.get(myfunc) - - newargs, newkwargs = args.args_from_args(funcdef, fromargs, fromkwargs) - self.assertEqual(['one', 2, [0, 1, 234]], newargs) - self.assertEqual({'foo': ['1', '2', '3']}, newkwargs) - - def test_args_from_params(self): - - @functions.signature(str, str, int, atypes.ArrayType(int), - types.listtype) - def myfunc(self, first, second, third, foo): - pass - funcdef = functions.FunctionDefinition.get(myfunc) - params = multidict.MultiDict( - foo='0,1, 2, three', - third='1', - second='2' - ) - self.assertEqual( - ([], {'foo': ['0', '1', '2', 'three'], 'second': 2, 'third': [1]}), - args.args_from_params(funcdef, params) - ) - - # unexpected param - params = multidict.MultiDict(bar='baz') - self.assertRaises(exception.UnknownArgument, args.args_from_params, - funcdef, params) - - # no params plus a body - params = multidict.MultiDict(__body__='') - self.assertEqual( - ([], {}), - args.args_from_params(funcdef, params) - ) - - def test_args_from_body(self): - @functions.signature(str, body=NestedObj) - def myfunc(self, nested): - pass - funcdef = functions.FunctionDefinition.get(myfunc) - mimetype = 'application/json' - body = b'{"o": {"id": 1234, "name": "an object"}}' - newargs, newkwargs = args.args_from_body(funcdef, body, mimetype) - - self.assertEqual(1234, newkwargs['nested'].o.id) - self.assertEqual('an object', newkwargs['nested'].o.name) - - self.assertEqual( - ((), {}), - args.args_from_body(funcdef, None, mimetype) - ) - - self.assertRaises(exception.ClientSideError, args.args_from_body, - funcdef, body, 'application/x-corba') - - self.assertEqual( - ((), {}), - args.args_from_body(funcdef, body, - 'application/x-www-form-urlencoded') - ) - - def test_combine_args(self): - - @functions.signature(str, str, int) - def myfunc(self, first, second,): - pass - funcdef = functions.FunctionDefinition.get(myfunc) - - # empty - self.assertEqual( - ([], {}), - args.combine_args( - funcdef, ( - ([], {}), - ([], {}), - ) - ) - ) - - # combine kwargs - self.assertEqual( - ([], {'first': 'one', 'second': 'two'}), - args.combine_args( - funcdef, ( - ([], {}), - ([], {'first': 'one', 'second': 'two'}), - ) - ) - ) - - # combine mixed args - self.assertEqual( - ([], {'first': 'one', 'second': 'two'}), - args.combine_args( - funcdef, ( - (['one'], {}), - ([], {'second': 'two'}), - ) - ) - ) - - # override kwargs - self.assertEqual( - ([], {'first': 'two'}), - args.combine_args( - funcdef, ( - ([], {'first': 'one'}), - ([], {'first': 'two'}), - ), - allow_override=True - ) - ) - - # override args - self.assertEqual( - ([], {'first': 'two', 'second': 'three'}), - args.combine_args( - funcdef, ( - (['one', 'three'], {}), - (['two'], {}), - ), - allow_override=True - ) - ) - - # can't override args - self.assertRaises(exception.ClientSideError, args.combine_args, - funcdef, - ((['one'], {}), (['two'], {}))) - - # can't override kwargs - self.assertRaises(exception.ClientSideError, args.combine_args, - funcdef, - (([], {'first': 'one'}), ([], {'first': 'two'}))) - - def test_get_args(self): - @functions.signature(str, str, int, atypes.ArrayType(int), - types.listtype, body=NestedObj) - def myfunc(self, first, second, third, foo, nested): - pass - funcdef = functions.FunctionDefinition.get(myfunc) - params = multidict.MultiDict( - foo='0,1, 2, three', - second='2' - ) - mimetype = 'application/json' - body = b'{"o": {"id": 1234, "name": "an object"}}' - fromargs = ['one'] - fromkwargs = {'third': '1'} - - newargs, newkwargs = args.get_args(funcdef, fromargs, fromkwargs, - params, body, mimetype) - self.assertEqual([], newargs) - n = newkwargs.pop('nested') - self.assertEqual({ - 'first': 'one', - 'foo': ['0', '1', '2', 'three'], - 'second': 2, - 'third': [1]}, - newkwargs - ) - self.assertEqual(1234, n.o.id) - self.assertEqual('an object', n.o.name) - - # check_arguments missing mandatory argument 'second' - params = multidict.MultiDict( - foo='0,1, 2, three', - ) - self.assertRaises(exception.MissingArgument, args.get_args, - funcdef, fromargs, fromkwargs, - params, body, mimetype) diff --git a/ironic/tests/unit/api/test_method.py b/ironic/tests/unit/api/test_method.py index 2b6535977d..f2f9cce130 100644 --- a/ironic/tests/unit/api/test_method.py +++ b/ironic/tests/unit/api/test_method.py @@ -22,7 +22,6 @@ from ironic import api from ironic.api.controllers import root from ironic.api.controllers import v1 from ironic.api import method -from ironic.api import types as atypes from ironic.common import args from ironic.tests.unit.api import base as test_api_base @@ -48,7 +47,7 @@ class MyThingController(pecan.rest.RestController): @method.expose() def response_content(self): - resp = atypes.PassthruResponse('nothing', status_code=200) + resp = v1.utils.PassthruResponse('nothing', status_code=200) api.response.status_code = resp.status_code return resp.obj diff --git a/ironic/tests/unit/api/test_types.py b/ironic/tests/unit/api/test_types.py deleted file mode 100644 index d61825cf4d..0000000000 --- a/ironic/tests/unit/api/test_types.py +++ /dev/null @@ -1,566 +0,0 @@ -# coding: utf-8 -# -# Copyright 2011-2019 the WSME authors and contributors -# (See https://opendev.org/x/wsme/) -# -# This module is part of WSME and is also released under -# the MIT License: http://www.opensource.org/licenses/mit-license.php -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import re - -from ironic.api import types -from ironic.common import exception as exc -from ironic.tests import base as test_base - - -def gen_class(): - d = {} - exec('''class tmp(object): pass''', d) - return d['tmp'] - - -class TestTypes(test_base.TestCase): - def setUp(self): - super(TestTypes, self).setUp() - types.registry = types.Registry() - - def test_default_usertype(self): - class MyType(types.UserType): - basetype = str - - My = MyType() - - assert My.validate('a') == 'a' - assert My.tobasetype('a') == 'a' - assert My.frombasetype('a') == 'a' - - def test_unset(self): - u = types.Unset - - assert not u - - def test_flat_type(self): - class Flat(object): - aint = int - abytes = bytes - atext = str - afloat = float - - types.register_type(Flat) - - assert len(Flat._wsme_attributes) == 4 - attrs = Flat._wsme_attributes - print(attrs) - - assert attrs[0].key == 'aint' - assert attrs[0].name == 'aint' - assert isinstance(attrs[0], types.wsattr) - assert attrs[0].datatype == int - assert attrs[0].mandatory is False - assert attrs[1].key == 'abytes' - assert attrs[1].name == 'abytes' - assert attrs[2].key == 'atext' - assert attrs[2].name == 'atext' - assert attrs[3].key == 'afloat' - assert attrs[3].name == 'afloat' - - def test_private_attr(self): - class WithPrivateAttrs(object): - _private = 12 - - types.register_type(WithPrivateAttrs) - - assert len(WithPrivateAttrs._wsme_attributes) == 0 - - def test_attribute_order(self): - class ForcedOrder(object): - _wsme_attr_order = ('a2', 'a1', 'a3') - a1 = int - a2 = int - a3 = int - - types.register_type(ForcedOrder) - - print(ForcedOrder._wsme_attributes) - assert ForcedOrder._wsme_attributes[0].key == 'a2' - assert ForcedOrder._wsme_attributes[1].key == 'a1' - assert ForcedOrder._wsme_attributes[2].key == 'a3' - - c = gen_class() - print(c) - types.register_type(c) - del c._wsme_attributes - - c.a2 = int - c.a1 = int - c.a3 = int - - types.register_type(c) - - assert c._wsme_attributes[0].key == 'a1', c._wsme_attributes[0].key - assert c._wsme_attributes[1].key == 'a2' - assert c._wsme_attributes[2].key == 'a3' - - def test_wsproperty(self): - class WithWSProp(object): - def __init__(self): - self._aint = 0 - - def get_aint(self): - return self._aint - - def set_aint(self, value): - self._aint = value - - aint = types.wsproperty(int, get_aint, set_aint, mandatory=True) - - types.register_type(WithWSProp) - - print(WithWSProp._wsme_attributes) - assert len(WithWSProp._wsme_attributes) == 1 - a = WithWSProp._wsme_attributes[0] - assert a.key == 'aint' - assert a.datatype == int - assert a.mandatory - - o = WithWSProp() - o.aint = 12 - - assert o.aint == 12 - - def test_nested(self): - class Inner(object): - aint = int - - class Outer(object): - inner = Inner - - types.register_type(Outer) - - assert hasattr(Inner, '_wsme_attributes') - assert len(Inner._wsme_attributes) == 1 - - def test_inspect_with_inheritance(self): - class Parent(object): - parent_attribute = int - - class Child(Parent): - child_attribute = int - - types.register_type(Parent) - types.register_type(Child) - - assert len(Child._wsme_attributes) == 2 - - def test_selfreftype(self): - class SelfRefType(object): - pass - - SelfRefType.parent = SelfRefType - - types.register_type(SelfRefType) - - def test_inspect_with_property(self): - class AType(object): - @property - def test(self): - return 'test' - - types.register_type(AType) - - assert len(AType._wsme_attributes) == 0 - assert AType().test == 'test' - - def test_enum(self): - aenum = types.Enum(str, 'v1', 'v2') - assert aenum.basetype is str - - class AType(object): - a = aenum - - types.register_type(AType) - - assert AType.a.datatype is aenum - - obj = AType() - obj.a = 'v1' - assert obj.a == 'v1', repr(obj.a) - - self.assertRaisesRegexp(exc.InvalidInput, - "Invalid input for field/attribute a. \ -Value: 'v3'. Value should be one of: v., v.", - setattr, - obj, - 'a', - 'v3') - - def test_attribute_validation(self): - class AType(object): - alist = [int] - aint = int - - types.register_type(AType) - - obj = AType() - - obj.alist = [1, 2, 3] - assert obj.alist == [1, 2, 3] - obj.aint = 5 - assert obj.aint == 5 - - self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', 12) - self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', [2, 'a']) - - def test_attribute_validation_minimum(self): - class ATypeInt(object): - attr = types.IntegerType(minimum=1, maximum=5) - - types.register_type(ATypeInt) - - obj = ATypeInt() - obj.attr = 2 - - # comparison between 'zero' value and intger minimum (1) raises a - # TypeError which must be wrapped into an InvalidInput exception - self.assertRaises(exc.InvalidInput, setattr, obj, 'attr', 'zero') - - def test_text_attribute_conversion(self): - class SType(object): - atext = str - abytes = bytes - - types.register_type(SType) - - obj = SType() - - obj.atext = b'somebytes' - assert obj.atext == 'somebytes' - assert isinstance(obj.atext, str) - - obj.abytes = 'sometext' - assert obj.abytes == b'sometext' - assert isinstance(obj.abytes, bytes) - - def test_named_attribute(self): - class ABCDType(object): - a_list = types.wsattr([int], name='a.list') - astr = str - - types.register_type(ABCDType) - - assert len(ABCDType._wsme_attributes) == 2 - attrs = ABCDType._wsme_attributes - - assert attrs[0].key == 'a_list', attrs[0].key - assert attrs[0].name == 'a.list', attrs[0].name - assert attrs[1].key == 'astr', attrs[1].key - assert attrs[1].name == 'astr', attrs[1].name - - def test_wsattr_del(self): - class MyType(object): - a = types.wsattr(int) - - types.register_type(MyType) - - value = MyType() - - value.a = 5 - assert value.a == 5 - del value.a - assert value.a is types.Unset - - def test_validate_dict(self): - assert types.validate_value({int: str}, {1: '1', 5: '5'}) - - self.assertRaises(ValueError, types.validate_value, - {int: str}, []) - - assert types.validate_value({int: str}, {'1': '1', 5: '5'}) - - self.assertRaises(ValueError, types.validate_value, - {int: str}, {1: 1, 5: '5'}) - - def test_validate_list_valid(self): - assert types.validate_value([int], [1, 2]) - assert types.validate_value([int], ['5']) - - def test_validate_list_empty(self): - assert types.validate_value([int], []) == [] - - def test_validate_list_none(self): - v = types.ArrayType(int) - assert v.validate(None) is None - - def test_validate_list_invalid_member(self): - self.assertRaises(ValueError, types.validate_value, [int], - ['not-a-number']) - - def test_validate_list_invalid_type(self): - self.assertRaises(ValueError, types.validate_value, [int], 1) - - def test_validate_float(self): - self.assertEqual(types.validate_value(float, 1), 1.0) - self.assertEqual(types.validate_value(float, '1'), 1.0) - self.assertEqual(types.validate_value(float, 1.1), 1.1) - self.assertRaises(ValueError, types.validate_value, float, []) - self.assertRaises(ValueError, types.validate_value, float, - 'not-a-float') - - def test_validate_int(self): - self.assertEqual(types.validate_value(int, 1), 1) - self.assertEqual(types.validate_value(int, '1'), 1) - self.assertRaises(ValueError, types.validate_value, int, 1.1) - - def test_validate_integer_type(self): - v = types.IntegerType(minimum=1, maximum=10) - v.validate(1) - v.validate(5) - v.validate(10) - self.assertRaises(ValueError, v.validate, 0) - self.assertRaises(ValueError, v.validate, 11) - - def test_validate_string_type(self): - v = types.StringType(min_length=1, max_length=10, - pattern='^[a-zA-Z0-9]*$') - v.validate('1') - v.validate('12345') - v.validate('1234567890') - self.assertRaises(ValueError, v.validate, '') - self.assertRaises(ValueError, v.validate, '12345678901') - - # Test a pattern validation - v.validate('a') - v.validate('A') - self.assertRaises(ValueError, v.validate, '_') - - def test_validate_string_type_precompile(self): - precompile = re.compile('^[a-zA-Z0-9]*$') - v = types.StringType(min_length=1, max_length=10, - pattern=precompile) - - # Test a pattern validation - v.validate('a') - v.validate('A') - self.assertRaises(ValueError, v.validate, '_') - - def test_validate_string_type_pattern_exception_message(self): - regex = '^[a-zA-Z0-9]*$' - v = types.StringType(pattern=regex) - try: - v.validate('_') - self.assertFail() - except ValueError as e: - self.assertIn(regex, str(e)) - - def test_register_invalid_array(self): - self.assertRaises(ValueError, types.register_type, []) - self.assertRaises(ValueError, types.register_type, [int, str]) - self.assertRaises(AttributeError, types.register_type, [1]) - - def test_register_invalid_dict(self): - self.assertRaises(ValueError, types.register_type, {}) - self.assertRaises(ValueError, types.register_type, - {int: str, str: int}) - self.assertRaises(ValueError, types.register_type, - {types.Unset: str}) - - def test_list_attribute_no_auto_register(self): - class MyType(object): - aint = int - - assert not hasattr(MyType, '_wsme_attributes') - - self.assertRaises(TypeError, types.list_attributes, MyType) - - assert not hasattr(MyType, '_wsme_attributes') - - def test_list_of_complextypes(self): - class A(object): - bs = types.wsattr(['B']) - - class B(object): - i = int - - types.register_type(A) - types.register_type(B) - - assert A.bs.datatype.item_type is B - - def test_cross_referenced_types(self): - class A(object): - b = types.wsattr('B') - - class B(object): - a = A - - types.register_type(A) - types.register_type(B) - - assert A.b.datatype is B - - def test_base(self): - class B1(types.Base): - b2 = types.wsattr('B2') - - class B2(types.Base): - b2 = types.wsattr('B2') - - assert B1.b2.datatype is B2, repr(B1.b2.datatype) - assert B2.b2.datatype is B2 - - def test_base_init(self): - class C1(types.Base): - s = str - - c = C1(s='test') - assert c.s == 'test' - - def test_array_eq(self): - ell = [types.ArrayType(str)] - assert types.ArrayType(str) in ell - - def test_array_sample(self): - s = types.ArrayType(str).sample() - assert isinstance(s, list) - assert s - assert s[0] == '' - - def test_dict_sample(self): - s = types.DictType(str, str).sample() - assert isinstance(s, dict) - assert s - assert s == {'': ''} - - def test_binary_to_base(self): - import base64 - assert types.binary.tobasetype(None) is None - expected = base64.encodebytes(b'abcdef') - assert types.binary.tobasetype(b'abcdef') == expected - - def test_binary_from_base(self): - import base64 - assert types.binary.frombasetype(None) is None - encoded = base64.encodebytes(b'abcdef') - assert types.binary.frombasetype(encoded) == b'abcdef' - - def test_wsattr_weakref_datatype(self): - # If the datatype inside the wsattr ends up a weakref, it - # should be converted to the real type when accessed again by - # the property getter. - import weakref - a = types.wsattr(int) - a.datatype = weakref.ref(int) - assert a.datatype is int - - def test_wsattr_list_datatype(self): - # If the datatype inside the wsattr ends up a list of weakrefs - # to types, it should be converted to the real types when - # accessed again by the property getter. - import weakref - a = types.wsattr(int) - a.datatype = [weakref.ref(int)] - assert isinstance(a.datatype, list) - assert a.datatype[0] is int - - def test_unregister(self): - class TempType(object): - pass - types.registry.register(TempType) - v = types.registry.lookup('TempType') - self.assertIs(v, TempType) - types.registry._unregister(TempType) - after = types.registry.lookup('TempType') - self.assertIsNone(after) - - def test_unregister_twice(self): - class TempType(object): - pass - types.registry.register(TempType) - v = types.registry.lookup('TempType') - self.assertIs(v, TempType) - types.registry._unregister(TempType) - # Second call should not raise an exception - types.registry._unregister(TempType) - after = types.registry.lookup('TempType') - self.assertIsNone(after) - - def test_unregister_array_type(self): - class TempType(object): - pass - t = [TempType] - types.registry.register(t) - self.assertNotEqual(types.registry.array_types, set()) - types.registry._unregister(t) - self.assertEqual(types.registry.array_types, set()) - - def test_unregister_array_type_twice(self): - class TempType(object): - pass - t = [TempType] - types.registry.register(t) - self.assertNotEqual(types.registry.array_types, set()) - types.registry._unregister(t) - # Second call should not raise an exception - types.registry._unregister(t) - self.assertEqual(types.registry.array_types, set()) - - def test_unregister_dict_type(self): - class TempType(object): - pass - t = {str: TempType} - types.registry.register(t) - self.assertNotEqual(types.registry.dict_types, set()) - types.registry._unregister(t) - self.assertEqual(types.registry.dict_types, set()) - - def test_unregister_dict_type_twice(self): - class TempType(object): - pass - t = {str: TempType} - types.registry.register(t) - self.assertNotEqual(types.registry.dict_types, set()) - types.registry._unregister(t) - # Second call should not raise an exception - types.registry._unregister(t) - self.assertEqual(types.registry.dict_types, set()) - - def test_reregister(self): - class TempType(object): - pass - types.registry.register(TempType) - v = types.registry.lookup('TempType') - self.assertIs(v, TempType) - types.registry.reregister(TempType) - after = types.registry.lookup('TempType') - self.assertIs(after, TempType) - - def test_reregister_and_add_attr(self): - class TempType(object): - pass - types.registry.register(TempType) - attrs = types.list_attributes(TempType) - self.assertEqual(attrs, []) - TempType.one = str - types.registry.reregister(TempType) - after = types.list_attributes(TempType) - self.assertNotEqual(after, []) - - def test_non_registered_complex_type(self): - class TempType(types.Base): - __registry__ = None - - self.assertFalse(types.iscomplex(TempType)) - types.registry.register(TempType) - self.assertTrue(types.iscomplex(TempType))