Add Local IP Extension and DB

This adds Local IP API extension, DB and OVO models, DB mixin,
migration and service plugin.

Partial-Bug: #1930200
Change-Id: I0ab7c5e9bc918f7fad282673ac6e32e1b01985c5
This commit is contained in:
Oleg Bondarev 2021-08-03 18:05:33 +03:00
parent 81f8524527
commit cd1d96863e
14 changed files with 1184 additions and 7 deletions

300
neutron/db/local_ip_db.py Normal file
View File

@ -0,0 +1,300 @@
# Copyright 2021 Huawei, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from neutron_lib.api import validators
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.db import api as db_api
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import local_ip as lip_exc
from neutron_lib.objects import exceptions as obj_exc
from neutron_lib.plugins import directory
from neutron_lib.plugins import utils as plugin_utils
from oslo_log import log as logging
from oslo_utils import uuidutils
from neutron._i18n import _
from neutron.extensions import local_ip as lip_ext
from neutron.objects import base as base_obj
from neutron.objects import local_ip as lip_obj
from neutron.objects import ports as port_obj
LOG = logging.getLogger(__name__)
@registry.has_registry_receivers
class LocalIPDbMixin(lip_ext.LocalIPPluginBase):
"""Mixin class to add Local IPs to db_base_plugin_v2."""
@property
def _core_plugin(self):
return directory.get_plugin()
@staticmethod
def _make_local_ip_dict(local_ip, fields=None):
res = local_ip.to_dict()
return db_utils.resource_fields(res, fields)
def _get_local_ip(self, context, id):
obj = lip_obj.LocalIP.get_object(context, id=id)
if obj is None:
raise lip_exc.LocalIPNotFound(id=id)
return obj
def _create_local_port(self, context, network_id, ip_address):
net_db = self._core_plugin._get_network(context, network_id)
if not any(s.ip_version == constants.IP_VERSION_4 for
s in net_db.subnets):
msg = _("Network %s does not contain any IPv4 subnet") % network_id
raise lib_exc.BadRequest(resource='local_ip', msg=msg)
# This local port is never exposed to the tenant.
# it is used purely for internal system and admin use when
# managing Local IPs.
port = {'project_id': '', # project intentionally not set
'network_id': network_id,
'admin_state_up': True,
'device_id': 'PENDING',
'device_owner': constants.DEVICE_OWNER_LOCAL_IP,
'status': constants.PORT_STATUS_NOTAPPLICABLE}
# If requested ip_address is not in the subnet,
# InvalidIpForSubnet exception will be raised.
if validators.is_attr_set(ip_address):
port['fixed_ips'] = [{'ip_address': ip_address}]
# 'status' in port dict could not be updated by default, use
# check_allow_post to stop the verification of system
return plugin_utils.create_port(
self._core_plugin, context.elevated(),
{'port': port}, check_allow_post=False)
def _get_local_ip_address(self, port, requested_ip):
fixed_ips = port.fixed_ips
if len(fixed_ips) == 0:
raise lip_exc.LocalIPNoIP(port_id=port.id)
if len(fixed_ips) == 1:
fixed_ip = str(fixed_ips[0].ip_address)
if (validators.is_attr_set(requested_ip) and
(requested_ip != fixed_ip)):
raise lip_exc.LocalIPRequestedIPNotFound(
port_id=port.id, ip=requested_ip)
return fixed_ip
elif validators.is_attr_set(requested_ip):
for fixed_ip in fixed_ips:
if str(fixed_ip.ip_address) == requested_ip:
return requested_ip
raise lip_exc.LocalIPRequestedIPNotFound(
port_id=port.id, ip=requested_ip)
else:
raise lip_exc.LocalIPNoRequestedIP(port_id=port.id)
@db_api.retry_if_session_inactive()
def create_local_ip(self, context, local_ip):
"""Create a Local IP."""
fields = local_ip['local_ip']
local_port_id = fields.get('local_port_id')
local_ip_address = fields.get('local_ip_address')
network_id = fields.get('network_id')
new_local_port = False
if validators.is_attr_set(local_port_id):
local_port = port_obj.Port.get_object(context, id=local_port_id)
if not local_port:
msg = _("Port %s not found") % local_port_id
raise lib_exc.BadRequest(resource='local_ip', msg=msg)
local_ip_address = self._get_local_ip_address(local_port,
local_ip_address)
elif validators.is_attr_set(network_id):
local_port = self._create_local_port(context, network_id,
local_ip_address)
local_port_id = local_port['id']
local_ip_address = local_port['fixed_ips'][0]['ip_address']
new_local_port = True
else:
raise lip_exc.LocalIPPortOrNetworkRequired()
if new_local_port:
ctx_mgr = plugin_utils.delete_port_on_error(
self._core_plugin, context.elevated(),
local_port_id)
else:
ctx_mgr = contextlib.suppress()
with ctx_mgr, db_api.CONTEXT_WRITER.using(context):
args = {'id': uuidutils.generate_uuid(),
'name': fields['name'],
'description': fields['description'],
'project_id': fields['project_id'],
'local_port_id': local_port_id,
'network_id': local_port['network_id'],
'local_ip_address': local_ip_address,
'ip_mode': fields['ip_mode']}
lip = lip_obj.LocalIP(context, **args)
lip.create()
if new_local_port:
self._core_plugin.update_port(
context.elevated(), local_port_id,
{'port': {'device_id': lip.id,
'project_id': lip.project_id}})
return self._make_local_ip_dict(lip)
@db_api.retry_if_session_inactive()
def update_local_ip(self, context, lip_id, local_ip):
fields = local_ip['local_ip']
lip = self._get_local_ip(context, lip_id)
lip.update_fields(fields)
lip.update()
lip_dict = self._make_local_ip_dict(lip)
return lip_dict
def get_local_ip(self, context, lip_id, fields=None):
lip = self._get_local_ip(context, lip_id)
return self._make_local_ip_dict(lip, fields)
def get_local_ips(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
lips = lip_obj.LocalIP.get_objects(
context, _pager=pager, **filters)
return [
self._make_local_ip_dict(lip, fields)
for lip in lips
]
@db_api.retry_if_session_inactive()
def delete_local_ip(self, context, lip_id):
with db_api.CONTEXT_WRITER.using(context):
if lip_obj.LocalIPAssociation.get_objects(context.elevated(),
local_ip_id=lip_id):
raise lip_exc.LocalIPInUse(id=lip_id)
lip = self._get_local_ip(context, lip_id)
local_port = port_obj.Port.get_object(
context, id=lip.local_port_id)
lip.delete()
if local_port.device_owner == constants.DEVICE_OWNER_LOCAL_IP:
self._core_plugin.delete_port(context.elevated(),
local_port.id)
@staticmethod
def _make_local_ip_assoc_dict(local_ip_association, fields=None):
res = local_ip_association.to_dict()
res = db_utils.resource_fields(res, fields)
fixed_port = local_ip_association.db_obj.port
res['local_ip_address'] = (
local_ip_association.local_ip.local_ip_address)
if fixed_port.port_bindings:
res['host'] = fixed_port.port_bindings[0].host
else:
res['host'] = ''
return res
@db_api.CONTEXT_WRITER
def _create_local_ip_port_association(self, context, local_ip_id,
port_association):
fields = port_association['port_association']
fixed_port = port_obj.Port.get_object(
context, id=fields['fixed_port_id'])
if not fixed_port:
msg = _("Port %s not found") % fixed_port.id
raise lib_exc.BadRequest(
resource='local_ip_port_association', msg=msg)
requested_ip = fields['fixed_ip']
if validators.is_attr_set(requested_ip):
for ip in fixed_port.fixed_ips:
if str(ip.ip_address) == requested_ip:
break
else:
raise lip_exc.LocalIPRequestedIPNotFound(
port_id=fixed_port.id, ip=requested_ip)
else:
if not fixed_port.fixed_ips:
raise lip_exc.LocalIPNoIP(port_id=fixed_port.id)
if len(fixed_port.fixed_ips) > 1:
raise lip_exc.LocalIPNoRequestedIP(port_id=fixed_port.id)
requested_ip = fixed_port.fixed_ips[0]['ip_address']
args = {'local_ip_id': local_ip_id,
'fixed_port_id': fixed_port.id,
'fixed_ip': requested_ip}
lip_assoc = lip_obj.LocalIPAssociation(context, **args)
try:
lip_assoc.create()
except obj_exc.NeutronDbObjectDuplicateEntry:
LOG.error("Local IP %(lip)s association to port "
"%(port)s already exists.",
{'lip': local_ip_id,
'port': fixed_port.id})
return
return lip_assoc
def create_local_ip_port_association(self, context, local_ip_id,
port_association):
lip_assoc = self._create_local_ip_port_association(
context, local_ip_id, port_association)
return self._make_local_ip_assoc_dict(lip_assoc)
def get_local_ip_port_association(self, context, fixed_port_id,
local_ip_id, fields=None):
assoc = lip_obj.LocalIPAssociation.get_object(
context, local_ip_id=local_ip_id, fixed_port_id=fixed_port_id)
if assoc is None:
raise lip_exc.LocalIPAssociationNotFound(
local_ip_id=local_ip_id, port_id=fixed_port_id)
return self._make_local_ip_assoc_dict(assoc, fields)
def get_local_ip_port_associations(self, context, local_ip_id,
filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False):
# TODO(obondarev): fix bug that 'id' sort is added for subresources
sorts.remove(('id', True))
pager = base_obj.Pager(sorts, limit, page_reverse, marker)
lip_associations = lip_obj.LocalIPAssociation.get_objects(
context, _pager=pager, local_ip_id=local_ip_id, **filters)
return [
self._make_local_ip_assoc_dict(lip_assoc, fields)
for lip_assoc in lip_associations]
@db_api.CONTEXT_WRITER
def delete_local_ip_port_association(self, context, fixed_port_id,
local_ip_id):
assoc = lip_obj.LocalIPAssociation.get_object(
context, local_ip_id=local_ip_id, fixed_port_id=fixed_port_id)
if not assoc:
raise lip_exc.LocalIPAssociationNotFound(local_ip_id=local_ip_id,
port_id=fixed_port_id)
assoc.delete()
return assoc
@staticmethod
@registry.receives(resources.PORT, [events.BEFORE_DELETE])
def _prevent_local_port_delete_callback(resource, event,
trigger, payload=None):
port_id = payload.resource_id
if lip_obj.LocalIP.count(payload.context, local_port_id=port_id):
reason = _('still referenced by Local IPs')
raise lib_exc.ServicePortInUse(port_id=port_id,
reason=reason)

View File

@ -1 +1 @@
e981acd076d3
76df7844a8c6

View File

@ -0,0 +1,72 @@
# Copyright 2021 Huawei, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""add Local IP tables
Revision ID: 76df7844a8c6
Revises: e981acd076d3
Create Date: 2021-08-05 14:04:01.380941
"""
from alembic import op
from neutron_lib.db import constants as db_const
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '76df7844a8c6'
down_revision = 'e981acd076d3'
def upgrade():
op.create_table(
'local_ips',
sa.Column('id', sa.String(
length=db_const.UUID_FIELD_SIZE),
primary_key=True),
sa.Column('standard_attr_id', sa.BigInteger(),
sa.ForeignKey('standardattributes.id', ondelete='CASCADE'),
nullable=False),
sa.Column('name', sa.String(length=db_const.NAME_FIELD_SIZE)),
sa.Column('project_id', sa.String(
length=db_const.PROJECT_ID_FIELD_SIZE),
index=True),
sa.Column('local_port_id', sa.String(
length=db_const.UUID_FIELD_SIZE),
sa.ForeignKey('ports.id'),
nullable=False),
sa.Column('network_id', sa.String(
length=db_const.UUID_FIELD_SIZE),
nullable=False),
sa.Column('local_ip_address', sa.String(
length=db_const.IP_ADDR_FIELD_SIZE),
nullable=False),
sa.Column('ip_mode', sa.String(length=32),
nullable=False),
sa.UniqueConstraint('standard_attr_id')
)
op.create_table(
'local_ip_associations',
sa.Column('local_ip_id', sa.String(length=db_const.UUID_FIELD_SIZE),
sa.ForeignKey('local_ips.id'),
primary_key=True),
sa.Column('fixed_port_id', sa.String(length=db_const.UUID_FIELD_SIZE),
sa.ForeignKey('ports.id', ondelete='CASCADE'),
primary_key=True),
sa.Column('fixed_ip', sa.String(length=db_const.IP_ADDR_FIELD_SIZE),
nullable=False),
)

View File

@ -0,0 +1,75 @@
# Copyright 2021 Huawei, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import local_ip as local_ip_apidef
from neutron_lib.db import constants as db_const
from neutron_lib.db import model_base
from neutron_lib.db import standard_attr
import sqlalchemy as sa
from sqlalchemy import orm
class LocalIP(standard_attr.HasStandardAttributes, model_base.BASEV2,
model_base.HasId, model_base.HasProject):
"""Represents a Local IP address.
This IP address may or may not be allocated to a tenant, and may or
may not be associated with one or more internal ports.
"""
__tablename__ = 'local_ips'
name = sa.Column(sa.String(db_const.NAME_FIELD_SIZE))
local_port_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE),
sa.ForeignKey('ports.id'),
nullable=False)
network_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE),
nullable=False)
local_ip_address = sa.Column(sa.String(db_const.IP_ADDR_FIELD_SIZE),
nullable=False)
ip_mode = sa.Column(sa.String(32), nullable=False)
api_collections = [local_ip_apidef.COLLECTION_NAME]
collection_resource_map = {
local_ip_apidef.COLLECTION_NAME: local_ip_apidef.RESOURCE_NAME}
class LocalIPAssociation(model_base.BASEV2):
"""Represents an association between a Local IP and an internal Port."""
__tablename__ = 'local_ip_associations'
local_ip_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE),
sa.ForeignKey('local_ips.id'),
primary_key=True)
fixed_port_id = sa.Column(sa.String(db_const.UUID_FIELD_SIZE),
sa.ForeignKey('ports.id', ondelete='CASCADE'),
primary_key=True)
fixed_ip = sa.Column(sa.String(db_const.IP_ADDR_FIELD_SIZE),
nullable=False)
port = orm.relationship("Port",
lazy='joined',
foreign_keys=fixed_port_id)
local_ip = orm.relationship("LocalIP",
lazy='joined',
foreign_keys=local_ip_id,
backref=orm.backref("port_associations",
uselist=True))
# standard attributes support:
api_collections = []
api_sub_resources = [local_ip_apidef.LOCAL_IP_ASSOCIATIONS]
collection_resource_map = {local_ip_apidef.LOCAL_IP_ASSOCIATIONS:
local_ip_apidef.LOCAL_IP_ASSOCIATION}

View File

@ -0,0 +1,123 @@
# Copyright 2021 Huawei, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from neutron_lib.api.definitions import local_ip as local_ip_apidef
from neutron_lib.api import extensions as api_extensions
from neutron_lib.plugins import directory
from neutron_lib.services import base as service_base
from neutron.api import extensions
from neutron.api.v2 import base
PLUGIN_TYPE = 'LOCAL_IP'
class Local_ip(api_extensions.APIExtensionDescriptor):
"""Extension class supporting Local IPs."""
api_definition = local_ip_apidef
@classmethod
def get_resources(cls):
plugin = directory.get_plugin(PLUGIN_TYPE)
collection_name = local_ip_apidef.COLLECTION_NAME.replace('_', '-')
params = local_ip_apidef.RESOURCE_ATTRIBUTE_MAP.get(
local_ip_apidef.COLLECTION_NAME, dict())
controller = base.create_resource(collection_name,
local_ip_apidef.RESOURCE_NAME,
plugin, params,
allow_bulk=True,
allow_pagination=True,
allow_sorting=True)
ext = extensions.ResourceExtension(collection_name, controller,
attr_map=params)
resources = [ext]
for collection_name in local_ip_apidef.SUB_RESOURCE_ATTRIBUTE_MAP:
resource_name = local_ip_apidef.LOCAL_IP_ASSOCIATION
parent = local_ip_apidef.SUB_RESOURCE_ATTRIBUTE_MAP[
collection_name].get('parent')
params = local_ip_apidef.SUB_RESOURCE_ATTRIBUTE_MAP[
collection_name].get('parameters')
controller = base.create_resource(collection_name, resource_name,
plugin, params,
allow_bulk=True,
parent=parent,
allow_pagination=True,
allow_sorting=True)
resource = extensions.ResourceExtension(
collection_name,
controller, parent,
attr_map=params)
resources.append(resource)
return resources
class LocalIPPluginBase(service_base.ServicePluginBase, metaclass=abc.ABCMeta):
@classmethod
def get_plugin_type(cls):
return PLUGIN_TYPE
def get_plugin_description(self):
return "Local IP Service Plugin"
@abc.abstractmethod
def create_local_ip(self, context, local_ip):
pass
@abc.abstractmethod
def update_local_ip(self, context, lip_id, local_ip):
pass
@abc.abstractmethod
def get_local_ip(self, context, lip_id, fields=None):
pass
@abc.abstractmethod
def get_local_ips(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None,
page_reverse=False):
pass
@abc.abstractmethod
def delete_local_ip(self, context, lip_id):
pass
@abc.abstractmethod
def create_local_ip_port_association(self, context, local_ip_id,
port_association):
pass
@abc.abstractmethod
def get_local_ip_port_association(self, context, fixed_port_id,
local_ip_id, fields=None):
pass
@abc.abstractmethod
def get_local_ip_port_associations(self, context, local_ip_id,
filters=None, fields=None,
sorts=None, limit=None,
marker=None, page_reverse=False):
pass
@abc.abstractmethod
def delete_local_ip_port_association(self, context, fixed_port_id,
local_ip_id):
pass

112
neutron/objects/local_ip.py Normal file
View File

@ -0,0 +1,112 @@
# Copyright 2021 Huawei, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron_lib.objects import common_types
from oslo_versionedobjects import fields as obj_fields
from neutron.db.models import local_ip as lip_db
from neutron.objects import base
@base.NeutronObjectRegistry.register
class LocalIP(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = lip_db.LocalIP
fields = {
'id': common_types.UUIDField(),
'name': obj_fields.StringField(nullable=True),
'description': obj_fields.StringField(nullable=True),
'project_id': obj_fields.StringField(nullable=True),
'local_port_id': common_types.UUIDField(),
'network_id': common_types.UUIDField(),
'local_ip_address': obj_fields.IPAddressField(),
'ip_mode': obj_fields.StringField(),
}
foreign_keys = {'Port': {'local_port_id': 'id'},
'LocalIPAssociation': {'id': 'local_ip_id'}}
fields_no_update = ['project_id', 'local_ip_address',
'network_id', 'local_port_id']
synthetic_fields = []
@classmethod
def modify_fields_to_db(cls, fields):
result = super(LocalIP, cls).modify_fields_to_db(fields)
if 'local_ip_address' in result:
result['local_ip_address'] = cls.filter_to_str(
result['local_ip_address'])
return result
@classmethod
def modify_fields_from_db(cls, db_obj):
fields = super(LocalIP, cls).modify_fields_from_db(db_obj)
if 'local_ip_address' in fields:
fields['local_ip_address'] = netaddr.IPAddress(
fields['local_ip_address'])
return fields
@base.NeutronObjectRegistry.register
class LocalIPAssociation(base.NeutronDbObject):
# Version 1.0: Initial version
VERSION = '1.0'
db_model = lip_db.LocalIPAssociation
fields = {
'id': obj_fields.StringField(),
'local_ip_id': common_types.UUIDField(nullable=False),
'fixed_port_id': common_types.UUIDField(nullable=False),
'fixed_ip': obj_fields.IPAddressField(nullable=False),
'local_ip': obj_fields.ObjectField('LocalIP'),
}
primary_keys = ['local_ip_id', 'fixed_port_id']
foreign_keys = {'LocalIP': {'local_ip_id': 'id'},
'Port': {'fixed_port_id': 'id'}}
fields_no_update = ['local_ip_id', 'fixed_port_id', 'fixed_ip']
synthetic_fields = ['id', 'local_ip']
@classmethod
def modify_fields_to_db(cls, fields):
result = super(LocalIPAssociation, cls).modify_fields_to_db(fields)
if 'fixed_ip' in result:
result['fixed_ip'] = cls.filter_to_str(result['fixed_ip'])
return result
@classmethod
def modify_fields_from_db(cls, db_obj):
fields = super(LocalIPAssociation, cls).modify_fields_from_db(db_obj)
if 'fixed_ip' in fields:
fields['fixed_ip'] = netaddr.IPAddress(fields['fixed_ip'])
return fields
def obj_load_attr(self, attrname):
if attrname in ['id']:
self._set_id()
super(LocalIPAssociation, self).obj_load_attr(attrname)
def from_db_object(self, db_obj):
super(LocalIPAssociation, self).from_db_object(db_obj)
self._set_id()
def _set_id(self):
self.id = self.local_ip_id + '_' + self.fixed_port_id
self.obj_reset_changes(['id'])

View File

View File

@ -0,0 +1,28 @@
# Copyright 2021 Huawei, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api.definitions import local_ip as local_ip_apidef
from neutron.db import local_ip_db
class LocalIPPlugin(local_ip_db.LocalIPDbMixin):
"""Implementation of the Neutron logging api plugin."""
supported_extension_aliases = [local_ip_apidef.ALIAS]
__native_pagination_support = True
__native_sorting_support = True
__filter_validation_support = True

View File

@ -250,9 +250,10 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
subresource=subresource, context=context)
def new_list_request(self, resource, fmt=None, params=None,
subresource=None):
subresource=None, parent_id=None):
return self._req(
'GET', resource, None, fmt, params=params, subresource=subresource
'GET', resource, None, fmt, params=params, id=parent_id,
subresource=subresource
)
def new_show_request(self, resource, id, fmt=None,
@ -598,8 +599,10 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
def _delete(self, collection, id,
expected_code=webob.exc.HTTPNoContent.code,
neutron_context=None, headers=None):
req = self.new_delete_request(collection, id, headers=headers)
neutron_context=None, headers=None, subresource=None,
sub_id=None):
req = self.new_delete_request(collection, id, headers=headers,
subresource=subresource, sub_id=sub_id)
if neutron_context:
# create a specific auth context for this request
req.environ['neutron.context'] = neutron_context
@ -636,9 +639,12 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
return self.deserialize(self.fmt, res)
def _list(self, resource, fmt=None, neutron_context=None,
query_params=None, expected_code=webob.exc.HTTPOk.code):
query_params=None, expected_code=webob.exc.HTTPOk.code,
parent_id=None, subresource=None):
fmt = fmt or self.fmt
req = self.new_list_request(resource, fmt, query_params)
req = self.new_list_request(resource, fmt, query_params,
subresource=subresource,
parent_id=parent_id)
if neutron_context:
req.environ['neutron.context'] = neutron_context
res = req.get_response(self._api_for_resource(resource))

View File

@ -0,0 +1,375 @@
# Copyright 2021 Huawei, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import netaddr
from neutron_lib.api.definitions import local_ip as apidef
from neutron_lib import constants
from neutron_lib import context
import webob.exc
from neutron.extensions import local_ip as lip_ext
from neutron.tests.unit.db import test_db_base_plugin_v2
class LocalIPTestExtensionManager(object):
def get_resources(self):
return lip_ext.Local_ip.get_resources()
def get_actions(self):
return []
def get_request_extensions(self):
return []
class LocalIPTestBase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
def _create_local_ip(self, **kwargs):
kwargs.setdefault('project_id', self._tenant_id)
local_ip = {'local_ip': {}}
for k, v in kwargs.items():
local_ip['local_ip'][k] = v
req = self.new_create_request('local-ips', local_ip)
neutron_context = context.Context(
'', kwargs.get('project_id', self._tenant_id))
req.environ['neutron.context'] = neutron_context
res = req.get_response(self.ext_api)
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(self.fmt, res)
def _update_local_ip(self, lip_id, data):
update_req = self.new_update_request(
'local-ips', data, lip_id)
update_req.environ['neutron.context'] = context.Context(
'', self._tenant_id)
res = update_req.get_response(self.ext_api)
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(self.fmt, res)
def _create_local_ip_association(self, local_ip_id, fixed_port_id,
fixed_ip=None):
local_ip_assoc = {'port_association': {'fixed_port_id': fixed_port_id,
'fixed_ip': fixed_ip}}
req = self.new_create_request('local_ips',
data=local_ip_assoc,
id=local_ip_id,
subresource='port_associations')
neutron_context = context.Context('', self._tenant_id)
req.environ['neutron.context'] = neutron_context
res = req.get_response(self.ext_api)
if res.status_int >= webob.exc.HTTPClientError.code:
raise webob.exc.HTTPClientError(code=res.status_int)
return self.deserialize(self.fmt, res)
@contextlib.contextmanager
def local_ip(self, **kwargs):
yield self._create_local_ip(**kwargs)
@contextlib.contextmanager
def local_ip_assoc(self, local_ip_id, fixed_port_id,
fixed_ip=None):
yield self._create_local_ip_association(
local_ip_id, fixed_port_id, fixed_ip)
class TestLocalIP(LocalIPTestBase):
def setUp(self):
ext_mgr = LocalIPTestExtensionManager()
svc_plugins = (
'neutron.services.local_ip.local_ip_plugin.LocalIPPlugin',)
super(TestLocalIP, self).setUp(ext_mgr=ext_mgr,
service_plugins=svc_plugins)
def test_create_local_ip_with_local_port_id(self):
with self.port() as p:
local_port = p['port']
with self.local_ip(local_port_id=local_port['id'],
name='testname',
description='testdescr') as lip:
lip = lip['local_ip']
self.assertEqual('testname', lip['name'])
self.assertEqual('testdescr', lip['description'])
self.assertEqual(local_port['id'], lip['local_port_id'])
self.assertEqual(local_port['fixed_ips'][0]['ip_address'],
lip['local_ip_address'])
self.assertEqual(apidef.IP_MODE_TRANSLATE,
lip['ip_mode'])
def test_create_local_ip_with_local_port_id_and_ip(self):
with self.port() as p:
local_port = p['port']
ip_addr = local_port['fixed_ips'][0]['ip_address']
with self.local_ip(local_port_id=local_port['id'],
local_ip_address=ip_addr) as lip:
lip = lip['local_ip']
self.assertEqual(local_port['id'], lip['local_port_id'])
self.assertEqual(ip_addr, lip['local_ip_address'])
def test_create_local_ip_with_local_port_id_and_wrong_ip(self):
with self.port() as p:
local_port = p['port']
try:
self._create_local_ip(local_port_id=local_port['id'],
local_ip_address='100.0.0.100')
self.fail("Local IP created with IP "
"not belonging to local port")
except webob.exc.HTTPClientError as e:
self.assertEqual(400, e.code)
def test_create_local_ip_with_local_port_id_no_ip(self):
with self.port() as p:
local_port = p['port']
data = {'port': {'fixed_ips': []}}
req = self.new_update_request('ports', data, local_port['id'])
req.get_response(self.api)
try:
self._create_local_ip(local_port_id=local_port['id'])
self.fail("Local IP created with Port "
"having no IPs")
except webob.exc.HTTPClientError as e:
self.assertEqual(400, e.code)
def _port_add_new_ip(self, port):
subnet_id = port['fixed_ips'][0]['subnet_id']
cur_ip = port['fixed_ips'][0]['ip_address']
data = {'port': {}}
data['port']['fixed_ips'] = [
{'subnet_id': subnet_id, 'ip_address': cur_ip},
{'subnet_id': subnet_id}]
req = self.new_update_request('ports', data, port['id'])
port = self.deserialize(self.fmt, req.get_response(self.api))['port']
for ip in port['fixed_ips']:
if ip['ip_address'] != cur_ip:
return ip['ip_address']
def test_create_local_ip_with_local_port_id_and_multiple_ips(self):
with self.port() as p:
local_port = p['port']
new_ip = self._port_add_new_ip(local_port)
with self.local_ip(local_port_id=local_port['id'],
local_ip_address=new_ip) as lip:
lip = lip['local_ip']
self.assertEqual(local_port['id'], lip['local_port_id'])
self.assertEqual(new_ip, lip['local_ip_address'])
def test_create_local_ip_with_local_port_id_and_mult_ips_wrong_ip(self):
with self.port() as p:
local_port = p['port']
self._port_add_new_ip(local_port)
try:
self._create_local_ip(local_port_id=local_port['id'],
local_ip_address='100.0.0.100')
self.fail("Local IP created with IP "
"not belonging to local port")
except webob.exc.HTTPClientError as e:
self.assertEqual(400, e.code)
def test_create_local_ip_with_network_id(self):
with self.subnet() as s:
subnet = s['subnet']
with self.local_ip(network_id=subnet['network_id'],
ip_mode=apidef.IP_MODE_PASSTHROUGH) as lip:
lip = lip['local_ip']
self.assertEqual(subnet['network_id'], lip['network_id'])
self.assertEqual(apidef.IP_MODE_PASSTHROUGH, lip['ip_mode'])
req = self.new_show_request(
'ports', lip['local_port_id'], self.fmt)
local_port = self.deserialize(
self.fmt, req.get_response(self.api))['port']
self.assertEqual(constants.DEVICE_OWNER_LOCAL_IP,
local_port['device_owner'])
self.assertEqual(lip['id'], local_port['device_id'])
self.assertEqual(lip['local_ip_address'],
local_port['fixed_ips'][0]['ip_address'])
def test_create_local_ip_with_network_id_and_ip(self):
with self.subnet() as s:
subnet = s['subnet']
ip_addr = str(netaddr.IPNetwork(subnet['cidr']).ip + 10)
with self.local_ip(network_id=subnet['network_id'],
local_ip_address=ip_addr) as lip:
lip = lip['local_ip']
self.assertEqual(subnet['network_id'], lip['network_id'])
self.assertEqual(ip_addr, lip['local_ip_address'])
req = self.new_show_request(
'ports', lip['local_port_id'], self.fmt)
local_port = self.deserialize(
self.fmt, req.get_response(self.api))['port']
self.assertEqual(lip['local_ip_address'],
local_port['fixed_ips'][0]['ip_address'])
def test_update_local_ip(self):
with self.subnet() as s:
subnet = s['subnet']
with self.local_ip(network_id=subnet['network_id']) as lip:
data = {'local_ip': {'name': 'bar', 'description': 'bar'}}
lip = self._update_local_ip(lip['local_ip']['id'], data)
self.assertEqual(lip['local_ip']['name'],
data['local_ip']['name'])
self.assertEqual(lip['local_ip']['description'],
data['local_ip']['description'])
def test_list_local_ips(self):
with self.subnet() as s:
subnet = s['subnet']
with self.local_ip(network_id=subnet['network_id']),\
self.local_ip(network_id=subnet['network_id']):
res = self._list('local-ips')
self.assertEqual(2, len(res['local_ips']))
def test_get_local_ip(self):
with self.subnet() as s:
subnet = s['subnet']
with self.local_ip(network_id=subnet['network_id']) as lip:
req = self.new_show_request('local-ips',
lip['local_ip']['id'])
res = self.deserialize(
self.fmt, req.get_response(self.ext_api))
self.assertEqual(lip['local_ip']['id'],
res['local_ip']['id'])
def test_delete_local_ip(self):
with self.subnet() as s:
subnet = s['subnet']
lip = self._create_local_ip(network_id=subnet['network_id'])
self._delete('local-ips', lip['local_ip']['id'])
self._show('local-ips', lip['local_ip']['id'],
expected_code=webob.exc.HTTPNotFound.code)
def test_create_local_ip_association(self):
with self.subnet() as s, self.port() as p:
subnet = s['subnet']
fixed_port = p['port']
with self.local_ip(network_id=subnet['network_id'],
ip_mode=apidef.IP_MODE_PASSTHROUGH) as lip:
lip = lip['local_ip']
with self.local_ip_assoc(lip['id'], fixed_port['id']) as assoc:
assoc = assoc['port_association']
self.assertEqual(fixed_port['id'], assoc['fixed_port_id'])
self.assertEqual(fixed_port['fixed_ips'][0]['ip_address'],
assoc['fixed_ip'])
def test_create_local_ip_association_request_ip(self):
with self.subnet() as s, self.port() as p:
subnet = s['subnet']
fixed_port = p['port']
fixed_ip = fixed_port['fixed_ips'][0]['ip_address']
with self.local_ip(network_id=subnet['network_id']) as lip:
lip = lip['local_ip']
with self.local_ip_assoc(lip['id'], fixed_port['id'],
fixed_ip=fixed_ip) as assoc:
assoc = assoc['port_association']
self.assertEqual(fixed_port['id'], assoc['fixed_port_id'])
self.assertEqual(fixed_ip, assoc['fixed_ip'])
def test_create_local_ip_association_request_ip_not_found(self):
with self.subnet() as s, self.port() as p:
subnet = s['subnet']
fixed_port = p['port']
with self.local_ip(network_id=subnet['network_id']) as lip:
lip = lip['local_ip']
try:
self._create_local_ip_association(
lip['id'], fixed_port['id'], fixed_ip='100.0.0.100')
self.fail("Local IP associated with IP "
"not belonging to fixed port")
except webob.exc.HTTPClientError as e:
self.assertEqual(400, e.code)
def test_create_local_ip_association_multiple_ips(self):
with self.subnet() as s, self.port() as p:
subnet = s['subnet']
fixed_port = p['port']
new_ip = self._port_add_new_ip(fixed_port)
lip = self._create_local_ip(network_id=subnet['network_id'])
lip = lip['local_ip']
assoc = self._create_local_ip_association(
lip['id'], fixed_port['id'], new_ip)['port_association']
self.assertEqual(new_ip, assoc['fixed_ip'])
def test_create_local_ip_association_multiple_ips_negative(self):
with self.subnet() as s, self.port() as p:
subnet = s['subnet']
fixed_port = p['port']
self._port_add_new_ip(fixed_port)
lip = self._create_local_ip(network_id=subnet['network_id'])
lip = lip['local_ip']
try:
self._create_local_ip_association(lip['id'], fixed_port['id'])
self.fail("Local IP associated with Port "
"with multiple IPs and no IP specified")
except webob.exc.HTTPClientError as e:
self.assertEqual(400, e.code)
def test_create_local_ip_association_no_ips(self):
with self.subnet() as s, self.port() as p:
subnet = s['subnet']
fixed_port = p['port']
data = {'port': {'fixed_ips': []}}
req = self.new_update_request('ports', data, fixed_port['id'])
req.get_response(self.api)
lip = self._create_local_ip(network_id=subnet['network_id'])
lip = lip['local_ip']
try:
self._create_local_ip_association(
lip['id'], fixed_port['id'])
self.fail("Local IP associated with Port "
"with no IPs")
except webob.exc.HTTPClientError as e:
self.assertEqual(400, e.code)
def test_list_local_ip_associations(self):
with self.subnet() as s, self.port() as p1, self.port() as p2:
subnet = s['subnet']
port1 = p1['port']
port2 = p2['port']
lip = self._create_local_ip(network_id=subnet['network_id'])
lip = lip['local_ip']
self._create_local_ip_association(lip['id'], port1['id'])
self._create_local_ip_association(lip['id'], port2['id'])
res = self._list('local_ips', parent_id=lip['id'],
subresource='port_associations')
self.assertEqual(2, len(res['port_associations']))
def test_delete_local_ip_association(self):
with self.subnet() as s, self.port() as p1, self.port() as p2:
subnet = s['subnet']
port1 = p1['port']
port2 = p2['port']
lip = self._create_local_ip(network_id=subnet['network_id'])
lip = lip['local_ip']
self._create_local_ip_association(lip['id'], port1['id'])
self._create_local_ip_association(lip['id'], port2['id'])
res = self._list('local_ips', parent_id=lip['id'],
subresource='port_associations')
self.assertEqual(2, len(res['port_associations']))
self._delete('local_ips', lip['id'],
subresource='port_associations',
sub_id=port1['id'])
res = self._list('local_ips', parent_id=lip['id'],
subresource='port_associations')
self.assertEqual(1, len(res['port_associations']))

View File

@ -44,6 +44,7 @@ from neutron.objects import agent
from neutron.objects import base
from neutron.objects.db import api as obj_db_api
from neutron.objects import flavor
from neutron.objects import local_ip
from neutron.objects import network as net_obj
from neutron.objects.port.extensions import port_device_profile
from neutron.objects.port.extensions import port_numa_affinity_policy
@ -1659,6 +1660,23 @@ class BaseDbObjectTestCase(_BaseObjectTestCase,
_address_group.create()
return _address_group.id
def _create_test_local_ip_id(self, **lip_attrs):
return self._create_test_local_ip(**lip_attrs)['id']
def _create_test_local_ip(self, **lip_attrs):
if 'network_id' not in lip_attrs:
lip_attrs['network_id'] = self._create_test_network_id()
if 'local_port_id' not in lip_attrs:
lip_attrs['local_port_id'] = self._create_test_port_id()
if 'local_ip_address' not in lip_attrs:
lip_attrs['local_ip_address'] = '10.10.10.10'
if 'ip_mode' not in lip_attrs:
lip_attrs['ip_mode'] = 'translate'
lip = local_ip.LocalIP(self.context, **lip_attrs)
lip.create()
return lip
def _create_test_agent_id(self):
attrs = self.get_random_object_fields(obj_cls=agent.Agent)
_agent = agent.Agent(self.context, **attrs)

View File

@ -0,0 +1,65 @@
# Copyright 2021 Huawei, Inc.
# All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from neutron.objects import base as obj_base
from neutron.objects import local_ip
from neutron.tests.unit.objects import test_base as obj_test_base
from neutron.tests.unit import testlib_api
class LocalIPIfaceObjectTestCase(obj_test_base.BaseObjectIfaceTestCase):
_test_class = local_ip.LocalIP
class LocalIPDbObjectTestCase(obj_test_base.BaseDbObjectTestCase,
testlib_api.SqlTestCase):
_test_class = local_ip.LocalIP
def setUp(self):
super(LocalIPDbObjectTestCase, self).setUp()
self.update_obj_fields(
{'local_port_id': lambda: self._create_test_port_id(),
'network_id': lambda: self._create_test_network_id()})
class LocalIPAssociationIfaceObjectTestCase(
obj_test_base.BaseObjectIfaceTestCase):
_test_class = local_ip.LocalIPAssociation
def setUp(self):
super(LocalIPAssociationIfaceObjectTestCase, self).setUp()
mock.patch.object(obj_base.NeutronDbObject,
'load_synthetic_db_fields').start()
class LocalIPAssociationDbObjectTestCase(
obj_test_base.BaseDbObjectTestCase, testlib_api.SqlTestCase):
_test_class = local_ip.LocalIPAssociation
def setUp(self):
super(LocalIPAssociationDbObjectTestCase, self).setUp()
self.update_obj_fields(
{
'local_ip_id':
lambda: self._create_test_local_ip_id(),
'fixed_port_id':
lambda: self._create_test_port_id()
})

View File

@ -59,6 +59,8 @@ object_data = {
'L3HARouterAgentPortBinding': '1.0-d1d7ee13f35d56d7e225def980612ee5',
'L3HARouterNetwork': '1.0-87acea732853f699580179a94d2baf91',
'L3HARouterVRIdAllocation': '1.0-37502aebdbeadc4f9e3bd5e9da714ab9',
'LocalIP': '1.0-85843868a01585c4e2614f2f635159ca',
'LocalIPAssociation': '1.0-4febb23ad22f11a69c431b077b3aac21',
'MeteringLabel': '1.0-cc4b620a3425222447cbe459f62de533',
'MeteringLabelRule': '2.0-0ad09894c62e1ce6e868f725158959ba',
'Log': '1.0-6391351c0f34ed34375a19202f361d24',

View File

@ -85,6 +85,7 @@ neutron.service_plugins =
placement = neutron.services.placement_report.plugin:PlacementReportPlugin
conntrack_helper = neutron.services.conntrack_helper.plugin:Plugin
ovn-router = neutron.services.ovn_l3.plugin:OVNL3RouterPlugin
local_ip = neutron.services.local_ip.local_ip_plugin:LocalIPPlugin
neutron.ml2.type_drivers =
flat = neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver
local = neutron.plugins.ml2.drivers.type_local:LocalTypeDriver