
Change-Id: I431bf688ca51825622d726f01fed4bb1eea5c564 Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
840 lines
24 KiB
Python
840 lines
24 KiB
Python
import base64
|
|
import datetime
|
|
import decimal
|
|
import inspect
|
|
import io
|
|
import logging
|
|
import netaddr
|
|
import re
|
|
import sys
|
|
import uuid
|
|
import weakref
|
|
|
|
from wsme import exc
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
#: Byte string.
|
|
#: Its use should be restricted to
|
|
#: pure ascii strings as the protocols will generally not be
|
|
#: be able to send non-unicode strings.
|
|
#: To transmit binary strings, use the :class:`binary` type
|
|
bytes = bytes
|
|
|
|
#: Unicode string.
|
|
text = str
|
|
|
|
|
|
class ArrayType(object):
|
|
def __init__(self, item_type):
|
|
if iscomplex(item_type):
|
|
self._item_type = weakref.ref(item_type)
|
|
else:
|
|
self._item_type = item_type
|
|
|
|
def __hash__(self):
|
|
return hash(self.item_type)
|
|
|
|
def __eq__(self, other):
|
|
return isinstance(other, ArrayType) \
|
|
and self.item_type == other.item_type
|
|
|
|
def sample(self):
|
|
return [getattr(self.item_type, 'sample', self.item_type)()]
|
|
|
|
@property
|
|
def item_type(self):
|
|
if isinstance(self._item_type, weakref.ref):
|
|
return self._item_type()
|
|
else:
|
|
return self._item_type
|
|
|
|
def validate(self, value):
|
|
if value is None:
|
|
return
|
|
if not isinstance(value, list):
|
|
raise ValueError("Wrong type. Expected '[%s]', got '%s'" % (
|
|
self.item_type, type(value)
|
|
))
|
|
return [
|
|
validate_value(self.item_type, item)
|
|
for item in value
|
|
]
|
|
|
|
|
|
class DictType(object):
|
|
def __init__(self, key_type, value_type):
|
|
if key_type not in pod_types:
|
|
raise ValueError("Dictionaries key can only be a pod type")
|
|
self.key_type = key_type
|
|
if iscomplex(value_type):
|
|
self._value_type = weakref.ref(value_type)
|
|
else:
|
|
self._value_type = value_type
|
|
|
|
def __hash__(self):
|
|
return hash((self.key_type, self.value_type))
|
|
|
|
def sample(self):
|
|
key = getattr(self.key_type, 'sample', self.key_type)()
|
|
value = getattr(self.value_type, 'sample', self.value_type)()
|
|
return {key: value}
|
|
|
|
@property
|
|
def value_type(self):
|
|
if isinstance(self._value_type, weakref.ref):
|
|
return self._value_type()
|
|
else:
|
|
return self._value_type
|
|
|
|
def validate(self, value):
|
|
if not isinstance(value, dict):
|
|
raise ValueError("Wrong type. Expected '{%s: %s}', got '%s'" % (
|
|
self.key_type, self.value_type, type(value)
|
|
))
|
|
return dict((
|
|
(
|
|
validate_value(self.key_type, key),
|
|
validate_value(self.value_type, v)
|
|
) for key, v in value.items()
|
|
))
|
|
|
|
|
|
class UserType(object):
|
|
basetype = None
|
|
name = None
|
|
|
|
def validate(self, value):
|
|
return value
|
|
|
|
def tobasetype(self, value):
|
|
return value
|
|
|
|
def frombasetype(self, value):
|
|
return value
|
|
|
|
|
|
def isusertype(class_):
|
|
return isinstance(class_, UserType)
|
|
|
|
|
|
class BinaryType(UserType):
|
|
"""
|
|
A user type that use base64 strings to carry binary data.
|
|
"""
|
|
basetype = bytes
|
|
name = 'binary'
|
|
|
|
def tobasetype(self, value):
|
|
if value is None:
|
|
return None
|
|
return base64.encodebytes(value)
|
|
|
|
def frombasetype(self, value):
|
|
if value is None:
|
|
return None
|
|
return base64.decodebytes(value)
|
|
|
|
|
|
#: The binary almost-native type
|
|
binary = BinaryType()
|
|
|
|
|
|
class IntegerType(UserType):
|
|
"""
|
|
A simple integer type. Can validate a value range.
|
|
|
|
:param minimum: Possible minimum value
|
|
:param maximum: Possible maximum value
|
|
|
|
Example::
|
|
|
|
Price = IntegerType(minimum=1)
|
|
|
|
"""
|
|
basetype = int
|
|
name = "integer"
|
|
|
|
def __init__(self, minimum=None, maximum=None):
|
|
self.minimum = minimum
|
|
self.maximum = maximum
|
|
|
|
@staticmethod
|
|
def frombasetype(value):
|
|
return int(value) if value is not None else None
|
|
|
|
def validate(self, value):
|
|
if self.minimum is not None and value < self.minimum:
|
|
error = 'Value should be greater or equal to %s' % self.minimum
|
|
raise ValueError(error)
|
|
|
|
if self.maximum is not None and value > self.maximum:
|
|
error = 'Value should be lower or equal to %s' % self.maximum
|
|
raise ValueError(error)
|
|
|
|
return value
|
|
|
|
|
|
class StringType(UserType):
|
|
"""
|
|
A simple string type. Can validate a length and a pattern.
|
|
|
|
:param min_length: Possible minimum length
|
|
:param max_length: Possible maximum length
|
|
:param pattern: Possible string pattern
|
|
|
|
Example::
|
|
|
|
Name = StringType(min_length=1, pattern='^[a-zA-Z ]*$')
|
|
|
|
"""
|
|
basetype = str
|
|
name = "string"
|
|
|
|
def __init__(self, min_length=None, max_length=None, pattern=None):
|
|
self.min_length = min_length
|
|
self.max_length = max_length
|
|
if isinstance(pattern, str):
|
|
self.pattern = re.compile(pattern)
|
|
else:
|
|
self.pattern = pattern
|
|
|
|
def validate(self, value):
|
|
if not isinstance(value, self.basetype):
|
|
error = 'Value should be string'
|
|
raise ValueError(error)
|
|
|
|
if self.min_length is not None and len(value) < self.min_length:
|
|
error = 'Value should have a minimum character requirement of %s' \
|
|
% self.min_length
|
|
raise ValueError(error)
|
|
|
|
if self.max_length is not None and len(value) > self.max_length:
|
|
error = 'Value should have a maximum character requirement of %s' \
|
|
% self.max_length
|
|
raise ValueError(error)
|
|
|
|
if self.pattern is not None and not self.pattern.search(value):
|
|
error = 'Value should match the pattern %s' % self.pattern.pattern
|
|
raise ValueError(error)
|
|
|
|
return value
|
|
|
|
|
|
class IPv4AddressType(UserType):
|
|
"""
|
|
A simple IPv4 type.
|
|
"""
|
|
basetype = str
|
|
name = "ipv4address"
|
|
|
|
@staticmethod
|
|
def validate(value):
|
|
try:
|
|
netaddr.IPAddress(value, version=4, flags=netaddr.INET_PTON)
|
|
except netaddr.AddrFormatError:
|
|
error = 'Value should be IPv4 format'
|
|
raise ValueError(error)
|
|
else:
|
|
return value
|
|
|
|
|
|
class IPv6AddressType(UserType):
|
|
"""
|
|
A simple IPv6 type.
|
|
|
|
This type represents IPv6 addresses in the short format.
|
|
"""
|
|
basetype = str
|
|
name = "ipv6address"
|
|
|
|
@staticmethod
|
|
def validate(value):
|
|
try:
|
|
netaddr.IPAddress(value, version=6, flags=netaddr.INET_PTON)
|
|
except netaddr.AddrFormatError:
|
|
error = 'Value should be IPv6 format'
|
|
raise ValueError(error)
|
|
else:
|
|
return value
|
|
|
|
|
|
class UuidType(UserType):
|
|
"""
|
|
A simple UUID type.
|
|
|
|
This type allows not only UUID having dashes but also UUID not
|
|
having dashes. For example, '6a0a707c-45ef-4758-b533-e55adddba8ce'
|
|
and '6a0a707c45ef4758b533e55adddba8ce' are distinguished as valid.
|
|
"""
|
|
basetype = str
|
|
name = "uuid"
|
|
|
|
@staticmethod
|
|
def validate(value):
|
|
try:
|
|
return str(uuid.UUID(value))
|
|
except (TypeError, ValueError, AttributeError):
|
|
error = 'Value should be UUID format'
|
|
raise ValueError(error)
|
|
|
|
|
|
class Enum(UserType):
|
|
"""
|
|
A simple enumeration type. Can be based on any non-complex type.
|
|
|
|
:param basetype: The actual data type
|
|
:param values: A set of possible values
|
|
|
|
If nullable, 'None' should be added the values set.
|
|
|
|
Example::
|
|
|
|
Gender = Enum(str, 'male', 'female')
|
|
Specie = Enum(str, 'cat', 'dog')
|
|
|
|
"""
|
|
def __init__(self, basetype, *values, **kw):
|
|
self.basetype = basetype
|
|
self.values = set(values)
|
|
name = kw.pop('name', None)
|
|
if name is None:
|
|
name = "Enum(%s)" % ', '.join((str(v) for v in values))
|
|
self.name = name
|
|
|
|
def validate(self, value):
|
|
if value not in self.values:
|
|
raise ValueError("Value should be one of: %s" %
|
|
', '.join(map(str, self.values)))
|
|
return value
|
|
|
|
def tobasetype(self, value):
|
|
return value
|
|
|
|
def frombasetype(self, value):
|
|
return value
|
|
|
|
|
|
class UnsetType(object):
|
|
if sys.version < '3':
|
|
def __nonzero__(self):
|
|
return False
|
|
else:
|
|
def __bool__(self):
|
|
return False
|
|
|
|
def __repr__(self):
|
|
return 'Unset'
|
|
|
|
|
|
Unset = UnsetType()
|
|
|
|
#: A special type that corresponds to the host framework request object.
|
|
#: It can only be used in the function parameters, and if so the request object
|
|
#: of the host framework will be passed to the function.
|
|
HostRequest = object()
|
|
|
|
|
|
pod_types = (int, bytes, str, float, bool)
|
|
dt_types = (datetime.date, datetime.time, datetime.datetime)
|
|
extra_types = (binary, decimal.Decimal)
|
|
native_types = pod_types + dt_types + extra_types
|
|
# The types for which we allow promotion to certain numbers.
|
|
_promotable_types = (int, str, bytes)
|
|
|
|
|
|
def iscomplex(datatype):
|
|
return inspect.isclass(datatype) \
|
|
and '_wsme_attributes' in datatype.__dict__
|
|
|
|
|
|
def isarray(datatype):
|
|
return isinstance(datatype, ArrayType)
|
|
|
|
|
|
def isdict(datatype):
|
|
return isinstance(datatype, DictType)
|
|
|
|
|
|
def validate_value(datatype, value):
|
|
if value in (Unset, None):
|
|
return value
|
|
|
|
# Try to promote the data type to one of our complex types.
|
|
if isinstance(datatype, list):
|
|
datatype = ArrayType(datatype[0])
|
|
elif isinstance(datatype, dict):
|
|
datatype = DictType(*list(datatype.items())[0])
|
|
|
|
# If the datatype has its own validator, use that.
|
|
if hasattr(datatype, 'validate'):
|
|
return datatype.validate(value)
|
|
|
|
# Do type promotion/conversion and data validation for builtin
|
|
# types.
|
|
v_type = type(value)
|
|
if datatype in (int,):
|
|
if v_type in _promotable_types:
|
|
try:
|
|
# Try to turn the value into an int
|
|
value = datatype(value)
|
|
except ValueError:
|
|
# An error is raised at the end of the function
|
|
# when the types don't match.
|
|
pass
|
|
elif datatype is float and v_type in _promotable_types:
|
|
try:
|
|
value = float(value)
|
|
except ValueError:
|
|
# An error is raised at the end of the function
|
|
# when the types don't match.
|
|
pass
|
|
elif datatype is str and isinstance(value, bytes):
|
|
value = value.decode()
|
|
elif datatype is bytes and isinstance(value, str):
|
|
value = value.encode()
|
|
|
|
if not isinstance(value, datatype):
|
|
raise ValueError(
|
|
"Wrong type. Expected '%s', got '%s'" % (
|
|
datatype, v_type
|
|
))
|
|
return value
|
|
|
|
|
|
class wsproperty(property):
|
|
"""
|
|
A specialised :class:`property` to define typed-property on complex types.
|
|
Example::
|
|
|
|
class MyComplexType(wsme.types.Base):
|
|
def get_aint(self):
|
|
return self._aint
|
|
|
|
def set_aint(self, value):
|
|
assert avalue < 10 # Dummy input validation
|
|
self._aint = value
|
|
|
|
aint = wsproperty(int, get_aint, set_aint, mandatory=True)
|
|
"""
|
|
def __init__(self, datatype, fget, fset=None,
|
|
mandatory=False, doc=None, name=None):
|
|
property.__init__(self, fget, fset)
|
|
#: The property name in the parent python class
|
|
self.key = None
|
|
#: The attribute name on the public of the api.
|
|
#: Defaults to :attr:`key`
|
|
self.name = name
|
|
#: property data type
|
|
self.datatype = datatype
|
|
#: True if the property is mandatory
|
|
self.mandatory = mandatory
|
|
|
|
|
|
class wsattr(object):
|
|
"""
|
|
Complex type attribute definition.
|
|
|
|
Example::
|
|
|
|
class MyComplexType(wsme.types.Base):
|
|
optionalvalue = int
|
|
mandatoryvalue = wsattr(int, mandatory=True)
|
|
named_value = wsattr(int, name='named.value')
|
|
|
|
After inspection, the non-wsattr attributes will be replaced, and
|
|
the above class will be equivalent to::
|
|
|
|
class MyComplexType(wsme.types.Base):
|
|
optionalvalue = wsattr(int)
|
|
mandatoryvalue = wsattr(int, mandatory=True)
|
|
|
|
"""
|
|
def __init__(self, datatype, mandatory=False, name=None, default=Unset,
|
|
readonly=False):
|
|
#: The attribute name in the parent python class.
|
|
#: Set by :func:`inspect_class`
|
|
self.key = None # will be set by class inspection
|
|
#: The attribute name on the public of the api.
|
|
#: Defaults to :attr:`key`
|
|
self.name = name
|
|
self._datatype = (datatype,)
|
|
#: True if the attribute is mandatory
|
|
self.mandatory = mandatory
|
|
#: Default value. The attribute will return this instead
|
|
#: of :data:`Unset` if no value has been set.
|
|
self.default = default
|
|
#: If True value cannot be set from json/xml input data
|
|
self.readonly = readonly
|
|
|
|
self.complextype = None
|
|
|
|
def _get_dataholder(self, instance):
|
|
dataholder = getattr(instance, '_wsme_dataholder', None)
|
|
if dataholder is None:
|
|
dataholder = instance._wsme_DataHolderClass()
|
|
instance._wsme_dataholder = dataholder
|
|
return dataholder
|
|
|
|
def __get__(self, instance, owner):
|
|
if instance is None:
|
|
return self
|
|
return getattr(
|
|
self._get_dataholder(instance),
|
|
self.key,
|
|
self.default
|
|
)
|
|
|
|
def __set__(self, instance, value):
|
|
try:
|
|
value = validate_value(self.datatype, value)
|
|
except (ValueError, TypeError) as e:
|
|
raise exc.InvalidInput(self.name, value, str(e))
|
|
dataholder = self._get_dataholder(instance)
|
|
if value is Unset:
|
|
if hasattr(dataholder, self.key):
|
|
delattr(dataholder, self.key)
|
|
else:
|
|
setattr(dataholder, self.key, value)
|
|
|
|
def __delete__(self, instance):
|
|
self.__set__(instance, Unset)
|
|
|
|
def _get_datatype(self):
|
|
if isinstance(self._datatype, tuple):
|
|
self._datatype = \
|
|
self.complextype().__registry__.resolve_type(self._datatype[0])
|
|
if isinstance(self._datatype, weakref.ref):
|
|
return self._datatype()
|
|
if isinstance(self._datatype, list):
|
|
return [
|
|
item() if isinstance(item, weakref.ref) else item
|
|
for item in self._datatype
|
|
]
|
|
return self._datatype
|
|
|
|
def _set_datatype(self, datatype):
|
|
self._datatype = datatype
|
|
|
|
#: attribute data type. Can be either an actual type,
|
|
#: or a type name, in which case the actual type will be
|
|
#: determined when needed (generally just before scanning the api).
|
|
datatype = property(_get_datatype, _set_datatype)
|
|
|
|
|
|
def iswsattr(attr):
|
|
if inspect.isfunction(attr) or inspect.ismethod(attr):
|
|
return False
|
|
if isinstance(attr, property) and not isinstance(attr, wsproperty):
|
|
return False
|
|
return True
|
|
|
|
|
|
def sort_attributes(class_, attributes):
|
|
"""Sort a class attributes list.
|
|
|
|
3 mechanisms are attempted :
|
|
|
|
#. Look for a _wsme_attr_order attribute on the class_. This allow
|
|
to define an arbitrary order of the attributes (useful for
|
|
generated types).
|
|
|
|
#. Access the object source code to find the declaration order.
|
|
|
|
#. Sort by alphabetically"""
|
|
|
|
if not len(attributes):
|
|
return
|
|
|
|
attrs = dict((a.key, a) for a in attributes)
|
|
|
|
if hasattr(class_, '_wsme_attr_order'):
|
|
names_order = class_._wsme_attr_order
|
|
else:
|
|
names = attrs.keys()
|
|
names_order = []
|
|
try:
|
|
lines = []
|
|
for cls in inspect.getmro(class_):
|
|
if cls is object:
|
|
continue
|
|
lines[len(lines):] = inspect.getsourcelines(cls)[0]
|
|
for line in lines:
|
|
line = line.strip().replace(" ", "")
|
|
if '=' in line:
|
|
aname = line[:line.index('=')]
|
|
if aname in names and aname not in names_order:
|
|
names_order.append(aname)
|
|
if len(names_order) < len(names):
|
|
names_order.extend((
|
|
name for name in names if name not in names_order))
|
|
assert len(names_order) == len(names)
|
|
except (TypeError, IOError):
|
|
names_order = list(names)
|
|
names_order.sort()
|
|
|
|
attributes[:] = [attrs[name] for name in names_order]
|
|
|
|
|
|
def inspect_class(class_):
|
|
"""Extract a list of (name, wsattr|wsproperty) for the given class_"""
|
|
attributes = []
|
|
for name, attr in inspect.getmembers(class_, iswsattr):
|
|
if name.startswith('_'):
|
|
continue
|
|
if inspect.isroutine(attr):
|
|
continue
|
|
|
|
if isinstance(attr, (wsattr, wsproperty)):
|
|
attrdef = attr
|
|
else:
|
|
if attr not in native_types and (
|
|
inspect.isclass(attr) or
|
|
isinstance(attr, (list, dict))):
|
|
register_type(attr)
|
|
attrdef = getattr(class_, '__wsattrclass__', wsattr)(attr)
|
|
|
|
attrdef.key = name
|
|
if attrdef.name is None:
|
|
attrdef.name = name
|
|
attrdef.complextype = weakref.ref(class_)
|
|
attributes.append(attrdef)
|
|
setattr(class_, name, attrdef)
|
|
|
|
sort_attributes(class_, attributes)
|
|
return attributes
|
|
|
|
|
|
def list_attributes(class_):
|
|
"""
|
|
Returns a list of a complex type attributes.
|
|
"""
|
|
if not iscomplex(class_):
|
|
raise TypeError("%s is not a registered type")
|
|
return class_._wsme_attributes
|
|
|
|
|
|
def make_dataholder(class_):
|
|
# the slots are computed outside the class scope to avoid
|
|
# 'attr' to pullute the class namespace, which leads to weird
|
|
# things if one of the slots is named 'attr'.
|
|
slots = [attr.key for attr in class_._wsme_attributes]
|
|
|
|
class DataHolder(object):
|
|
__slots__ = slots
|
|
|
|
DataHolder.__name__ = class_.__name__ + 'DataHolder'
|
|
return DataHolder
|
|
|
|
|
|
class Registry(object):
|
|
def __init__(self):
|
|
self._complex_types = []
|
|
self.array_types = set()
|
|
self.dict_types = set()
|
|
|
|
@property
|
|
def complex_types(self):
|
|
return [t() for t in self._complex_types if t()]
|
|
|
|
def register(self, class_):
|
|
"""
|
|
Make sure a type is registered.
|
|
|
|
It is automatically called by :class:`expose() <wsme.expose>`
|
|
and :class:`validate() <wsme.validate>`.
|
|
Unless you want to control when the class inspection is done there
|
|
is no need to call it.
|
|
"""
|
|
if class_ is None or \
|
|
class_ in native_types or \
|
|
isusertype(class_) or iscomplex(class_) or \
|
|
isarray(class_) or isdict(class_):
|
|
return class_
|
|
|
|
if isinstance(class_, list):
|
|
if len(class_) != 1:
|
|
raise ValueError("Cannot register type %s" % repr(class_))
|
|
dt = ArrayType(class_[0])
|
|
self.register(dt.item_type)
|
|
self.array_types.add(dt)
|
|
return dt
|
|
|
|
if isinstance(class_, dict):
|
|
if len(class_) != 1:
|
|
raise ValueError("Cannot register type %s" % repr(class_))
|
|
dt = DictType(*list(class_.items())[0])
|
|
self.register(dt.value_type)
|
|
self.dict_types.add(dt)
|
|
return dt
|
|
|
|
class_._wsme_attributes = None
|
|
class_._wsme_attributes = inspect_class(class_)
|
|
class_._wsme_DataHolderClass = make_dataholder(class_)
|
|
|
|
class_.__registry__ = self
|
|
self._complex_types.append(weakref.ref(class_))
|
|
return class_
|
|
|
|
def reregister(self, class_):
|
|
"""Register a type which may already have been registered.
|
|
"""
|
|
self._unregister(class_)
|
|
return self.register(class_)
|
|
|
|
def _unregister(self, class_):
|
|
"""Remove a previously registered type.
|
|
"""
|
|
# Clear the existing attribute reference so it is rebuilt if
|
|
# the class is registered again later.
|
|
if hasattr(class_, '_wsme_attributes'):
|
|
del class_._wsme_attributes
|
|
# FIXME(dhellmann): This method does not recurse through the
|
|
# types like register() does. Should it?
|
|
if isinstance(class_, list):
|
|
at = ArrayType(class_[0])
|
|
try:
|
|
self.array_types.remove(at)
|
|
except KeyError:
|
|
pass
|
|
elif isinstance(class_, dict):
|
|
key_type, value_type = list(class_.items())[0]
|
|
self.dict_types = set(
|
|
dt for dt in self.dict_types
|
|
if (dt.key_type, dt.value_type) != (key_type, value_type)
|
|
)
|
|
# We can't use remove() here because the items in
|
|
# _complex_types are weakref objects pointing to the classes,
|
|
# so we can't compare with them directly.
|
|
self._complex_types = [
|
|
ct for ct in self._complex_types
|
|
if ct() is not class_
|
|
]
|
|
|
|
def lookup(self, typename):
|
|
log.debug('Lookup %s' % typename)
|
|
modname = None
|
|
if '.' in typename:
|
|
modname, typename = typename.rsplit('.', 1)
|
|
for ct in self._complex_types:
|
|
ct = ct()
|
|
if ct is not None and typename == ct.__name__ and (
|
|
modname is None or modname == ct.__module__):
|
|
return ct
|
|
|
|
def resolve_type(self, type_):
|
|
if isinstance(type_, str):
|
|
return self.lookup(type_)
|
|
if isinstance(type_, list):
|
|
type_ = ArrayType(type_[0])
|
|
if isinstance(type_, dict):
|
|
type_ = DictType(list(type_.keys())[0], list(type_.values())[0])
|
|
if isinstance(type_, ArrayType):
|
|
type_ = ArrayType(self.resolve_type(type_.item_type))
|
|
self.array_types.add(type_)
|
|
elif isinstance(type_, DictType):
|
|
type_ = DictType(
|
|
type_.key_type,
|
|
self.resolve_type(type_.value_type)
|
|
)
|
|
self.dict_types.add(type_)
|
|
else:
|
|
type_ = self.register(type_)
|
|
return type_
|
|
|
|
|
|
# Default type registry
|
|
registry = Registry()
|
|
|
|
|
|
def register_type(class_):
|
|
return registry.register(class_)
|
|
|
|
|
|
class BaseMeta(type):
|
|
def __new__(cls, name, bases, dct):
|
|
if bases and bases[0] is not object and '__registry__' not in dct:
|
|
dct['__registry__'] = registry
|
|
return type.__new__(cls, name, bases, dct)
|
|
|
|
def __init__(cls, name, bases, dct):
|
|
if bases and bases[0] is not object and cls.__registry__:
|
|
cls.__registry__.register(cls)
|
|
|
|
|
|
class Base(metaclass=BaseMeta):
|
|
"""Base type for complex types"""
|
|
def __init__(self, **kw):
|
|
for key, value in kw.items():
|
|
if hasattr(self, key):
|
|
setattr(self, key, value)
|
|
|
|
|
|
class File(Base):
|
|
"""A complex type that represents a file.
|
|
|
|
In the particular case of protocol accepting form encoded data as
|
|
input, File can be loaded from a form file field.
|
|
"""
|
|
#: The file name
|
|
filename = wsattr(str)
|
|
|
|
#: Mime type of the content
|
|
contenttype = wsattr(str)
|
|
|
|
def _get_content(self):
|
|
if self._content is None and self._file:
|
|
self._content = self._file.read()
|
|
return self._content
|
|
|
|
def _set_content(self, value):
|
|
self._content = value
|
|
self._file = None
|
|
|
|
#: File content
|
|
content = wsproperty(binary, _get_content, _set_content)
|
|
|
|
def __init__(self, filename=None, file=None, content=None,
|
|
contenttype=None, fieldstorage=None):
|
|
self.filename = filename
|
|
self.contenttype = contenttype
|
|
self._file = file
|
|
self._content = content
|
|
|
|
if fieldstorage is not None:
|
|
if fieldstorage.file:
|
|
self._file = fieldstorage.file
|
|
self.filename = fieldstorage.filename
|
|
self.contenttype = str(fieldstorage.type)
|
|
else:
|
|
self._content = fieldstorage.value
|
|
|
|
@property
|
|
def file(self):
|
|
if self._file is None and self._content:
|
|
self._file = io.BytesIO(self._content)
|
|
return self._file
|
|
|
|
|
|
class DynamicBase(Base):
|
|
"""Base type for complex types for which all attributes are not
|
|
defined when the class is constructed.
|
|
|
|
This class is meant to be used as a base for types that have
|
|
properties added after the main class is created, such as by
|
|
loading plugins.
|
|
|
|
"""
|
|
|
|
@classmethod
|
|
def add_attributes(cls, **attrs):
|
|
"""Add more attributes
|
|
|
|
The arguments should be valid Python attribute names
|
|
associated with a type for the new attribute.
|
|
|
|
"""
|
|
for n, t in attrs.items():
|
|
setattr(cls, n, t)
|
|
cls.__registry__.reregister(cls)
|