Browse Source

Add wsme core types, remove WSME

The header for the file types.py denotes its dual-licensed status as
MIT with copyright to the original WSME authors, plus apache licensed
as part of Ironic.

Story: 1651346
Task: 10551

Change-Id: I986cc4a936c8679e932463ff3c91d1876a713196
changes/90/704490/11
Steve Baker 2 years ago
parent
commit
44cc6dd792
  1. 1
      doc/requirements.txt
  2. 2
      doc/source/conf.py
  3. 771
      ironic/api/types.py
  4. 18
      ironic/common/exception.py
  5. 628
      ironic/tests/unit/api/test_types.py
  6. 1
      requirements.txt

1
doc/requirements.txt

@ -3,6 +3,5 @@ os-api-ref>=1.4.0 # Apache-2.0
reno>=3.1.0 # Apache-2.0
sphinx>=2.0.0,!=2.1.0 # BSD
sphinxcontrib-apidoc>=0.2.0 # BSD
sphinxcontrib-pecanwsme>=0.10.0 # Apache-2.0
sphinxcontrib-seqdiag>=0.8.4 # BSD
sphinxcontrib-svg2pdfconverter>=0.1.0 # BSD

2
doc/source/conf.py

@ -34,8 +34,6 @@ sys.path.insert(0, os.path.join(os.path.abspath('.'), '_exts'))
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.viewcode',
'sphinx.ext.graphviz',
'sphinxcontrib.httpdomain',
'sphinxcontrib.pecanwsme.rest',
'sphinxcontrib.seqdiag',
'sphinxcontrib.apidoc',
'sphinxcontrib.rsvgconverter',

771
ironic/api/types.py

