use AuthenticIPNetwork and AuthenticEUI from neutron-lib

This patch switches over to neutron-lib's version of the the
_AuthenticBase, AuthenticEUI and AuthenticIPNetwork classes by
deleting them from neutron and using lib's version instead.

Depends-On: https://review.opendev.org/#/c/659881/

NeutronLibImpact

Change-Id: Ia3d3db401d6abcb9c9965b945bcd4c199f8e812b
This commit is contained in:
Boden R 2019-05-17 15:17:34 -06:00
parent 84335b3c7b
commit 39c7ac3ffe
16 changed files with 47 additions and 106 deletions

View File

@ -44,7 +44,6 @@ from oslo_db import exception as db_exc
from oslo_log import log as logging
from oslo_utils import excutils
import pkg_resources
import six
import neutron
from neutron._i18n import _
@ -703,44 +702,6 @@ def wait_until_true(predicate, timeout=60, sleep=1, exception=None):
raise WaitTimeout(_("Timed out after %d seconds") % timeout)
class _AuthenticBase(object):
def __init__(self, addr, **kwargs):
super(_AuthenticBase, self).__init__(addr, **kwargs)
self._initial_value = addr
def __str__(self):
if isinstance(self._initial_value, six.string_types):
return self._initial_value
return super(_AuthenticBase, self).__str__()
# NOTE(ihrachys): override deepcopy because netaddr.* classes are
# slot-based and hence would not copy _initial_value
def __deepcopy__(self, memo):
return self.__class__(self._initial_value)
class AuthenticEUI(_AuthenticBase, netaddr.EUI):
'''AuthenticEUI class
This class retains the format of the MAC address string passed during
initialization.
This is useful when we want to make sure that we retain the format passed
by a user through API.
'''
class AuthenticIPNetwork(_AuthenticBase, netaddr.IPNetwork):
'''AuthenticIPNetwork class
This class retains the format of the IP network string passed during
initialization.
This is useful when we want to make sure that we retain the format passed
by a user through API.
'''
class classproperty(object):
def __init__(self, f):
self.func = f

View File

