Merge "rehome neutron.objects.common_types"
This commit is contained in:
commit
bcb9c90072
321
neutron_lib/objects/common_types.py
Normal file
321
neutron_lib/objects/common_types.py
Normal file
@ -0,0 +1,321 @@
|
||||
# Copyright 2016 OpenStack Foundation
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import uuid
|
||||
|
||||
import netaddr
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_versionedobjects import fields as obj_fields
|
||||
import six
|
||||
|
||||
from neutron_lib._i18n import _
|
||||
from neutron_lib import constants as lib_constants
|
||||
from neutron_lib.db import constants as lib_db_const
|
||||
from neutron_lib.objects import exceptions as o_exc
|
||||
from neutron_lib.utils import net as net_utils
|
||||
|
||||
|
||||
class HARouterEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.VALID_HA_STATES)
|
||||
|
||||
|
||||
class IPV6ModeEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.IPV6_MODES)
|
||||
|
||||
|
||||
class RangeConstrainedInteger(obj_fields.Integer):
|
||||
def __init__(self, start, end, **kwargs):
|
||||
try:
|
||||
self._start = int(start)
|
||||
self._end = int(end)
|
||||
except (TypeError, ValueError):
|
||||
raise o_exc.NeutronRangeConstrainedIntegerInvalidLimit(
|
||||
start=start, end=end)
|
||||
super(RangeConstrainedInteger, self).__init__(**kwargs)
|
||||
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, six.integer_types):
|
||||
msg = _("Field value %s is not an integer") % value
|
||||
raise ValueError(msg)
|
||||
if not self._start <= value <= self._end:
|
||||
msg = _("Field value %s is invalid") % value
|
||||
raise ValueError(msg)
|
||||
return super(RangeConstrainedInteger, self).coerce(obj, attr, value)
|
||||
|
||||
|
||||
class IPNetworkPrefixLen(RangeConstrainedInteger):
|
||||
"""IP network (CIDR) prefix length custom Enum"""
|
||||
def __init__(self, **kwargs):
|
||||
super(IPNetworkPrefixLen, self).__init__(
|
||||
start=0, end=lib_constants.IPv6_BITS, **kwargs)
|
||||
|
||||
|
||||
class IPNetworkPrefixLenField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = IPNetworkPrefixLen()
|
||||
|
||||
|
||||
class PortRange(RangeConstrainedInteger):
|
||||
def __init__(self, start=lib_constants.PORT_RANGE_MIN, **kwargs):
|
||||
super(PortRange, self).__init__(start=start,
|
||||
end=lib_constants.PORT_RANGE_MAX,
|
||||
**kwargs)
|
||||
|
||||
|
||||
class PortRangeField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = PortRange()
|
||||
|
||||
|
||||
class PortRangeWith0Field(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = PortRange(start=0)
|
||||
|
||||
|
||||
class VlanIdRange(RangeConstrainedInteger):
|
||||
def __init__(self, **kwargs):
|
||||
super(VlanIdRange, self).__init__(start=lib_constants.MIN_VLAN_TAG,
|
||||
end=lib_constants.MAX_VLAN_TAG,
|
||||
**kwargs)
|
||||
|
||||
|
||||
class VlanIdRangeField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = VlanIdRange()
|
||||
|
||||
|
||||
class ListOfIPNetworksField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.List(obj_fields.IPNetwork())
|
||||
|
||||
|
||||
class SetOfUUIDsField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Set(obj_fields.UUID())
|
||||
|
||||
|
||||
class DomainName(obj_fields.String):
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, six.string_types):
|
||||
msg = _("Field value %s is not a string") % value
|
||||
raise ValueError(msg)
|
||||
if len(value) > lib_db_const.FQDN_FIELD_SIZE:
|
||||
msg = _("Domain name %s is too long") % value
|
||||
raise ValueError(msg)
|
||||
return super(DomainName, self).coerce(obj, attr, value)
|
||||
|
||||
|
||||
class DomainNameField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = DomainName()
|
||||
|
||||
|
||||
class IntegerEnum(obj_fields.Integer):
|
||||
def __init__(self, valid_values=None, **kwargs):
|
||||
if not valid_values:
|
||||
msg = _("No possible values specified")
|
||||
raise ValueError(msg)
|
||||
for value in valid_values:
|
||||
if not isinstance(value, six.integer_types):
|
||||
msg = _("Possible value %s is not an integer") % value
|
||||
raise ValueError(msg)
|
||||
self._valid_values = valid_values
|
||||
super(IntegerEnum, self).__init__(**kwargs)
|
||||
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, six.integer_types):
|
||||
msg = _("Field value %s is not an integer") % value
|
||||
raise ValueError(msg)
|
||||
if value not in self._valid_values:
|
||||
msg = (
|
||||
_("Field value %(value)s is not in the list "
|
||||
"of valid values: %(values)s") %
|
||||
{'value': value, 'values': self._valid_values}
|
||||
)
|
||||
raise ValueError(msg)
|
||||
return super(IntegerEnum, self).coerce(obj, attr, value)
|
||||
|
||||
|
||||
class IPVersionEnum(IntegerEnum):
|
||||
"""IP version integer Enum"""
|
||||
def __init__(self, **kwargs):
|
||||
super(IPVersionEnum, self).__init__(
|
||||
valid_values=lib_constants.IP_ALLOWED_VERSIONS, **kwargs)
|
||||
|
||||
|
||||
class IPVersionEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = IPVersionEnum()
|
||||
|
||||
|
||||
class DscpMark(IntegerEnum):
|
||||
def __init__(self, valid_values=None, **kwargs):
|
||||
super(DscpMark, self).__init__(
|
||||
valid_values=lib_constants.VALID_DSCP_MARKS)
|
||||
|
||||
|
||||
class DscpMarkField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = DscpMark()
|
||||
|
||||
|
||||
class FlowDirectionEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.VALID_DIRECTIONS)
|
||||
|
||||
|
||||
class IpamAllocationStatusEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.VALID_IPAM_ALLOCATION_STATUSES)
|
||||
|
||||
|
||||
class EtherTypeEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.VALID_ETHERTYPES)
|
||||
|
||||
|
||||
class IpProtocolEnum(obj_fields.Enum):
|
||||
"""IP protocol number Enum"""
|
||||
def __init__(self, **kwargs):
|
||||
super(IpProtocolEnum, self).__init__(
|
||||
valid_values=list(
|
||||
itertools.chain(
|
||||
lib_constants.IP_PROTOCOL_MAP.keys(),
|
||||
[str(v) for v in range(256)]
|
||||
)
|
||||
),
|
||||
**kwargs)
|
||||
|
||||
|
||||
class PortBindingStatusEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.PORT_BINDING_STATUSES)
|
||||
|
||||
|
||||
class IpProtocolEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = IpProtocolEnum()
|
||||
|
||||
|
||||
class MACAddress(obj_fields.FieldType):
|
||||
"""MACAddress custom field.
|
||||
|
||||
This custom field is different from the one provided by
|
||||
oslo.versionedobjects library: it uses netaddr.EUI type instead of strings.
|
||||
"""
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, netaddr.EUI):
|
||||
msg = _("Field value %s is not a netaddr.EUI") % value
|
||||
raise ValueError(msg)
|
||||
return super(MACAddress, self).coerce(obj, attr, value)
|
||||
|
||||
@staticmethod
|
||||
def to_primitive(obj, attr, value):
|
||||
return str(value)
|
||||
|
||||
@staticmethod
|
||||
def from_primitive(obj, attr, value):
|
||||
try:
|
||||
return net_utils.AuthenticEUI(value)
|
||||
except Exception:
|
||||
msg = _("Field value %s is not a netaddr.EUI") % value
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
class MACAddressField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = MACAddress()
|
||||
|
||||
|
||||
class DictOfMiscValues(obj_fields.FieldType):
|
||||
"""DictOfMiscValues custom field
|
||||
|
||||
This custom field is handling dictionary with miscellaneous value types,
|
||||
including integer, float, boolean and list and nested dictionaries.
|
||||
"""
|
||||
@staticmethod
|
||||
def coerce(obj, attr, value):
|
||||
if isinstance(value, dict):
|
||||
return value
|
||||
if isinstance(value, six.string_types):
|
||||
try:
|
||||
return jsonutils.loads(value)
|
||||
except Exception:
|
||||
msg = _("Field value %s is not stringified JSON") % value
|
||||
raise ValueError(msg)
|
||||
msg = (_("Field value %s is not type of dict or stringified JSON")
|
||||
% value)
|
||||
raise ValueError(msg)
|
||||
|
||||
@staticmethod
|
||||
def from_primitive(obj, attr, value):
|
||||
return DictOfMiscValues.coerce(obj, attr, value)
|
||||
|
||||
@staticmethod
|
||||
def to_primitive(obj, attr, value):
|
||||
return jsonutils.dumps(value)
|
||||
|
||||
@staticmethod
|
||||
def stringify(value):
|
||||
return jsonutils.dumps(value)
|
||||
|
||||
|
||||
class DictOfMiscValuesField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = DictOfMiscValues
|
||||
|
||||
|
||||
class ListOfDictOfMiscValuesField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.List(DictOfMiscValuesField())
|
||||
|
||||
|
||||
class IPNetwork(obj_fields.FieldType):
|
||||
"""IPNetwork custom field.
|
||||
|
||||
This custom field is different from the one provided by
|
||||
oslo.versionedobjects library: it does not reset string representation for
|
||||
the field.
|
||||
"""
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, netaddr.IPNetwork):
|
||||
msg = _("Field value %s is not a netaddr.IPNetwork") % value
|
||||
raise ValueError(msg)
|
||||
return super(IPNetwork, self).coerce(obj, attr, value)
|
||||
|
||||
@staticmethod
|
||||
def to_primitive(obj, attr, value):
|
||||
return str(value)
|
||||
|
||||
@staticmethod
|
||||
def from_primitive(obj, attr, value):
|
||||
try:
|
||||
return net_utils.AuthenticIPNetwork(value)
|
||||
except Exception:
|
||||
msg = _("Field value %s is not a netaddr.IPNetwork") % value
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
class IPNetworkField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = IPNetwork()
|
||||
|
||||
|
||||
class UUID(obj_fields.UUID):
|
||||
def coerce(self, obj, attr, value):
|
||||
uuid.UUID(str(value))
|
||||
return str(value)
|
||||
|
||||
|
||||
class UUIDField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = UUID()
|
||||
|
||||
|
||||
class FloatingIPStatusEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.VALID_FLOATINGIP_STATUS)
|
||||
|
||||
|
||||
class RouterStatusEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.VALID_ROUTER_STATUS)
|
||||
|
||||
|
||||
class NetworkSegmentRangeNetworkTypeEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.NETWORK_SEGMENT_RANGE_TYPES)
|
@ -20,8 +20,10 @@ import time
|
||||
import warnings
|
||||
|
||||
import fixtures
|
||||
import netaddr
|
||||
|
||||
from neutron_lib.utils import helpers
|
||||
from neutron_lib.utils import net
|
||||
|
||||
|
||||
class UnorderedList(list):
|
||||
@ -78,3 +80,13 @@ def reset_random_seed():
|
||||
# at the same time get the same values from RNG
|
||||
seed = time.time() + os.getpid()
|
||||
random.seed(seed)
|
||||
|
||||
|
||||
def get_random_EUI():
|
||||
return netaddr.EUI(
|
||||
net.get_random_mac(['fe', '16', '3e', '00', '00', '00'])
|
||||
)
|
||||
|
||||
|
||||
def get_random_ip_network(version=4):
|
||||
return netaddr.IPNetwork(get_random_cidr(version=version))
|
||||
|
303
neutron_lib/tests/unit/objects/test_common_types.py
Normal file
303
neutron_lib/tests/unit/objects/test_common_types.py
Normal file
@ -0,0 +1,303 @@
|
||||
# Copyright 2016 OpenStack Foundation
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import itertools
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from neutron_lib import constants as const
|
||||
from neutron_lib.db import constants as db_const
|
||||
from neutron_lib.objects import common_types
|
||||
from neutron_lib.tests import _base as test_base
|
||||
from neutron_lib.tests import tools
|
||||
from neutron_lib.utils import net
|
||||
|
||||
|
||||
class TestField(object):
|
||||
|
||||
def test_coerce_good_values(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual(out_val, self.field.coerce('obj', 'attr', in_val))
|
||||
|
||||
def test_coerce_bad_values(self):
|
||||
for in_val in self.coerce_bad_values:
|
||||
self.assertRaises((TypeError, ValueError),
|
||||
self.field.coerce, 'obj', 'attr', in_val)
|
||||
|
||||
def test_to_primitive(self):
|
||||
for in_val, prim_val in self.to_primitive_values:
|
||||
self.assertEqual(prim_val, self.field.to_primitive('obj', 'attr',
|
||||
in_val))
|
||||
|
||||
def test_to_primitive_json_serializable(self):
|
||||
for in_val, _ in self.to_primitive_values:
|
||||
prim = self.field.to_primitive('obj', 'attr', in_val)
|
||||
jsencoded = jsonutils.dumps(prim)
|
||||
self.assertEqual(prim, jsonutils.loads(jsencoded))
|
||||
|
||||
def test_from_primitive(self):
|
||||
class ObjectLikeThing(object):
|
||||
_context = 'context'
|
||||
|
||||
for prim_val, out_val in self.from_primitive_values:
|
||||
from_prim = self.field.from_primitive(ObjectLikeThing, 'attr',
|
||||
prim_val)
|
||||
self.assertEqual(out_val, from_prim)
|
||||
# ensure it's coercable for sanity
|
||||
self.field.coerce('obj', 'attr', from_prim)
|
||||
|
||||
@abc.abstractmethod
|
||||
def test_stringify(self):
|
||||
'''This test should validate stringify() format for new field types.'''
|
||||
|
||||
|
||||
class IPV6ModeEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IPV6ModeEnumFieldTest, self).setUp()
|
||||
self.field = common_types.IPV6ModeEnumField()
|
||||
self.coerce_good_values = [(mode, mode)
|
||||
for mode in const.IPV6_MODES]
|
||||
self.coerce_bad_values = ['6', 4, 'type', 'slaacc']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class DscpMarkFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(DscpMarkFieldTest, self).setUp()
|
||||
self.field = common_types.DscpMarkField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in const.VALID_DSCP_MARKS]
|
||||
self.coerce_bad_values = ['6', 'str', [], {}, object()]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("%s" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class IPNetworkPrefixLenFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IPNetworkPrefixLenFieldTest, self).setUp()
|
||||
self.field = common_types.IPNetworkPrefixLenField()
|
||||
self.coerce_good_values = [(x, x) for x in (0, 32, 128, 42)]
|
||||
self.coerce_bad_values = ['len', '1', 129, -1]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("%s" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class MACAddressFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(MACAddressFieldTest, self).setUp()
|
||||
self.field = common_types.MACAddressField()
|
||||
mac1 = tools.get_random_EUI()
|
||||
mac2 = tools.get_random_EUI()
|
||||
self.coerce_good_values = [(mac1, mac1), (mac2, mac2)]
|
||||
self.coerce_bad_values = [
|
||||
'XXXX', 'ypp', 'g3:vvv',
|
||||
# the field type is strict and does not allow to pass strings, even
|
||||
# if they represent a valid MAC address
|
||||
net.get_random_mac('fe:16:3e:00:00:00'.split(':')),
|
||||
]
|
||||
self.to_primitive_values = ((a1, str(a2))
|
||||
for a1, a2 in self.coerce_good_values)
|
||||
self.from_primitive_values = ((a2, a1)
|
||||
for a1, a2 in self.to_primitive_values)
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual('%s' % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class IPNetworkFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IPNetworkFieldTest, self).setUp()
|
||||
self.field = common_types.IPNetworkField()
|
||||
addrs = [
|
||||
tools.get_random_ip_network(version=ip_version)
|
||||
for ip_version in const.IP_ALLOWED_VERSIONS
|
||||
]
|
||||
self.coerce_good_values = [(addr, addr) for addr in addrs]
|
||||
self.coerce_bad_values = [
|
||||
'ypp', 'g3:vvv',
|
||||
# the field type is strict and does not allow to pass strings, even
|
||||
# if they represent a valid IP network
|
||||
'10.0.0.0/24',
|
||||
]
|
||||
self.to_primitive_values = ((a1, str(a2))
|
||||
for a1, a2 in self.coerce_good_values)
|
||||
self.from_primitive_values = ((a2, a1)
|
||||
for a1, a2 in self.to_primitive_values)
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual('%s' % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class IPVersionEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IPVersionEnumFieldTest, self).setUp()
|
||||
self.field = common_types.IPVersionEnumField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in const.IP_ALLOWED_VERSIONS]
|
||||
self.coerce_bad_values = [5, 0, -1, 'str']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("%s" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class FlowDirectionEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(FlowDirectionEnumFieldTest, self).setUp()
|
||||
self.field = common_types.FlowDirectionEnumField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in const.VALID_DIRECTIONS]
|
||||
self.coerce_bad_values = ['test', '8', 10, []]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class DomainNameFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(DomainNameFieldTest, self).setUp()
|
||||
self.field = common_types.DomainNameField()
|
||||
self.coerce_good_values = [
|
||||
(val, val)
|
||||
for val in ('www.google.com', 'hostname', '1abc.com')
|
||||
]
|
||||
self.coerce_bad_values = ['x' * (db_const.FQDN_FIELD_SIZE + 1), 10, []]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class EtherTypeEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(EtherTypeEnumFieldTest, self).setUp()
|
||||
self.field = common_types.EtherTypeEnumField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in const.VALID_ETHERTYPES]
|
||||
self.coerce_bad_values = ['IpV4', 8, 'str', 'ipv6']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class IpProtocolEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IpProtocolEnumFieldTest, self).setUp()
|
||||
self.field = common_types.IpProtocolEnumField()
|
||||
self.coerce_good_values = [
|
||||
(val, val)
|
||||
for val in itertools.chain(
|
||||
const.IP_PROTOCOL_MAP.keys(),
|
||||
[str(v) for v in range(256)]
|
||||
)
|
||||
]
|
||||
self.coerce_bad_values = ['test', 'Udp', 256]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class UUIDFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(UUIDFieldTest, self).setUp()
|
||||
self.field = common_types.UUIDField()
|
||||
self.coerce_good_values = [
|
||||
('f1d9cb3f-c263-45d3-907c-d12a9ef1629e',
|
||||
'f1d9cb3f-c263-45d3-907c-d12a9ef1629e'),
|
||||
('7188f6637cbd4097a3b1d1bb7897c7c0',
|
||||
'7188f6637cbd4097a3b1d1bb7897c7c0')]
|
||||
self.coerce_bad_values = [
|
||||
'f1d9cb3f-c263-45d3-907c-d12a9ef16zzz',
|
||||
'7188f6637cbd4097a3b1d1bb7897']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual('%s' % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class DictOfMiscValuesFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(DictOfMiscValuesFieldTest, self).setUp()
|
||||
self.field = common_types.DictOfMiscValues
|
||||
test_dict_1 = {'a': True,
|
||||
'b': 1.23,
|
||||
'c': ['1', 1.23, True],
|
||||
'd': {'aa': 'zz'},
|
||||
'e': '10.0.0.1'}
|
||||
test_dict_str = jsonutils.dumps(test_dict_1)
|
||||
self.coerce_good_values = [
|
||||
(test_dict_1, test_dict_1),
|
||||
(test_dict_str, test_dict_1)
|
||||
]
|
||||
self.coerce_bad_values = [str(test_dict_1), '{"a":}']
|
||||
self.to_primitive_values = [
|
||||
(test_dict_1, test_dict_str)
|
||||
]
|
||||
self.from_primitive_values = [
|
||||
(test_dict_str, test_dict_1)
|
||||
]
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual(jsonutils.dumps(in_val),
|
||||
self.field.stringify(in_val))
|
||||
|
||||
|
||||
class NetworkSegmentRangeNetworkTypeEnumFieldTest(test_base.BaseTestCase,
|
||||
TestField):
|
||||
def setUp(self):
|
||||
super(NetworkSegmentRangeNetworkTypeEnumFieldTest, self).setUp()
|
||||
self.field = common_types.NetworkSegmentRangeNetworkTypeEnumField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in [const.TYPE_VLAN,
|
||||
const.TYPE_VXLAN,
|
||||
const.TYPE_GRE,
|
||||
const.TYPE_GENEVE]]
|
||||
self.coerce_bad_values = [const.TYPE_FLAT, 'foo-network-type']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
@ -14,6 +14,9 @@
|
||||
import random
|
||||
import socket
|
||||
|
||||
import netaddr
|
||||
import six
|
||||
|
||||
from neutron_lib import constants
|
||||
|
||||
|
||||
@ -85,3 +88,41 @@ def is_port_trusted(port):
|
||||
"""
|
||||
return port['device_owner'].startswith(
|
||||
constants.DEVICE_OWNER_NETWORK_PREFIX)
|
||||
|
||||
|
||||
class _AuthenticBase(object):
|
||||
def __init__(self, addr, **kwargs):
|
||||
super(_AuthenticBase, self).__init__(addr, **kwargs)
|
||||
self._initial_value = addr
|
||||
|
||||
def __str__(self):
|
||||
if isinstance(self._initial_value, six.string_types):
|
||||
return self._initial_value
|
||||
return super(_AuthenticBase, self).__str__()
|
||||
|
||||
# NOTE(ihrachys): override deepcopy because netaddr.* classes are
|
||||
# slot-based and hence would not copy _initial_value
|
||||
def __deepcopy__(self, memo):
|
||||
return self.__class__(self._initial_value)
|
||||
|
||||
|
||||
class AuthenticIPNetwork(_AuthenticBase, netaddr.IPNetwork):
|
||||
'''AuthenticIPNetwork class
|
||||
|
||||
This class retains the format of the IP network string passed during
|
||||
initialization.
|
||||
|
||||
This is useful when we want to make sure that we retain the format passed
|
||||
by a user through API.
|
||||
'''
|
||||
|
||||
|
||||
class AuthenticEUI(_AuthenticBase, netaddr.EUI):
|
||||
'''AuthenticEUI class
|
||||
|
||||
This class retains the format of the MAC address string passed during
|
||||
initialization.
|
||||
|
||||
This is useful when we want to make sure that we retain the format passed
|
||||
by a user through API.
|
||||
'''
|
||||
|
@ -0,0 +1,8 @@
|
||||
---
|
||||
features:
|
||||
- The ``neutron.objects.common_types`` module is now available in
|
||||
``neutron_lib.objects.common_types``.
|
||||
- The ``get_random_EUI`` and ``get_random_ip_network`` functions are now
|
||||
available in ``neutron_lib.tests.tools``.
|
||||
- The ``AuthenticIPNetwork`` and ``AuthenticEUI`` classes are now available
|
||||
in ``neutron_lib.utils.net``.
|
@ -7,6 +7,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
|
||||
SQLAlchemy>=1.2.0 # MIT
|
||||
pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD
|
||||
keystoneauth1>=3.4.0 # Apache-2.0
|
||||
netaddr>=0.7.18 # BSD
|
||||
six>=1.10.0 # MIT
|
||||
stevedore>=1.20.0 # Apache-2.0
|
||||
oslo.concurrency>=3.26.0 # Apache-2.0
|
||||
|
Loading…
x
Reference in New Issue
Block a user