rehome neutron.objects.common_types
This patch rehomes neutron.objects.common_types into neutron_lib.objects.common_types and includes the test_common_types from neutron as well. In addition some supporting logic is rehomed including some utils. For full details on the rehomed code please see the release note included herein. Also see: https://bugs.launchpad.net/neutron/+bug/1815827 Change-Id: Ic4f1240fceea1e372e6cb68e747169f7236b9f08
This commit is contained in:
parent
baab4a0373
commit
02233a9871
|
@ -0,0 +1,321 @@
|
|||
# Copyright 2016 OpenStack Foundation
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import itertools
|
||||
import uuid
|
||||
|
||||
import netaddr
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_versionedobjects import fields as obj_fields
|
||||
import six
|
||||
|
||||
from neutron_lib._i18n import _
|
||||
from neutron_lib import constants as lib_constants
|
||||
from neutron_lib.db import constants as lib_db_const
|
||||
from neutron_lib.objects import exceptions as o_exc
|
||||
from neutron_lib.utils import net as net_utils
|
||||
|
||||
|
||||
class HARouterEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.VALID_HA_STATES)
|
||||
|
||||
|
||||
class IPV6ModeEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.IPV6_MODES)
|
||||
|
||||
|
||||
class RangeConstrainedInteger(obj_fields.Integer):
|
||||
def __init__(self, start, end, **kwargs):
|
||||
try:
|
||||
self._start = int(start)
|
||||
self._end = int(end)
|
||||
except (TypeError, ValueError):
|
||||
raise o_exc.NeutronRangeConstrainedIntegerInvalidLimit(
|
||||
start=start, end=end)
|
||||
super(RangeConstrainedInteger, self).__init__(**kwargs)
|
||||
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, six.integer_types):
|
||||
msg = _("Field value %s is not an integer") % value
|
||||
raise ValueError(msg)
|
||||
if not self._start <= value <= self._end:
|
||||
msg = _("Field value %s is invalid") % value
|
||||
raise ValueError(msg)
|
||||
return super(RangeConstrainedInteger, self).coerce(obj, attr, value)
|
||||
|
||||
|
||||
class IPNetworkPrefixLen(RangeConstrainedInteger):
|
||||
"""IP network (CIDR) prefix length custom Enum"""
|
||||
def __init__(self, **kwargs):
|
||||
super(IPNetworkPrefixLen, self).__init__(
|
||||
start=0, end=lib_constants.IPv6_BITS, **kwargs)
|
||||
|
||||
|
||||
class IPNetworkPrefixLenField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = IPNetworkPrefixLen()
|
||||
|
||||
|
||||
class PortRange(RangeConstrainedInteger):
|
||||
def __init__(self, start=lib_constants.PORT_RANGE_MIN, **kwargs):
|
||||
super(PortRange, self).__init__(start=start,
|
||||
end=lib_constants.PORT_RANGE_MAX,
|
||||
**kwargs)
|
||||
|
||||
|
||||
class PortRangeField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = PortRange()
|
||||
|
||||
|
||||
class PortRangeWith0Field(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = PortRange(start=0)
|
||||
|
||||
|
||||
class VlanIdRange(RangeConstrainedInteger):
|
||||
def __init__(self, **kwargs):
|
||||
super(VlanIdRange, self).__init__(start=lib_constants.MIN_VLAN_TAG,
|
||||
end=lib_constants.MAX_VLAN_TAG,
|
||||
**kwargs)
|
||||
|
||||
|
||||
class VlanIdRangeField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = VlanIdRange()
|
||||
|
||||
|
||||
class ListOfIPNetworksField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.List(obj_fields.IPNetwork())
|
||||
|
||||
|
||||
class SetOfUUIDsField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Set(obj_fields.UUID())
|
||||
|
||||
|
||||
class DomainName(obj_fields.String):
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, six.string_types):
|
||||
msg = _("Field value %s is not a string") % value
|
||||
raise ValueError(msg)
|
||||
if len(value) > lib_db_const.FQDN_FIELD_SIZE:
|
||||
msg = _("Domain name %s is too long") % value
|
||||
raise ValueError(msg)
|
||||
return super(DomainName, self).coerce(obj, attr, value)
|
||||
|
||||
|
||||
class DomainNameField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = DomainName()
|
||||
|
||||
|
||||
class IntegerEnum(obj_fields.Integer):
|
||||
def __init__(self, valid_values=None, **kwargs):
|
||||
if not valid_values:
|
||||
msg = _("No possible values specified")
|
||||
raise ValueError(msg)
|
||||
for value in valid_values:
|
||||
if not isinstance(value, six.integer_types):
|
||||
msg = _("Possible value %s is not an integer") % value
|
||||
raise ValueError(msg)
|
||||
self._valid_values = valid_values
|
||||
super(IntegerEnum, self).__init__(**kwargs)
|
||||
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, six.integer_types):
|
||||
msg = _("Field value %s is not an integer") % value
|
||||
raise ValueError(msg)
|
||||
if value not in self._valid_values:
|
||||
msg = (
|
||||
_("Field value %(value)s is not in the list "
|
||||
"of valid values: %(values)s") %
|
||||
{'value': value, 'values': self._valid_values}
|
||||
)
|
||||
raise ValueError(msg)
|
||||
return super(IntegerEnum, self).coerce(obj, attr, value)
|
||||
|
||||
|
||||
class IPVersionEnum(IntegerEnum):
|
||||
"""IP version integer Enum"""
|
||||
def __init__(self, **kwargs):
|
||||
super(IPVersionEnum, self).__init__(
|
||||
valid_values=lib_constants.IP_ALLOWED_VERSIONS, **kwargs)
|
||||
|
||||
|
||||
class IPVersionEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = IPVersionEnum()
|
||||
|
||||
|
||||
class DscpMark(IntegerEnum):
|
||||
def __init__(self, valid_values=None, **kwargs):
|
||||
super(DscpMark, self).__init__(
|
||||
valid_values=lib_constants.VALID_DSCP_MARKS)
|
||||
|
||||
|
||||
class DscpMarkField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = DscpMark()
|
||||
|
||||
|
||||
class FlowDirectionEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.VALID_DIRECTIONS)
|
||||
|
||||
|
||||
class IpamAllocationStatusEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.VALID_IPAM_ALLOCATION_STATUSES)
|
||||
|
||||
|
||||
class EtherTypeEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(valid_values=lib_constants.VALID_ETHERTYPES)
|
||||
|
||||
|
||||
class IpProtocolEnum(obj_fields.Enum):
|
||||
"""IP protocol number Enum"""
|
||||
def __init__(self, **kwargs):
|
||||
super(IpProtocolEnum, self).__init__(
|
||||
valid_values=list(
|
||||
itertools.chain(
|
||||
lib_constants.IP_PROTOCOL_MAP.keys(),
|
||||
[str(v) for v in range(256)]
|
||||
)
|
||||
),
|
||||
**kwargs)
|
||||
|
||||
|
||||
class PortBindingStatusEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.PORT_BINDING_STATUSES)
|
||||
|
||||
|
||||
class IpProtocolEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = IpProtocolEnum()
|
||||
|
||||
|
||||
class MACAddress(obj_fields.FieldType):
|
||||
"""MACAddress custom field.
|
||||
|
||||
This custom field is different from the one provided by
|
||||
oslo.versionedobjects library: it uses netaddr.EUI type instead of strings.
|
||||
"""
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, netaddr.EUI):
|
||||
msg = _("Field value %s is not a netaddr.EUI") % value
|
||||
raise ValueError(msg)
|
||||
return super(MACAddress, self).coerce(obj, attr, value)
|
||||
|
||||
@staticmethod
|
||||
def to_primitive(obj, attr, value):
|
||||
return str(value)
|
||||
|
||||
@staticmethod
|
||||
def from_primitive(obj, attr, value):
|
||||
try:
|
||||
return net_utils.AuthenticEUI(value)
|
||||
except Exception:
|
||||
msg = _("Field value %s is not a netaddr.EUI") % value
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
class MACAddressField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = MACAddress()
|
||||
|
||||
|
||||
class DictOfMiscValues(obj_fields.FieldType):
|
||||
"""DictOfMiscValues custom field
|
||||
|
||||
This custom field is handling dictionary with miscellaneous value types,
|
||||
including integer, float, boolean and list and nested dictionaries.
|
||||
"""
|
||||
@staticmethod
|
||||
def coerce(obj, attr, value):
|
||||
if isinstance(value, dict):
|
||||
return value
|
||||
if isinstance(value, six.string_types):
|
||||
try:
|
||||
return jsonutils.loads(value)
|
||||
except Exception:
|
||||
msg = _("Field value %s is not stringified JSON") % value
|
||||
raise ValueError(msg)
|
||||
msg = (_("Field value %s is not type of dict or stringified JSON")
|
||||
% value)
|
||||
raise ValueError(msg)
|
||||
|
||||
@staticmethod
|
||||
def from_primitive(obj, attr, value):
|
||||
return DictOfMiscValues.coerce(obj, attr, value)
|
||||
|
||||
@staticmethod
|
||||
def to_primitive(obj, attr, value):
|
||||
return jsonutils.dumps(value)
|
||||
|
||||
@staticmethod
|
||||
def stringify(value):
|
||||
return jsonutils.dumps(value)
|
||||
|
||||
|
||||
class DictOfMiscValuesField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = DictOfMiscValues
|
||||
|
||||
|
||||
class ListOfDictOfMiscValuesField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.List(DictOfMiscValuesField())
|
||||
|
||||
|
||||
class IPNetwork(obj_fields.FieldType):
|
||||
"""IPNetwork custom field.
|
||||
|
||||
This custom field is different from the one provided by
|
||||
oslo.versionedobjects library: it does not reset string representation for
|
||||
the field.
|
||||
"""
|
||||
def coerce(self, obj, attr, value):
|
||||
if not isinstance(value, netaddr.IPNetwork):
|
||||
msg = _("Field value %s is not a netaddr.IPNetwork") % value
|
||||
raise ValueError(msg)
|
||||
return super(IPNetwork, self).coerce(obj, attr, value)
|
||||
|
||||
@staticmethod
|
||||
def to_primitive(obj, attr, value):
|
||||
return str(value)
|
||||
|
||||
@staticmethod
|
||||
def from_primitive(obj, attr, value):
|
||||
try:
|
||||
return net_utils.AuthenticIPNetwork(value)
|
||||
except Exception:
|
||||
msg = _("Field value %s is not a netaddr.IPNetwork") % value
|
||||
raise ValueError(msg)
|
||||
|
||||
|
||||
class IPNetworkField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = IPNetwork()
|
||||
|
||||
|
||||
class UUID(obj_fields.UUID):
|
||||
def coerce(self, obj, attr, value):
|
||||
uuid.UUID(str(value))
|
||||
return str(value)
|
||||
|
||||
|
||||
class UUIDField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = UUID()
|
||||
|
||||
|
||||
class FloatingIPStatusEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.VALID_FLOATINGIP_STATUS)
|
||||
|
||||
|
||||
class RouterStatusEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.VALID_ROUTER_STATUS)
|
||||
|
||||
|
||||
class NetworkSegmentRangeNetworkTypeEnumField(obj_fields.AutoTypedField):
|
||||
AUTO_TYPE = obj_fields.Enum(
|
||||
valid_values=lib_constants.NETWORK_SEGMENT_RANGE_TYPES)
|
|
@ -20,8 +20,10 @@ import time
|
|||
import warnings
|
||||
|
||||
import fixtures
|
||||
import netaddr
|
||||
|
||||
from neutron_lib.utils import helpers
|
||||
from neutron_lib.utils import net
|
||||
|
||||
|
||||
class UnorderedList(list):
|
||||
|
@ -78,3 +80,13 @@ def reset_random_seed():
|
|||
# at the same time get the same values from RNG
|
||||
seed = time.time() + os.getpid()
|
||||
random.seed(seed)
|
||||
|
||||
|
||||
def get_random_EUI():
|
||||
return netaddr.EUI(
|
||||
net.get_random_mac(['fe', '16', '3e', '00', '00', '00'])
|
||||
)
|
||||
|
||||
|
||||
def get_random_ip_network(version=4):
|
||||
return netaddr.IPNetwork(get_random_cidr(version=version))
|
||||
|
|
|
@ -0,0 +1,303 @@
|
|||
# Copyright 2016 OpenStack Foundation
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import itertools
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from neutron_lib import constants as const
|
||||
from neutron_lib.db import constants as db_const
|
||||
from neutron_lib.objects import common_types
|
||||
from neutron_lib.tests import _base as test_base
|
||||
from neutron_lib.tests import tools
|
||||
from neutron_lib.utils import net
|
||||
|
||||
|
||||
class TestField(object):
|
||||
|
||||
def test_coerce_good_values(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual(out_val, self.field.coerce('obj', 'attr', in_val))
|
||||
|
||||
def test_coerce_bad_values(self):
|
||||
for in_val in self.coerce_bad_values:
|
||||
self.assertRaises((TypeError, ValueError),
|
||||
self.field.coerce, 'obj', 'attr', in_val)
|
||||
|
||||
def test_to_primitive(self):
|
||||
for in_val, prim_val in self.to_primitive_values:
|
||||
self.assertEqual(prim_val, self.field.to_primitive('obj', 'attr',
|
||||
in_val))
|
||||
|
||||
def test_to_primitive_json_serializable(self):
|
||||
for in_val, _ in self.to_primitive_values:
|
||||
prim = self.field.to_primitive('obj', 'attr', in_val)
|
||||
jsencoded = jsonutils.dumps(prim)
|
||||
self.assertEqual(prim, jsonutils.loads(jsencoded))
|
||||
|
||||
def test_from_primitive(self):
|
||||
class ObjectLikeThing(object):
|
||||
_context = 'context'
|
||||
|
||||
for prim_val, out_val in self.from_primitive_values:
|
||||
from_prim = self.field.from_primitive(ObjectLikeThing, 'attr',
|
||||
prim_val)
|
||||
self.assertEqual(out_val, from_prim)
|
||||
# ensure it's coercable for sanity
|
||||
self.field.coerce('obj', 'attr', from_prim)
|
||||
|
||||
@abc.abstractmethod
|
||||
def test_stringify(self):
|
||||
'''This test should validate stringify() format for new field types.'''
|
||||
|
||||
|
||||
class IPV6ModeEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IPV6ModeEnumFieldTest, self).setUp()
|
||||
self.field = common_types.IPV6ModeEnumField()
|
||||
self.coerce_good_values = [(mode, mode)
|
||||
for mode in const.IPV6_MODES]
|
||||
self.coerce_bad_values = ['6', 4, 'type', 'slaacc']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class DscpMarkFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(DscpMarkFieldTest, self).setUp()
|
||||
self.field = common_types.DscpMarkField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in const.VALID_DSCP_MARKS]
|
||||
self.coerce_bad_values = ['6', 'str', [], {}, object()]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("%s" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class IPNetworkPrefixLenFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IPNetworkPrefixLenFieldTest, self).setUp()
|
||||
self.field = common_types.IPNetworkPrefixLenField()
|
||||
self.coerce_good_values = [(x, x) for x in (0, 32, 128, 42)]
|
||||
self.coerce_bad_values = ['len', '1', 129, -1]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("%s" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class MACAddressFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(MACAddressFieldTest, self).setUp()
|
||||
self.field = common_types.MACAddressField()
|
||||
mac1 = tools.get_random_EUI()
|
||||
mac2 = tools.get_random_EUI()
|
||||
self.coerce_good_values = [(mac1, mac1), (mac2, mac2)]
|
||||
self.coerce_bad_values = [
|
||||
'XXXX', 'ypp', 'g3:vvv',
|
||||
# the field type is strict and does not allow to pass strings, even
|
||||
# if they represent a valid MAC address
|
||||
net.get_random_mac('fe:16:3e:00:00:00'.split(':')),
|
||||
]
|
||||
self.to_primitive_values = ((a1, str(a2))
|
||||
for a1, a2 in self.coerce_good_values)
|
||||
self.from_primitive_values = ((a2, a1)
|
||||
for a1, a2 in self.to_primitive_values)
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual('%s' % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class IPNetworkFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IPNetworkFieldTest, self).setUp()
|
||||
self.field = common_types.IPNetworkField()
|
||||
addrs = [
|
||||
tools.get_random_ip_network(version=ip_version)
|
||||
for ip_version in const.IP_ALLOWED_VERSIONS
|
||||
]
|
||||
self.coerce_good_values = [(addr, addr) for addr in addrs]
|
||||
self.coerce_bad_values = [
|
||||
'ypp', 'g3:vvv',
|
||||
# the field type is strict and does not allow to pass strings, even
|
||||
# if they represent a valid IP network
|
||||
'10.0.0.0/24',
|
||||
]
|
||||
self.to_primitive_values = ((a1, str(a2))
|
||||
for a1, a2 in self.coerce_good_values)
|
||||
self.from_primitive_values = ((a2, a1)
|
||||
for a1, a2 in self.to_primitive_values)
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual('%s' % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class IPVersionEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IPVersionEnumFieldTest, self).setUp()
|
||||
self.field = common_types.IPVersionEnumField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in const.IP_ALLOWED_VERSIONS]
|
||||
self.coerce_bad_values = [5, 0, -1, 'str']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("%s" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class FlowDirectionEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(FlowDirectionEnumFieldTest, self).setUp()
|
||||
self.field = common_types.FlowDirectionEnumField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in const.VALID_DIRECTIONS]
|
||||
self.coerce_bad_values = ['test', '8', 10, []]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class DomainNameFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(DomainNameFieldTest, self).setUp()
|
||||
self.field = common_types.DomainNameField()
|
||||
self.coerce_good_values = [
|
||||
(val, val)
|
||||
for val in ('www.google.com', 'hostname', '1abc.com')
|
||||
]
|
||||
self.coerce_bad_values = ['x' * (db_const.FQDN_FIELD_SIZE + 1), 10, []]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class EtherTypeEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(EtherTypeEnumFieldTest, self).setUp()
|
||||
self.field = common_types.EtherTypeEnumField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in const.VALID_ETHERTYPES]
|
||||
self.coerce_bad_values = ['IpV4', 8, 'str', 'ipv6']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class IpProtocolEnumFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(IpProtocolEnumFieldTest, self).setUp()
|
||||
self.field = common_types.IpProtocolEnumField()
|
||||
self.coerce_good_values = [
|
||||
(val, val)
|
||||
for val in itertools.chain(
|
||||
const.IP_PROTOCOL_MAP.keys(),
|
||||
[str(v) for v in range(256)]
|
||||
)
|
||||
]
|
||||
self.coerce_bad_values = ['test', 'Udp', 256]
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class UUIDFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(UUIDFieldTest, self).setUp()
|
||||
self.field = common_types.UUIDField()
|
||||
self.coerce_good_values = [
|
||||
('f1d9cb3f-c263-45d3-907c-d12a9ef1629e',
|
||||
'f1d9cb3f-c263-45d3-907c-d12a9ef1629e'),
|
||||
('7188f6637cbd4097a3b1d1bb7897c7c0',
|
||||
'7188f6637cbd4097a3b1d1bb7897c7c0')]
|
||||
self.coerce_bad_values = [
|
||||
'f1d9cb3f-c263-45d3-907c-d12a9ef16zzz',
|
||||
'7188f6637cbd4097a3b1d1bb7897']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual('%s' % in_val, self.field.stringify(in_val))
|
||||
|
||||
|
||||
class DictOfMiscValuesFieldTest(test_base.BaseTestCase, TestField):
|
||||
def setUp(self):
|
||||
super(DictOfMiscValuesFieldTest, self).setUp()
|
||||
self.field = common_types.DictOfMiscValues
|
||||
test_dict_1 = {'a': True,
|
||||
'b': 1.23,
|
||||
'c': ['1', 1.23, True],
|
||||
'd': {'aa': 'zz'},
|
||||
'e': '10.0.0.1'}
|
||||
test_dict_str = jsonutils.dumps(test_dict_1)
|
||||
self.coerce_good_values = [
|
||||
(test_dict_1, test_dict_1),
|
||||
(test_dict_str, test_dict_1)
|
||||
]
|
||||
self.coerce_bad_values = [str(test_dict_1), '{"a":}']
|
||||
self.to_primitive_values = [
|
||||
(test_dict_1, test_dict_str)
|
||||
]
|
||||
self.from_primitive_values = [
|
||||
(test_dict_str, test_dict_1)
|
||||
]
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual(jsonutils.dumps(in_val),
|
||||
self.field.stringify(in_val))
|
||||
|
||||
|
||||
class NetworkSegmentRangeNetworkTypeEnumFieldTest(test_base.BaseTestCase,
|
||||
TestField):
|
||||
def setUp(self):
|
||||
super(NetworkSegmentRangeNetworkTypeEnumFieldTest, self).setUp()
|
||||
self.field = common_types.NetworkSegmentRangeNetworkTypeEnumField()
|
||||
self.coerce_good_values = [(val, val)
|
||||
for val in [const.TYPE_VLAN,
|
||||
const.TYPE_VXLAN,
|
||||
const.TYPE_GRE,
|
||||
const.TYPE_GENEVE]]
|
||||
self.coerce_bad_values = [const.TYPE_FLAT, 'foo-network-type']
|
||||
self.to_primitive_values = self.coerce_good_values
|
||||
self.from_primitive_values = self.coerce_good_values
|
||||
|
||||
def test_stringify(self):
|
||||
for in_val, out_val in self.coerce_good_values:
|
||||
self.assertEqual("'%s'" % in_val, self.field.stringify(in_val))
|
|
@ -14,6 +14,9 @@
|
|||
import random
|
||||
import socket
|
||||
|
||||
import netaddr
|
||||
import six
|
||||
|
||||
from neutron_lib import constants
|
||||
|
||||
|
||||
|
@ -85,3 +88,41 @@ def is_port_trusted(port):
|
|||
"""
|
||||
return port['device_owner'].startswith(
|
||||
constants.DEVICE_OWNER_NETWORK_PREFIX)
|
||||
|
||||
|
||||
class _AuthenticBase(object):
|
||||
def __init__(self, addr, **kwargs):
|
||||
super(_AuthenticBase, self).__init__(addr, **kwargs)
|
||||
self._initial_value = addr
|
||||
|
||||
def __str__(self):
|
||||
if isinstance(self._initial_value, six.string_types):
|
||||
return self._initial_value
|
||||
return super(_AuthenticBase, self).__str__()
|
||||
|
||||
# NOTE(ihrachys): override deepcopy because netaddr.* classes are
|
||||
# slot-based and hence would not copy _initial_value
|
||||
def __deepcopy__(self, memo):
|
||||
return self.__class__(self._initial_value)
|
||||
|
||||
|
||||
class AuthenticIPNetwork(_AuthenticBase, netaddr.IPNetwork):
|
||||
'''AuthenticIPNetwork class
|
||||
|
||||
This class retains the format of the IP network string passed during
|
||||
initialization.
|
||||
|
||||
This is useful when we want to make sure that we retain the format passed
|
||||
by a user through API.
|
||||
'''
|
||||
|
||||
|
||||
class AuthenticEUI(_AuthenticBase, netaddr.EUI):
|
||||
'''AuthenticEUI class
|
||||
|
||||
This class retains the format of the MAC address string passed during
|
||||
initialization.
|
||||
|
||||
This is useful when we want to make sure that we retain the format passed
|
||||
by a user through API.
|
||||
'''
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
---
|
||||
features:
|
||||
- The ``neutron.objects.common_types`` module is now available in
|
||||
``neutron_lib.objects.common_types``.
|
||||
- The ``get_random_EUI`` and ``get_random_ip_network`` functions are now
|
||||
available in ``neutron_lib.tests.tools``.
|
||||
- The ``AuthenticIPNetwork`` and ``AuthenticEUI`` classes are now available
|
||||
in ``neutron_lib.utils.net``.
|
|
@ -7,6 +7,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
|
|||
SQLAlchemy>=1.2.0 # MIT
|
||||
pecan!=1.0.2,!=1.0.3,!=1.0.4,!=1.2,>=1.0.0 # BSD
|
||||
keystoneauth1>=3.4.0 # Apache-2.0
|
||||
netaddr>=0.7.18 # BSD
|
||||
six>=1.10.0 # MIT
|
||||
stevedore>=1.20.0 # Apache-2.0
|
||||
oslo.concurrency>=3.26.0 # Apache-2.0
|
||||
|
|
Loading…
Reference in New Issue