@ -1,38 +1,749 @@
# coding: utf-8
#
# Copyright 2020 Red Hat, Inc.
# All Rights Reserved.
# Copyright 2011-2019 the WSME authors and contributors
# (See https://opendev.org/x/wsme/)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# This module is part of WSME and is also released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
#
# http://www.apache.org/licenses/LICENSE-2.0
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from wsme.types import ArrayType # noqa
from wsme.types import Base # noqa
from wsme.types import DictType # noqa
from wsme.types import Enum # noqa
from wsme.types import File # noqa
from wsme.types import IntegerType # noqa
from wsme.types import iscomplex # noqa
from wsme.types import isusertype # noqa
from wsme.types import list_attributes # noqa
from wsme.types import registry # noqa
from wsme.types import StringType # noqa
from wsme.types import text # noqa
from wsme.types import Unset # noqa
from wsme.types import UnsetType # noqa
from wsme.types import UserType # noqa
from wsme.types import validate_value # noqa
from wsme.types import wsattr # noqa
from wsme.types import wsproperty # noqa
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import datetime
import decimal
import inspect
import io
import re
import weakref
from oslo_log import log
from ironic.common import exception
LOG = log.getLogger(__name__)
pod_types = (int, bytes, str, float, bool)
native_types = pod_types + (datetime.datetime, decimal.Decimal)
_promotable_types = (int, str, bytes)
class ArrayType(object):
def __init__(self, item_type):
if iscomplex(item_type):
self._item_type = weakref.ref(item_type)
else:
self._item_type = item_type
def __hash__(self):
return hash(self.item_type)
def __eq__(self, other):
return isinstance(other, ArrayType) \
and self.item_type == other.item_type
def sample(self):
return [getattr(self.item_type, 'sample', self.item_type)()]
@property
def item_type(self):
if isinstance(self._item_type, weakref.ref):
return self._item_type()
else:
return self._item_type
def validate(self, value):
if value is None:
return
if not isinstance(value, list):
raise ValueError("Wrong type. Expected '[%s]', got '%s'" % (
self.item_type, type(value)
))
return [
validate_value(self.item_type, item)
for item in value
]
class DictType(object):
def __init__(self, key_type, value_type):
if key_type not in (int, bytes, str, float, bool):
raise ValueError("Dictionaries key can only be a pod type")
self.key_type = key_type
if iscomplex(value_type):
self._value_type = weakref.ref(value_type)
else:
self._value_type = value_type
def __hash__(self):
return hash((self.key_type, self.value_type))
def sample(self):
key = getattr(self.key_type, 'sample', self.key_type)()
value = getattr(self.value_type, 'sample', self.value_type)()
return {key: value}
@property
def value_type(self):
if isinstance(self._value_type, weakref.ref):
return self._value_type()
else:
return self._value_type
def validate(self, value):
if not isinstance(value, dict):
raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % (
self.key_type, self.value_type, type(value)
))
return dict((
(
validate_value(self.key_type, key),
validate_value(self.value_type, v)
) for key, v in value.items()
))
class UserType(object):
basetype = None
name = None
def validate(self, value):
return value
def tobasetype(self, value):
return value
def frombasetype(self, value):
return value
def isusertype(class_):
return isinstance(class_, UserType)
class BinaryType(UserType):
"""A user type that use base64 strings to carry binary data.
"""
basetype = bytes
name = 'binary'
def tobasetype(self, value):
if value is None:
return None
return base64.encodestring(value)
def frombasetype(self, value):
if value is None:
return None
return base64.decodestring(value)
#: The binary almost-native type
binary = BinaryType()
class IntegerType(UserType):
"""A simple integer type. Can validate a value range.
:param minimum: Possible minimum value
:param maximum: Possible maximum value
Example::
Price = IntegerType(minimum=1)
"""
basetype = int
name = "integer"
def __init__(self, minimum=None, maximum=None):
self.minimum = minimum
self.maximum = maximum
@staticmethod
def frombasetype(value):
return int(value) if value is not None else None
def validate(self, value):
if self.minimum is not None and value < self.minimum:
error = 'Value should be greater or equal to %s' % self.minimum
raise ValueError(error)
if self.maximum is not None and value > self.maximum:
error = 'Value should be lower or equal to %s' % self.maximum
raise ValueError(error)
return value
class StringType(UserType):
"""A simple string type. Can validate a length and a pattern.
:param min_length: Possible minimum length
:param max_length: Possible maximum length
:param pattern: Possible string pattern
Example::
Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$')
"""
basetype = str
name = "string"
def __init__(self, min_length=None, max_length=None, pattern=None):
self.min_length = min_length
self.max_length = max_length
if isinstance(pattern, str):
self.pattern = re.compile(pattern)
else:
self.pattern = pattern
def validate(self, value):
if not isinstance(value, self.basetype):
error = 'Value should be string'
raise ValueError(error)
if self.min_length is not None and len(value) < self.min_length:
error = 'Value should have a minimum character requirement of %s' \
% self.min_length
raise ValueError(error)
if self.max_length is not None and len(value) > self.max_length:
error = 'Value should have a maximum character requirement of %s' \
% self.max_length
raise ValueError(error)
if self.pattern is not None and not self.pattern.search(value):
error = 'Value should match the pattern %s' % self.pattern.pattern
raise ValueError(error)
return value
class Enum(UserType):
"""A simple enumeration type. Can be based on any non-complex type.
:param basetype: The actual data type
:param values: A set of possible values
If nullable, 'None' should be added the values set.
Example::
Gender = Enum(str, 'male', 'female')
Specie = Enum(str, 'cat', 'dog')
"""
def __init__(self, basetype, *values, **kw):
self.basetype = basetype
self.values = set(values)
name = kw.pop('name', None)
if name is None:
name = "Enum(%s)" % ', '.join((str(v) for v in values))
self.name = name
def validate(self, value):
if value not in self.values:
raise ValueError("Value should be one of: %s" %
', '.join(map(str, self.values)))
return value
def tobasetype(self, value):
return value
def frombasetype(self, value):
return value
class UnsetType(object):
def __bool__(self):
return False
def __repr__(self):
return 'Unset'
Unset = UnsetType()
def validate_value(datatype, value):
if value in (Unset, None):
return value
# Try to promote the data type to one of our complex types.
if isinstance(datatype, list):
datatype = ArrayType(datatype[0])
elif isinstance(datatype, dict):
datatype = DictType(*list(datatype.items())[0])
# If the datatype has its own validator, use that.
if hasattr(datatype, 'validate'):
return datatype.validate(value)
# Do type promotion/conversion and data validation for builtin
# types.
v_type = type(value)
if datatype == int:
if v_type in _promotable_types:
try:
# Try to turn the value into an int
value = datatype(value)
except ValueError:
# An error is raised at the end of the function
# when the types don't match.
pass
elif datatype is float and v_type in _promotable_types:
try:
value = float(value)
except ValueError:
# An error is raised at the end of the function
# when the types don't match.
pass
elif datatype is str and isinstance(value, bytes):
value = value.decode()
elif datatype is bytes and isinstance(value, str):
value = value.encode()
if not isinstance(value, datatype):
raise ValueError(
"Wrong type. Expected '%s', got '%s'" % (
datatype, v_type
))
return value
def iscomplex(datatype):
return inspect.isclass(datatype) \
and '_wsme_attributes' in datatype.__dict__
class wsproperty(property):
"""A specialised :class:`property` to define typed-property on complex types.
Example::
class MyComplexType(Base):
def get_aint(self):
return self._aint
def set_aint(self, value):
assert avalue < 10 # Dummy input validation
self._aint = value
aint = wsproperty(int, get_aint, set_aint, mandatory=True)
"""
def __init__(self, datatype, fget, fset=None,
mandatory=False, doc=None, name=None):
property.__init__(self, fget, fset)
#: The property name in the parent python class
self.key = None
#: The attribute name on the public of the api.
#: Defaults to :attr:`key`
self.name = name
#: property data type
self.datatype = datatype
#: True if the property is mandatory
self.mandatory = mandatory
class wsattr(object):
"""Complex type attribute definition.
Example::
class MyComplexType(ctypes.Base):
optionalvalue = int
mandatoryvalue = wsattr(int, mandatory=True)
named_value = wsattr(int, name='named.value')
After inspection, the non-wsattr attributes will be replaced, and
the above class will be equivalent to::
class MyComplexType(ctypes.Base):
optionalvalue = wsattr(int)
mandatoryvalue = wsattr(int, mandatory=True)
"""
def __init__(self, datatype, mandatory=False, name=None, default=Unset,
readonly=False):
#: The attribute name in the parent python class.
#: Set by :func:`inspect_class`
self.key = None # will be set by class inspection
#: The attribute name on the public of the api.
#: Defaults to :attr:`key`
self.name = name
self._datatype = (datatype,)
#: True if the attribute is mandatory
self.mandatory = mandatory
#: Default value. The attribute will return this instead
#: of :data:`Unset` if no value has been set.
self.default = default
#: If True value cannot be set from json/xml input data
self.readonly = readonly
self.complextype = None
def _get_dataholder(self, instance):
dataholder = getattr(instance, '_wsme_dataholder', None)
if dataholder is None:
dataholder = instance._wsme_DataHolderClass()
instance._wsme_dataholder = dataholder
return dataholder
def __get__(self, instance, owner):
if instance is None:
return self
return getattr(
self._get_dataholder(instance),
self.key,
self.default
)
def __set__(self, instance, value):
try:
value = validate_value(self.datatype, value)
except (ValueError, TypeError) as e:
raise exception.InvalidInput(self.name, value, str(e))
dataholder = self._get_dataholder(instance)
if value is Unset:
if hasattr(dataholder, self.key):
delattr(dataholder, self.key)
else:
setattr(dataholder, self.key, value)
def __delete__(self, instance):
self.__set__(instance, Unset)
def _get_datatype(self):
if isinstance(self._datatype, tuple):
self._datatype = \
self.complextype().__registry__.resolve_type(self._datatype[0])
if isinstance(self._datatype, weakref.ref):
return self._datatype()
if isinstance(self._datatype, list):
return [
item() if isinstance(item, weakref.ref) else item
for item in self._datatype
]
return self._datatype
def _set_datatype(self, datatype):
self._datatype = datatype
#: attribute data type. Can be either an actual type,
#: or a type name, in which case the actual type will be
#: determined when needed (generally just before scanning the api).
datatype = property(_get_datatype, _set_datatype)
def iswsattr(attr):
if inspect.isfunction(attr) or inspect.ismethod(attr):
return False
if isinstance(attr, property) and not isinstance(attr, wsproperty):
return False
return True
def sort_attributes(class_, attributes):
"""Sort a class attributes list.
3 mechanisms are attempted :
#. Look for a _wsme_attr_order attribute on the class. This allow
to define an arbitrary order of the attributes (useful for
generated types).
#. Access the object source code to find the declaration order.
#. Sort by alphabetically
"""
if not len(attributes):
return
attrs = dict((a.key, a) for a in attributes)
if hasattr(class_, '_wsme_attr_order'):
names_order = class_._wsme_attr_order
else:
names = attrs.keys()
names_order = []
try:
lines = []
for cls in inspect.getmro(class_):
if cls is object:
continue
lines[len(lines):] = inspect.getsourcelines(cls)[0]
for line in lines:
line = line.strip().replace(" ", "")
if '=' in line:
aname = line[:line.index('=')]
if aname in names and aname not in names_order:
names_order.append(aname)
if len(names_order) < len(names):
names_order.extend((
name for name in names if name not in names_order))
assert len(names_order) == len(names)
except (TypeError, IOError):
names_order = list(names)
names_order.sort()
attributes[:] = [attrs[name] for name in names_order]
def inspect_class(class_):
"""Extract a list of (name, wsattr|wsproperty) for the given class"""
attributes = []
for name, attr in inspect.getmembers(class_, iswsattr):
if name.startswith('_'):
continue
if inspect.isroutine(attr):
continue
if isinstance(attr, (wsattr, wsproperty)):
attrdef = attr
else:
if (attr not in native_types
and (inspect.isclass(attr) or isinstance(attr, (list, dict)))):
register_type(attr)
attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr)
attrdef.key = name
if attrdef.name is None:
attrdef.name = name
attrdef.complextype = weakref.ref(class_)
attributes.append(attrdef)
setattr(class_, name, attrdef)
sort_attributes(class_, attributes)
return attributes
def list_attributes(class_):
"""Returns a list of a complex type attributes."""
if not iscomplex(class_):
raise TypeError("%s is not a registered type")
return class_._wsme_attributes
def make_dataholder(class_):
# the slots are computed outside the class scope to avoid
# 'attr' to pullute the class namespace, which leads to weird
# things if one of the slots is named 'attr'.
slots = [attr.key for attr in class_._wsme_attributes]
class DataHolder(object):
__slots__ = slots
DataHolder.__name__ = class_.__name__ + 'DataHolder'
return DataHolder
class Registry(object):
def __init__(self):
self._complex_types = []
self.array_types = set()
self.dict_types = set()
@property
def complex_types(self):
return [t() for t in self._complex_types if t()]
def register(self, class_):
"""Make sure a type is registered.
It is automatically called by :class:`expose() <expose.expose>`
and :class:`validate() <expose.validate>`.
Unless you want to control when the class inspection is done there
is no need to call it.
"""
if class_ is None or \
class_ in native_types or \
isinstance(class_, UserType) or iscomplex(class_) or \
isinstance(class_, ArrayType) or isinstance(class_, DictType):
return class_
if isinstance(class_, list):
if len(class_) != 1:
raise ValueError("Cannot register type %s" % repr(class_))
dt = ArrayType(class_[0])
self.register(dt.item_type)
self.array_types.add(dt)
return dt
if isinstance(class_, dict):
if len(class_) != 1:
raise ValueError("Cannot register type %s" % repr(class_))
dt = DictType(*list(class_.items())[0])
self.register(dt.value_type)
self.dict_types.add(dt)
return dt
class_._wsme_attributes = None
class_._wsme_attributes = inspect_class(class_)
class_._wsme_DataHolderClass = make_dataholder(class_)
class_.__registry__ = self
self._complex_types.append(weakref.ref(class_))
return class_
def reregister(self, class_):
"""Register a type which may already have been registered.
"""
self._unregister(class_)
return self.register(class_)
def _unregister(self, class_):
"""Remove a previously registered type.
"""
# Clear the existing attribute reference so it is rebuilt if
# the class is registered again later.
if hasattr(class_, '_wsme_attributes'):
del class_._wsme_attributes
# FIXME(dhellmann): This method does not recurse through the
# types like register() does. Should it?
if isinstance(class_, list):
at = ArrayType(class_[0])
try:
self.array_types.remove(at)
except KeyError:
pass
elif isinstance(class_, dict):
key_type, value_type = list(class_.items())[0]
self.dict_types = set(
dt for dt in self.dict_types
if (dt.key_type, dt.value_type) != (key_type, value_type)
)
# We can't use remove() here because the items in
# _complex_types are weakref objects pointing to the classes,
# so we can't compare with them directly.
self._complex_types = [
ct for ct in self._complex_types
if ct() is not class_
]
def lookup(self, typename):
LOG.debug('Lookup %s', typename)
modname = None
if '.' in typename:
modname, typename = typename.rsplit('.', 1)
for ct in self._complex_types:
ct = ct()
if ct is not None and typename == ct.__name__ and (
modname is None or modname == ct.__module__):
return ct
def resolve_type(self, type_):
if isinstance(type_, str):
return self.lookup(type_)
if isinstance(type_, list):
type_ = ArrayType(type_[0])
if isinstance(type_, dict):
type_ = DictType(list(type_.keys())[0], list(type_.values())[0])
if isinstance(type_, ArrayType):
type_ = ArrayType(self.resolve_type(type_.item_type))
self.array_types.add(type_)
elif isinstance(type_, DictType):
type_ = DictType(
type_.key_type,
self.resolve_type(type_.value_type)
)
self.dict_types.add(type_)
else:
type_ = self.register(type_)
return type_
# Default type registry
registry = Registry()
def register_type(class_):
return registry.register(class_)
class BaseMeta(type):
def __new__(cls, name, bases, dct):
if bases and bases[0] is not object and '__registry__' not in dct:
dct['__registry__'] = registry
return type.__new__(cls, name, bases, dct)
def __init__(cls, name, bases, dct):
if bases and bases[0] is not object and cls.__registry__:
cls.__registry__.register(cls)
class Base(metaclass=BaseMeta):
"""Base type for complex types"""
def __init__(self, **kw):
for key, value in kw.items():
if hasattr(self, key):
setattr(self, key, value)
class File(Base):
"""A complex type that represents a file.
In the particular case of protocol accepting form encoded data as
input, File can be loaded from a form file field.
"""
#: The file name
filename = str
#: Mime type of the content
contenttype = str
def _get_content(self):
if self._content is None and self._file:
self._content = self._file.read()
return self._content
def _set_content(self, value):
self._content = value
self._file = None
#: File content
content = wsproperty(binary, _get_content, _set_content)
def __init__(self, filename=None, file=None, content=None,
contenttype=None, fieldstorage=None):
self.filename = filename
self.contenttype = contenttype
self._file = file
self._content = content
if fieldstorage is not None:
if fieldstorage.file:
self._file = fieldstorage.file
self.filename = fieldstorage.filename
self.contenttype = str(fieldstorage.type)
else:
self._content = fieldstorage.value
@property
def file(self):
if self._file is None and self._content:
self._file = io.BytesIO(self._content)
return self._file
class Response(object):