@ -21,8 +21,8 @@ from neutron_lib.db import resource_extend
from neutron_lib.db import utils as db_utils
from neutron_lib.exceptions import allowedaddresspairs as addr_exc
from neutron_lib.objects import exceptions
from neutron_lib.utils import net as net_utils
from neutron.common import utils
from neutron.objects.port.extensions import (allowedaddresspairs
as obj_addr_pair)
@ -42,9 +42,9 @@ class AllowedAddressPairsMixin(object):
if 'mac_address' not in address_pair:
address_pair['mac_address'] = port['mac_address']
# retain string format as passed through API
mac_address = utils.AuthenticEUI(
mac_address = net_utils.AuthenticEUI(
address_pair['mac_address'])
ip_address = utils.AuthenticIPNetwork(
ip_address = net_utils.AuthenticIPNetwork(
address_pair['ip_address'])
pair_obj = obj_addr_pair.AllowedAddressPair(
context,

View File

@ -23,11 +23,11 @@ from neutron_lib.callbacks import resources
from neutron_lib.db import resource_extend
from neutron_lib.exceptions import extraroute as xroute_exc
from neutron_lib.utils import helpers
from neutron_lib.utils import net as net_utils
from oslo_config import cfg
from oslo_log import log as logging
from neutron._i18n import _
from neutron.common import utils
from neutron.conf.db import extraroute_db
from neutron.db import l3_db
from neutron.objects import router as l3_obj
@ -117,7 +117,7 @@ class ExtraRoute_dbonly_mixin(l3_db.L3_NAT_dbonly_mixin):
l3_obj.RouterRoute(
context,
router_id=router['id'],
destination=utils.AuthenticIPNetwork(route['destination']),
destination=net_utils.AuthenticIPNetwork(route['destination']),
nexthop=netaddr.IPAddress(route['nexthop'])).create()
LOG.debug('Removed routes are %s', removed)

View File

@ -25,13 +25,13 @@ from neutron_lib import constants as const
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as exc
from neutron_lib.utils import net as net_utils
from oslo_config import cfg
from oslo_log import log as logging
from sqlalchemy.orm import exc as orm_exc
from neutron._i18n import _
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils
from neutron.db import db_base_plugin_common
from neutron.db import models_v2
from neutron.extensions import segment
@ -130,7 +130,7 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
for route_str in new_route_set - old_route_set:
route = subnet_obj.Route(
context,
destination=common_utils.AuthenticIPNetwork(
destination=net_utils.AuthenticIPNetwork(
route_str.partition("_")[0]),
nexthop=netaddr.IPAddress(route_str.partition("_")[2]),
subnet_id=id)
@ -555,7 +555,7 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
route = subnet_obj.Route(
context,
subnet_id=subnet.id,
destination=common_utils.AuthenticIPNetwork(
destination=net_utils.AuthenticIPNetwork(
rt['destination']),
nexthop=netaddr.IPAddress(rt['nexthop']))
route.create()

View File

@ -35,7 +35,6 @@ from sqlalchemy.orm import scoped_session
from neutron._i18n import _
from neutron.common import _constants as const
from neutron.common import utils
from neutron.db.models import securitygroup as sg_models
from neutron.db import rbac_db_mixin as rbac_mixin
from neutron.extensions import securitygroup as ext_sg
@ -387,7 +386,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase,
rule_dict = security_group_rule['security_group_rule']
remote_ip_prefix = rule_dict.get('remote_ip_prefix')
if remote_ip_prefix:
remote_ip_prefix = utils.AuthenticIPNetwork(remote_ip_prefix)
remote_ip_prefix = net.AuthenticIPNetwork(remote_ip_prefix)
protocol = rule_dict.get('protocol')
if protocol:

View File

@ -18,13 +18,13 @@ import netaddr
from neutron_lib import constants
from neutron_lib.db import constants as lib_db_const
from neutron_lib.objects import exceptions as o_exc
from neutron_lib.utils import net as net_utils
from oslo_serialization import jsonutils
from oslo_versionedobjects import fields as obj_fields
import six
from neutron._i18n import _
from neutron.common import utils
class HARouterEnumField(obj_fields.AutoTypedField):
@ -215,7 +215,7 @@ class MACAddress(obj_fields.FieldType):
@staticmethod
def from_primitive(obj, attr, value):
try:
return utils.AuthenticEUI(value)
return net_utils.AuthenticEUI(value)
except Exception:
msg = _("Field value %s is not a netaddr.EUI") % value
raise ValueError(msg)
@ -286,7 +286,7 @@ class IPNetwork(obj_fields.FieldType):
@staticmethod
def from_primitive(obj, attr, value):
try:
return utils.AuthenticIPNetwork(value)
return net_utils.AuthenticIPNetwork(value)
except Exception:
msg = _("Field value %s is not a netaddr.IPNetwork") % value
raise ValueError(msg)

View File

@ -12,9 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.utils import net as net_utils
from oslo_versionedobjects import fields as obj_fields
from neutron.common import utils
from neutron.db.models import metering as metering_models
from neutron.objects import base
from neutron.objects import common_types
@ -43,7 +43,7 @@ class MeteringLabelRule(base.NeutronDbObject):
def modify_fields_from_db(cls, db_obj):
result = super(MeteringLabelRule, cls).modify_fields_from_db(db_obj)
if 'remote_ip_prefix' in result:
result['remote_ip_prefix'] = utils.AuthenticIPNetwork(
result['remote_ip_prefix'] = net_utils.AuthenticIPNetwork(
result['remote_ip_prefix'])
return result

View File

@ -10,8 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.utils import net as net_utils
from neutron.common import utils
from neutron.db.models import allowed_address_pair as models
from neutron.objects import base
from neutron.objects import common_types
@ -54,10 +54,10 @@ class AllowedAddressPair(base.NeutronDbObject):
fields = super(AllowedAddressPair, cls).modify_fields_from_db(db_obj)
if 'ip_address' in fields:
# retain string format as stored in the database
fields['ip_address'] = utils.AuthenticIPNetwork(
fields['ip_address'] = net_utils.AuthenticIPNetwork(
fields['ip_address'])
if 'mac_address' in fields:
# retain string format as stored in the database
fields['mac_address'] = utils.AuthenticEUI(
fields['mac_address'] = net_utils.AuthenticEUI(
fields['mac_address'])
return fields

View File

@ -14,10 +14,10 @@
import netaddr
from neutron_lib import constants
from neutron_lib.utils import net as net_utils
from oslo_utils import versionutils
from oslo_versionedobjects import fields as obj_fields
from neutron.common import utils
from neutron.db.models import dns as dns_models
from neutron.db.models import l3
from neutron.db.models import securitygroup as sg_models
@ -440,7 +440,8 @@ class Port(base.NeutronDbObject):
# TODO(rossella_s): get rid of it once we switch the db model to using
# custom types.
if 'mac_address' in fields:
fields['mac_address'] = utils.AuthenticEUI(fields['mac_address'])
fields['mac_address'] = net_utils.AuthenticEUI(
fields['mac_address'])
distributed_port_binding = fields.get('distributed_bindings')
if distributed_port_binding:

View File

@ -17,11 +17,11 @@ import netaddr
from neutron_lib.api.definitions import availability_zone as az_def
from neutron_lib.api.validators import availability_zone as az_validator
from neutron_lib import constants as n_const
from neutron_lib.utils import net as net_utils
from oslo_versionedobjects import fields as obj_fields
import six
from sqlalchemy import func
from neutron.common import utils
from neutron.db.models import dvr as dvr_models
from neutron.db.models import l3
from neutron.db.models import l3_attrs
@ -51,7 +51,7 @@ class RouterRoute(base.NeutronDbObject):
def modify_fields_from_db(cls, db_obj):
result = super(RouterRoute, cls).modify_fields_from_db(db_obj)
if 'destination' in result:
result['destination'] = utils.AuthenticIPNetwork(
result['destination'] = net_utils.AuthenticIPNetwork(
result['destination'])
if 'nexthop' in result:
result['nexthop'] = netaddr.IPAddress(result['nexthop'])
@ -177,7 +177,8 @@ class DVRMacAddress(base.NeutronDbObject):
if 'mac_address' in fields:
# NOTE(tonytan4ever): Here uses AuthenticEUI to retain the format
# passed from API.
fields['mac_address'] = utils.AuthenticEUI(fields['mac_address'])
fields['mac_address'] = net_utils.AuthenticEUI(
fields['mac_address'])
return fields
@classmethod

View File

@ -10,10 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.utils import net as net_utils
from oslo_utils import versionutils
from oslo_versionedobjects import fields as obj_fields
from neutron.common import utils
from neutron.db.models import securitygroup as sg_models
from neutron.db import rbac_db_models
from neutron.objects import base
@ -153,5 +153,5 @@ class SecurityGroupRule(base.NeutronDbObject):
fields = super(SecurityGroupRule, cls).modify_fields_from_db(db_obj)
if 'remote_ip_prefix' in fields:
fields['remote_ip_prefix'] = (
utils.AuthenticIPNetwork(fields['remote_ip_prefix']))
net_utils.AuthenticIPNetwork(fields['remote_ip_prefix']))
return fields

View File

@ -14,11 +14,11 @@ import netaddr
from neutron_lib.api import validators
from neutron_lib import constants as const
from neutron_lib.db import model_query
from neutron_lib.utils import net as net_utils
from oslo_versionedobjects import fields as obj_fields
from sqlalchemy import and_, or_
from neutron.common import utils
from neutron.db.models import segment as segment_model
from neutron.db.models import subnet_service_type
from neutron.db import models_v2
@ -86,7 +86,7 @@ class Route(base.NeutronDbObject):
# TODO(korzen) remove this method when IP and CIDR decorator ready
result = super(Route, cls).modify_fields_from_db(db_obj)
if 'destination' in result:
result['destination'] = utils.AuthenticIPNetwork(
result['destination'] = net_utils.AuthenticIPNetwork(
result['destination'])
if 'nexthop' in result:
result['nexthop'] = netaddr.IPAddress(result['nexthop'])
@ -281,7 +281,7 @@ class Subnet(base.NeutronDbObject):
# TODO(korzen) remove this method when IP and CIDR decorator ready
result = super(Subnet, cls).modify_fields_from_db(db_obj)
if 'cidr' in result:
result['cidr'] = utils.AuthenticIPNetwork(result['cidr'])
result['cidr'] = net_utils.AuthenticIPNetwork(result['cidr'])
if 'gateway_ip' in result and result['gateway_ip'] is not None:
result['gateway_ip'] = netaddr.IPAddress(result['gateway_ip'])
return result

View File

@ -358,30 +358,6 @@ class TestPortRuleMasking(base.BaseTestCase):
self.compare_port_ranges_results(port_min, port_max)
class TestAuthenticEUI(base.BaseTestCase):
def test_retains_original_format(self):
for mac_str in ('FA-16-3E-73-A2-E9', 'fa:16:3e:73:a2:e9'):
self.assertEqual(mac_str, str(utils.AuthenticEUI(mac_str)))
def test_invalid_values(self):
for mac in ('XXXX', 'ypp', 'g3:vvv'):
with testtools.ExpectedException(netaddr.core.AddrFormatError):
utils.AuthenticEUI(mac)
class TestAuthenticIPNetwork(base.BaseTestCase):
def test_retains_original_format(self):
for addr_str in ('10.0.0.0/24', '10.0.0.10/32', '100.0.0.1'):
self.assertEqual(addr_str, str(utils.AuthenticIPNetwork(addr_str)))
def test_invalid_values(self):
for addr in ('XXXX', 'ypp', 'g3:vvv'):
with testtools.ExpectedException(netaddr.core.AddrFormatError):
utils.AuthenticIPNetwork(addr)
class TestExcDetails(base.BaseTestCase):
def test_attach_exc_details(self):

View File

@ -23,6 +23,7 @@ from neutron_lib import constants
from neutron_lib import context as nctx
from neutron_lib.db import api as db_api
from neutron_lib.plugins import directory
from neutron_lib.utils import net as net_utils
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_serialization import jsonutils
@ -30,7 +31,6 @@ from oslo_utils import uuidutils
import testscenarios
from webob import exc
from neutron.common import utils
from neutron.db import l3_db
from neutron.db import l3_gwmode_db
from neutron.db.models import l3 as l3_models
@ -186,7 +186,7 @@ class TestL3GwModeMixin(testlib_api.SqlTestCase):
id=self.int_sub_id,
project_id=self.tenant_id,
ip_version=constants.IP_VERSION_4,
cidr=utils.AuthenticIPNetwork('3.3.3.0/24'),
cidr=net_utils.AuthenticIPNetwork('3.3.3.0/24'),
gateway_ip=netaddr.IPAddress('3.3.3.1'),
network_id=self.int_net_id)
self.router_port = port_obj.Port(

View File

@ -17,9 +17,9 @@ import mock
from neutron_lib import constants as const
from neutron_lib import context
from neutron_lib.services.logapi import constants as log_const
from neutron_lib.utils import net as net_utils
from oslo_utils import uuidutils
from neutron.common import utils
from neutron.objects.logapi import logging_resource as log_object
from neutron.services.logapi.common import db_api
from neutron.services.logapi.common import validators
@ -224,7 +224,8 @@ class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase):
'ethertype': u'IPv4',
'protocol': u'tcp',
'dest_ip_prefix':
utils.AuthenticIPNetwork('10.0.0.1/32'),
net_utils.AuthenticIPNetwork(
'10.0.0.1/32'),
'security_group_id': sg_id}]
}],
'project_id': tenant_id
@ -294,7 +295,7 @@ class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase):
'port_range_min': 11,
'protocol': u'tcp',
'source_ip_prefix':
utils.AuthenticIPNetwork(
net_utils.AuthenticIPNetwork(
'10.0.0.1/32'),
'security_group_id': sg_id},
{'direction': 'egress',

View File

@ -20,10 +20,10 @@ from neutron_lib.db import api as db_api
from neutron_lib.plugins import constants
from neutron_lib.plugins import directory
from neutron_lib.tests import tools
from neutron_lib.utils import net as net_utils
from oslo_utils import uuidutils
from neutron.api.rpc.agentnotifiers import metering_rpc_agent_api
from neutron.common import utils
from neutron.db.metering import metering_rpc
from neutron.extensions import l3 as ext_l3
from neutron.extensions import metering as ext_metering
@ -264,8 +264,9 @@ class TestMeteringPlugin(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rule': {
'remote_ip_prefix': utils.AuthenticIPNetwork(
'10.0.0.0/24'),
'remote_ip_prefix':
net_utils.AuthenticIPNetwork(
'10.0.0.0/24'),
'direction': 'ingress',
'metering_label_id': self.uuid,
'excluded': False,
@ -281,13 +282,14 @@ class TestMeteringPlugin(test_db_base_plugin_v2.NeutronDbPluginV2TestCase,
'tenant_id': self.tenant_id,
'_metering_labels': [
{'rule': {
'remote_ip_prefix': utils.AuthenticIPNetwork(
'10.0.0.0/24'),
'direction': 'ingress',
'metering_label_id': self.uuid,
'excluded': False,
'id': second_uuid},
'id': self.uuid}],
'remote_ip_prefix':
net_utils.AuthenticIPNetwork(
'10.0.0.0/24'),
'direction': 'ingress',
'metering_label_id': self.uuid,
'excluded': False,
'id': second_uuid},
'id': self.uuid}],
'id': self.uuid}]
with self.router(tenant_id=self.tenant_id, set_context=True):