18
ironic/common/exception.py

@ -20,7 +20,6 @@ from http import client as http_client
from ironic_lib.exception import IronicException
from oslo_log import log as logging
import wsme
from ironic.common.i18n import _
@ -713,8 +712,21 @@ class IBMCConnectionError(IBMCError):
_msg_fmt = _("IBMC connection failed for node %(node)s: %(error)s")
class ClientSideError(wsme.exc.ClientSideError):
pass
class ClientSideError(RuntimeError):
def __init__(self, msg=None, status_code=400, faultcode='Client'):
self.msg = msg
self.code = status_code
self.faultcode = faultcode
super(ClientSideError, self).__init__(self.faultstring)
@property
def faultstring(self):
if self.msg is None:
return str(self)
elif isinstance(self.msg, str):
return self.msg
else:
return str(self.msg)
class NodeIsRetired(Invalid):

628
ironic/tests/unit/api/test_types.py

@ -0,0 +1,628 @@
# coding: utf-8
#
# Copyright 2011-2019 the WSME authors and contributors
# (See https://opendev.org/x/wsme/)
#
# This module is part of WSME and is also released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
from ironic.api import types
from ironic.common import exception as exc
from ironic.tests import base as test_base
def gen_class():
d = {}
exec('''class tmp(object): pass''', d)
return d['tmp']
class TestTypes(test_base.TestCase):
def setUp(self):
super(TestTypes, self).setUp()
types.registry = types.Registry()
def test_default_usertype(self):
class MyType(types.UserType):
basetype = str
My = MyType()
assert My.validate('a') == 'a'
assert My.tobasetype('a') == 'a'
assert My.frombasetype('a') == 'a'
def test_unset(self):
u = types.Unset
assert not u
def test_flat_type(self):
class Flat(object):
aint = int
abytes = bytes
atext = str
afloat = float
types.register_type(Flat)
assert len(Flat._wsme_attributes) == 4
attrs = Flat._wsme_attributes
print(attrs)
assert attrs[0].key == 'aint'
assert attrs[0].name == 'aint'
assert isinstance(attrs[0], types.wsattr)
assert attrs[0].datatype == int
assert attrs[0].mandatory is False
assert attrs[1].key == 'abytes'
assert attrs[1].name == 'abytes'
assert attrs[2].key == 'atext'
assert attrs[2].name == 'atext'
assert attrs[3].key == 'afloat'
assert attrs[3].name == 'afloat'
def test_private_attr(self):
class WithPrivateAttrs(object):
_private = 12
types.register_type(WithPrivateAttrs)
assert len(WithPrivateAttrs._wsme_attributes) == 0
def test_attribute_order(self):
class ForcedOrder(object):
_wsme_attr_order = ('a2', 'a1', 'a3')
a1 = int
a2 = int
a3 = int
types.register_type(ForcedOrder)
print(ForcedOrder._wsme_attributes)
assert ForcedOrder._wsme_attributes[0].key == 'a2'
assert ForcedOrder._wsme_attributes[1].key == 'a1'
assert ForcedOrder._wsme_attributes[2].key == 'a3'
c = gen_class()
print(c)
types.register_type(c)
del c._wsme_attributes
c.a2 = int
c.a1 = int
c.a3 = int
types.register_type(c)
assert c._wsme_attributes[0].key == 'a1', c._wsme_attributes[0].key
assert c._wsme_attributes[1].key == 'a2'
assert c._wsme_attributes[2].key == 'a3'
def test_wsproperty(self):
class WithWSProp(object):
def __init__(self):
self._aint = 0
def get_aint(self):
return self._aint
def set_aint(self, value):
self._aint = value
aint = types.wsproperty(int, get_aint, set_aint, mandatory=True)
types.register_type(WithWSProp)
print(WithWSProp._wsme_attributes)
assert len(WithWSProp._wsme_attributes) == 1
a = WithWSProp._wsme_attributes[0]
assert a.key == 'aint'
assert a.datatype == int
assert a.mandatory
o = WithWSProp()
o.aint = 12
assert o.aint == 12
def test_nested(self):
class Inner(object):
aint = int
class Outer(object):
inner = Inner
types.register_type(Outer)
assert hasattr(Inner, '_wsme_attributes')
assert len(Inner._wsme_attributes) == 1
def test_inspect_with_inheritance(self):
class Parent(object):
parent_attribute = int
class Child(Parent):
child_attribute = int
types.register_type(Parent)
types.register_type(Child)
assert len(Child._wsme_attributes) == 2
def test_selfreftype(self):
class SelfRefType(object):
pass
SelfRefType.parent = SelfRefType
types.register_type(SelfRefType)
def test_inspect_with_property(self):
class AType(object):
@property
def test(self):
return 'test'
types.register_type(AType)
assert len(AType._wsme_attributes) == 0
assert AType().test == 'test'
def test_enum(self):
aenum = types.Enum(str, 'v1', 'v2')
assert aenum.basetype is str
class AType(object):
a = aenum
types.register_type(AType)
assert AType.a.datatype is aenum
obj = AType()
obj.a = 'v1'
assert obj.a == 'v1', repr(obj.a)
self.assertRaisesRegexp(exc.InvalidInput,
"Invalid input for field/attribute a. \
Value: 'v3'. Value should be one of: v., v.",
setattr,
obj,
'a',
'v3')
def test_attribute_validation(self):
class AType(object):
alist = [int]
aint = int
types.register_type(AType)
obj = AType()
obj.alist = [1, 2, 3]
assert obj.alist == [1, 2, 3]
obj.aint = 5
assert obj.aint == 5
self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', 12)
self.assertRaises(exc.InvalidInput, setattr, obj, 'alist', [2, 'a'])
def test_attribute_validation_minimum(self):
class ATypeInt(object):
attr = types.IntegerType(minimum=1, maximum=5)
types.register_type(ATypeInt)
obj = ATypeInt()
obj.attr = 2
# comparison between 'zero' value and intger minimum (1) raises a
# TypeError which must be wrapped into an InvalidInput exception
self.assertRaises(exc.InvalidInput, setattr, obj, 'attr', 'zero')
def test_text_attribute_conversion(self):
class SType(object):
atext = str
abytes = bytes
types.register_type(SType)
obj = SType()
obj.atext = b'somebytes'
assert obj.atext == 'somebytes'
assert isinstance(obj.atext, str)
obj.abytes = 'sometext'
assert obj.abytes == b'sometext'
assert isinstance(obj.abytes, bytes)
def test_named_attribute(self):
class ABCDType(object):
a_list = types.wsattr([int], name='a.list')
astr = str
types.register_type(ABCDType)
assert len(ABCDType._wsme_attributes) == 2
attrs = ABCDType._wsme_attributes
assert attrs[0].key == 'a_list', attrs[0].key
assert attrs[0].name == 'a.list', attrs[0].name
assert attrs[1].key == 'astr', attrs[1].key
assert attrs[1].name == 'astr', attrs[1].name
def test_wsattr_del(self):
class MyType(object):
a = types.wsattr(int)
types.register_type(MyType)
value = MyType()
value.a = 5
assert value.a == 5
del value.a
assert value.a is types.Unset
def test_validate_dict(self):
assert types.validate_value({int: str}, {1: '1', 5: '5'})
self.assertRaises(ValueError, types.validate_value,
{int: str}, [])
assert types.validate_value({int: str}, {'1': '1', 5: '5'})
self.assertRaises(ValueError, types.validate_value,
{int: str}, {1: 1, 5: '5'})
def test_validate_list_valid(self):
assert types.validate_value([int], [1, 2])
assert types.validate_value([int], ['5'])
def test_validate_list_empty(self):
assert types.validate_value([int], []) == []
def test_validate_list_none(self):
v = types.ArrayType(int)
assert v.validate(None) is None
def test_validate_list_invalid_member(self):
self.assertRaises(ValueError, types.validate_value, [int],
['not-a-number'])
def test_validate_list_invalid_type(self):
self.assertRaises(ValueError, types.validate_value, [int], 1)
def test_validate_float(self):
self.assertEqual(types.validate_value(float, 1), 1.0)
self.assertEqual(types.validate_value(float, '1'), 1.0)
self.assertEqual(types.validate_value(float, 1.1), 1.1)
self.assertRaises(ValueError, types.validate_value, float, [])
self.assertRaises(ValueError, types.validate_value, float,
'not-a-float')
def test_validate_int(self):
self.assertEqual(types.validate_value(int, 1), 1)
self.assertEqual(types.validate_value(int, '1'), 1)
self.assertRaises(ValueError, types.validate_value, int, 1.1)
def test_validate_integer_type(self):
v = types.IntegerType(minimum=1, maximum=10)
v.validate(1)
v.validate(5)
v.validate(10)
self.assertRaises(ValueError, v.validate, 0)
self.assertRaises(ValueError, v.validate, 11)
def test_validate_string_type(self):
v = types.StringType(min_length=1, max_length=10,
pattern='^[a-zA-Z0-9]*$')
v.validate('1')
v.validate('12345')
v.validate('1234567890')
self.assertRaises(ValueError, v.validate, '')
self.assertRaises(ValueError, v.validate, '12345678901')
# Test a pattern validation
v.validate('a')
v.validate('A')
self.assertRaises(ValueError, v.validate, '_')
def test_validate_string_type_precompile(self):
precompile = re.compile('^[a-zA-Z0-9]*$')
v = types.StringType(min_length=1, max_length=10,
pattern=precompile)
# Test a pattern validation
v.validate('a')
v.validate('A')
self.assertRaises(ValueError, v.validate, '_')
def test_validate_string_type_pattern_exception_message(self):
regex = '^[a-zA-Z0-9]*$'
v = types.StringType(pattern=regex)
try:
v.validate('_')
self.assertFail()
except ValueError as e:
self.assertIn(regex, str(e))
def test_register_invalid_array(self):
self.assertRaises(ValueError, types.register_type, [])
self.assertRaises(ValueError, types.register_type, [int, str])
self.assertRaises(AttributeError, types.register_type, [1])
def test_register_invalid_dict(self):
self.assertRaises(ValueError, types.register_type, {})
self.assertRaises(ValueError, types.register_type,
{int: str, str: int})
self.assertRaises(ValueError, types.register_type,
{types.Unset: str})
def test_list_attribute_no_auto_register(self):
class MyType(object):
aint = int
assert not hasattr(MyType, '_wsme_attributes')
self.assertRaises(TypeError, types.list_attributes, MyType)
assert not hasattr(MyType, '_wsme_attributes')
def test_list_of_complextypes(self):
class A(object):
bs = types.wsattr(['B'])
class B(object):
i = int
types.register_type(A)
types.register_type(B)
assert A.bs.datatype.item_type is B
def test_cross_referenced_types(self):
class A(object):
b = types.wsattr('B')
class B(object):
a = A
types.register_type(A)
types.register_type(B)
assert A.b.datatype is B
def test_base(self):
class B1(types.Base):
b2 = types.wsattr('B2')
class B2(types.Base):
b2 = types.wsattr('B2')
assert B1.b2.datatype is B2, repr(B1.b2.datatype)
assert B2.b2.datatype is B2
def test_base_init(self):
class C1(types.Base):
s = str
c = C1(s='test')
assert c.s == 'test'
def test_array_eq(self):
ell = [types.ArrayType(str)]
assert types.ArrayType(str) in ell
def test_array_sample(self):
s = types.ArrayType(str).sample()
assert isinstance(s, list)
assert s
assert s[0] == ''
def test_dict_sample(self):
s = types.DictType(str, str).sample()
assert isinstance(s, dict)
assert s
assert s == {'': ''}
def test_binary_to_base(self):
import base64
assert types.binary.tobasetype(None) is None
expected = base64.encodestring(b'abcdef')
assert types.binary.tobasetype(b'abcdef') == expected
def test_binary_from_base(self):
import base64
assert types.binary.frombasetype(None) is None
encoded = base64.encodestring(b'abcdef')
assert types.binary.frombasetype(encoded) == b'abcdef'
def test_wsattr_weakref_datatype(self):
# If the datatype inside the wsattr ends up a weakref, it
# should be converted to the real type when accessed again by
# the property getter.
import weakref
a = types.wsattr(int)
a.datatype = weakref.ref(int)
assert a.datatype is int
def test_wsattr_list_datatype(self):
# If the datatype inside the wsattr ends up a list of weakrefs
# to types, it should be converted to the real types when
# accessed again by the property getter.
import weakref
a = types.wsattr(int)
a.datatype = [weakref.ref(int)]
assert isinstance(a.datatype, list)
assert a.datatype[0] is int
def test_file_get_content_by_reading(self):
class buffer:
def read(self):
return 'abcdef'
f = types.File(file=buffer())
assert f.content == 'abcdef'
def test_file_content_overrides_file(self):
class buffer:
def read(self):
return 'from-file'
f = types.File(content='from-content', file=buffer())
assert f.content == 'from-content'
def test_file_setting_content_discards_file(self):
class buffer:
def read(self):
return 'from-file'
f = types.File(file=buffer())
f.content = 'from-content'
assert f.content == 'from-content'
def test_file_field_storage(self):
class buffer:
def read(self):
return 'from-file'
class fieldstorage:
filename = 'static.json'
file = buffer()
type = 'application/json'
f = types.File(fieldstorage=fieldstorage)
assert f.content == 'from-file'
def test_file_field_storage_value(self):
class buffer:
def read(self):
return 'from-file'
class fieldstorage:
filename = 'static.json'
file = None
type = 'application/json'
value = 'from-value'
f = types.File(fieldstorage=fieldstorage)
assert f.content == 'from-value'
def test_file_property_file(self):
class buffer:
def read(self):
return 'from-file'
buf = buffer()
f = types.File(file=buf)
assert f.file is buf
def test_file_property_content(self):
class buffer:
def read(self):
return 'from-file'
f = types.File(content=b'from-content')
assert f.file.read() == b'from-content'
def test_unregister(self):
class TempType(object):
pass
types.registry.register(TempType)
v = types.registry.lookup('TempType')
self.assertIs(v, TempType)
types.registry._unregister(TempType)
after = types.registry.lookup('TempType')
self.assertIsNone(after)
def test_unregister_twice(self):
class TempType(object):
pass
types.registry.register(TempType)
v = types.registry.lookup('TempType')
self.assertIs(v, TempType)
types.registry._unregister(TempType)
# Second call should not raise an exception
types.registry._unregister(TempType)
after = types.registry.lookup('TempType')
self.assertIsNone(after)
def test_unregister_array_type(self):
class TempType(object):
pass
t = [TempType]
types.registry.register(t)
self.assertNotEqual(types.registry.array_types, set())
types.registry._unregister(t)
self.assertEqual(types.registry.array_types, set())
def test_unregister_array_type_twice(self):
class TempType(object):
pass
t = [TempType]
types.registry.register(t)
self.assertNotEqual(types.registry.array_types, set())
types.registry._unregister(t)
# Second call should not raise an exception
types.registry._unregister(t)
self.assertEqual(types.registry.array_types, set())
def test_unregister_dict_type(self):
class TempType(object):
pass
t = {str: TempType}
types.registry.register(t)
self.assertNotEqual(types.registry.dict_types, set())
types.registry._unregister(t)
self.assertEqual(types.registry.dict_types, set())
def test_unregister_dict_type_twice(self):
class TempType(object):
pass
t = {str: TempType}
types.registry.register(t)
self.assertNotEqual(types.registry.dict_types, set())
types.registry._unregister(t)
# Second call should not raise an exception
types.registry._unregister(t)
self.assertEqual(types.registry.dict_types, set())
def test_reregister(self):
class TempType(object):
pass
types.registry.register(TempType)
v = types.registry.lookup('TempType')
self.assertIs(v, TempType)
types.registry.reregister(TempType)
after = types.registry.lookup('TempType')
self.assertIs(after, TempType)
def test_reregister_and_add_attr(self):
class TempType(object):
pass
types.registry.register(TempType)
attrs = types.list_attributes(TempType)
self.assertEqual(attrs, [])
TempType.one = str
types.registry.reregister(TempType)
after = types.list_attributes(TempType)
self.assertNotEqual(after, [])
def test_non_registered_complex_type(self):
class TempType(types.Base):
__registry__ = None
self.assertFalse(types.iscomplex(TempType))
types.registry.register(TempType)
self.assertTrue(types.iscomplex(TempType))

1
requirements.txt

@ -34,7 +34,6 @@ pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD
requests>=2.14.2 # Apache-2.0
rfc3986>=0.3.1 # Apache-2.0
jsonpatch!=1.20,>=1.16 # BSD
WSME>=0.9.3 # MIT
Jinja2>=2.10 # BSD License (3 clause)
keystonemiddleware>=4.17.0 # Apache-2.0
oslo.messaging>=5.29.0 # Apache-2.0

Loading…
Cancel
Save