[ovn]: port forwarding -- core changes

This is a subset of the changes for implementing the floating IP
port forwarding feature in neutron, using OVN as the backend.

This changeset covers the core implementation for portforwarding/drivers/ovn,
mech_driver, ovn-router as well as a subset of tests.

Port forwarding support in ovn_db_sync is not included here to facilitate review.
That, as well as all other supporting changes, are under the ovn/port_forwarding topic:
https://review.opendev.org/#/q/topic:ovn/port_forwarding+(status:open+OR+status:merged)

Depends-On: https://review.opendev.org/#/c/726478/
Partially-implements: ovn/port_forwarding
Partial-Bug: #1877447

Change-Id: I019fe11ac1ddcf2304f3f144c62d52667fc11dce
This commit is contained in:
Flavio Fernandes 2020-07-15 14:26:31 -04:00
parent b425ca45dd
commit d74f409c82
21 changed files with 806 additions and 8 deletions

View File

@ -47,6 +47,7 @@ disable_service q-meta
enable_plugin neutron https://opendev.org/openstack/neutron
enable_service q-trunk
enable_service q-dns
enable_service q-port-forwarding
enable_service q-qos
# Enable neutron tempest plugin tests

View File

@ -14,6 +14,9 @@
from neutron_lib.api.definitions import agent as agent_def
from neutron_lib.api.definitions import availability_zone as az_def
from neutron_lib.api.definitions import expose_port_forwarding_in_fip
from neutron_lib.api.definitions import fip_pf_description
from neutron_lib.api.definitions import floating_ip_port_forwarding
from neutron_lib.api.definitions import router_availability_zone as raz_def
from neutron_lib.api.definitions import segment as seg_def
@ -63,4 +66,7 @@ ML2_SUPPORTED_API_EXTENSIONS = [
'trunk',
'quota_details',
seg_def.ALIAS,
expose_port_forwarding_in_fip.ALIAS,
fip_pf_description.ALIAS,
floating_ip_port_forwarding.ALIAS,
]

View File

@ -35,6 +35,7 @@ from oslo_utils import strutils
from ovs.db import idl
from ovsdbapp.backend.ovs_idl import connection
from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp import constants as ovsdbapp_const
from neutron._i18n import _
from neutron.common.ovn import constants
@ -545,3 +546,22 @@ def get_chassis_availability_zones(chassis):
azs = [az.strip() for az in values.split(':') if az.strip()]
break
return azs
def parse_ovn_lb_port_forwarding(ovn_rtr_lb_pfs):
"""Return a dictionary compatible with port forwarding from OVN lb."""
result = {}
for ovn_lb in ovn_rtr_lb_pfs:
ext_ids = ovn_lb.external_ids
fip_id = ext_ids.get(constants.OVN_FIP_EXT_ID_KEY)
protocol = (ovn_lb.protocol[0]
if ovn_lb.protocol else ovsdbapp_const.PROTO_TCP)
fip_dict = result.get(fip_id, {})
fip_dict_proto = fip_dict.get(protocol, set())
ovn_vips = ovn_lb.vips
for vip, ips in ovn_vips.items():
for ip in ips.split(','):
fip_dict_proto.add("{} {}".format(vip, ip))
fip_dict[protocol] = fip_dict_proto
result[fip_id] = fip_dict
return result

View File

@ -20,6 +20,7 @@ from neutron._i18n import _
from neutron.common.ovn import constants as ovn_const
from neutron.common.ovn import exceptions as ovn_exc
from neutron.common.ovn import utils
from neutron.services.portforwarding.constants import PORT_FORWARDING_PREFIX
RESOURCE_TYPE_MAP = {
ovn_const.TYPE_NETWORKS: 'Logical_Switch',
@ -708,7 +709,13 @@ class CheckRevisionNumberCommand(command.BaseCommand):
self.resource_type = resource_type
self.if_exists = if_exists
def _get_floatingip(self):
def _get_floatingip_or_pf(self):
# TYPE_FLOATINGIPS: Determine table to use based on name.
# Floating ip port forwarding resources are kept in load
# balancer table and have a well known name.
if self.name.startswith(PORT_FORWARDING_PREFIX):
return self.api.lookup('Load_Balancer', self.name)
# TODO(lucasagomes): We can't use self.api.lookup() because that
# method does not introspect map type columns. We could either:
# 1. Enhance it to look into maps or, 2. Add a new ``name`` column
@ -747,7 +754,7 @@ class CheckRevisionNumberCommand(command.BaseCommand):
ovn_resource = None
if self.resource_type == ovn_const.TYPE_FLOATINGIPS:
ovn_resource = self._get_floatingip()
ovn_resource = self._get_floatingip_or_pf()
elif self.resource_type == ovn_const.TYPE_SUBNETS:
ovn_resource = self._get_subnet()
else:

View File

@ -157,10 +157,10 @@ class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):
},
ovn_const.TYPE_FLOATINGIPS: {
'neutron_get': self._ovn_client._l3_plugin.get_floatingip,
'ovn_get': self._nb_idl.get_floatingip,
'ovn_create': self._ovn_client.create_floatingip,
'ovn_update': self._ovn_client.update_floatingip,
'ovn_delete': self._ovn_client.delete_floatingip,
'ovn_get': self._nb_idl.get_floatingip_in_nat_or_lb,
'ovn_create': self._create_floatingip_and_pf,
'ovn_update': self._update_floatingip_and_pf,
'ovn_delete': self._delete_floatingip_and_pf,
},
ovn_const.TYPE_ROUTERS: {
'neutron_get': self._ovn_client._l3_plugin.get_router,
@ -429,6 +429,21 @@ class DBInconsistenciesPeriodics(SchemaAwarePeriodicsBase):
inconsistent_subnets.append(subnet)
return inconsistent_subnets
def _create_floatingip_and_pf(self, context, floatingip):
self._ovn_client.create_floatingip(context, floatingip)
self._ovn_client._l3_plugin.port_forwarding.maintenance_create(
context, floatingip)
def _update_floatingip_and_pf(self, context, floatingip):
self._ovn_client.update_floatingip(context, floatingip)
self._ovn_client._l3_plugin.port_forwarding.maintenance_update(
context, floatingip)
def _delete_floatingip_and_pf(self, context, fip_id):
self._ovn_client._l3_plugin.port_forwarding.maintenance_delete(
context, fip_id)
self._ovn_client.delete_floatingip(context, fip_id)
# A static spacing value is used here, but this method will only run
# once per lock due to the use of periodics.NeverAgain().
@periodics.periodic(spacing=600,

View File

@ -612,6 +612,18 @@ class OVNClient(object):
if not router_id:
return
# FIPs used for port forwarding have no fixed address
# configured. Also, OVN handler for port forwarding
# is delegated to OVNPortForwarding. Nothing further
# to do here.
if floatingip['fixed_ip_address'] is None:
LOG.debug("Skipping NAT for floating ip %(id)s, external ip "
"%(fip_ip)s on router %(rtr_id)s: no logical_ip",
{'id': floatingip['id'],
'fip_ip': floatingip['floating_ip_address'],
'rtr_id': router_id})
return
commands = []
admin_context = n_context.get_admin_context()
fip_db = self._l3_plugin._get_floatingip(

View File

@ -300,10 +300,14 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
LOG.debug('ACL-SYNC: finished @ %s', str(datetime.now()))
def _calculate_fips_differences(self, ovn_fips, db_fips):
def _calculate_fips_differences(self, ovn_fips, ovn_rtr_lb_pfs, db_fips):
to_add = []
to_remove = []
ovn_pfs = utils.parse_ovn_lb_port_forwarding(ovn_rtr_lb_pfs)
for db_fip in db_fips:
# skip fips that are used for port forwarding
if db_fip['id'] in ovn_pfs:
continue
for ovn_fip in ovn_fips:
if (ovn_fip['logical_ip'] == db_fip['fixed_ip_address'] and
ovn_fip['external_ip'] == db_fip['floating_ip_address']):
@ -400,6 +404,8 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
update_snats_list = []
update_fips_list = []
for lrouter in lrouters:
ovn_rtr_lb_pfs = self.ovn_api.get_router_floatingip_lbs(
utils.ovn_name(lrouter['name']))
if lrouter['name'] in db_routers:
for lrport, lrport_nets in lrouter['ports'].items():
if lrport in db_router_ports:
@ -428,7 +434,7 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
ovn_fips = lrouter['dnat_and_snats']
db_fips = db_extends[lrouter['name']]['fips']
add_fips, del_fips = self._calculate_fips_differences(
ovn_fips, db_fips)
ovn_fips, ovn_rtr_lb_pfs, db_fips)
update_fips_list.append({'id': lrouter['name'],
'add': add_fips,
'del': del_fips})

View File

@ -41,6 +41,8 @@ from neutron.db import l3_fip_port_details
from neutron.db import ovn_revision_numbers_db as db_rev
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_client
from neutron.scheduler import l3_ovn_scheduler
from neutron.services.portforwarding.drivers.ovn import driver \
as port_forwarding
from neutron.services.ovn_l3 import exceptions as ovn_l3_exc
@ -76,6 +78,7 @@ class OVNL3RouterPlugin(service_base.ServicePluginBase,
self._mech = None
self._ovn_client_inst = None
self.scheduler = l3_ovn_scheduler.get_scheduler()
self.port_forwarding = port_forwarding.OVNPortForwarding(self)
self._register_precommit_callbacks()
def _register_precommit_callbacks(self):

View File

@ -15,5 +15,9 @@
# String literals representing core resources.
# TODO(flaviof): move to neutron_lib/callbacks/resources.py
from ovsdbapp import constants as const
PORT_FORWARDING = 'port_forwarding'
PORT_FORWARDING_PLUGIN = 'port_forwarding_plugin'
PORT_FORWARDING_PREFIX = 'pf-floatingip'
LB_PROTOCOL_MAP = {'udp': const.PROTO_UDP, 'tcp': const.PROTO_TCP}

View File

@ -0,0 +1,278 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from ovsdbapp import constants as ovsdbapp_const
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from neutron.common.ovn import constants as ovn_const
from neutron.db import ovn_revision_numbers_db as db_rev
from neutron import manager
from neutron.objects import port_forwarding as port_forwarding_obj
from neutron.services.portforwarding import constants as pf_const
LOG = log.getLogger(__name__)
class OVNPortForwardingHandler(object):
@staticmethod
def _get_lb_protocol(pf_obj):
return pf_const.LB_PROTOCOL_MAP[pf_obj.protocol]
@staticmethod
def lb_name(fip_id, proto):
return "{}-{}-{}".format(
pf_const.PORT_FORWARDING_PREFIX, fip_id, proto)
@classmethod
def lb_names(cls, fip_id):
return [cls.lb_name(fip_id, proto)
for proto in pf_const.LB_PROTOCOL_MAP.values()]
@classmethod
def _get_lb_attributes(cls, pf_obj):
lb_name = cls.lb_name(pf_obj.floatingip_id,
cls._get_lb_protocol(pf_obj))
vip = "{}:{}".format(pf_obj.floating_ip_address, pf_obj.external_port)
internal_ip = "{}:{}".format(pf_obj.internal_ip_address,
pf_obj.internal_port)
rtr_name = 'neutron-{}'.format(pf_obj.router_id)
return lb_name, vip, [internal_ip], rtr_name
def _port_forwarding_created(self, ovn_txn, nb_ovn, pf_obj):
# Add vip to its corresponding load balancer. There can be multiple
# vips, so load balancer may already be present.
lb_name, vip, internal_ips, rtr_name = self._get_lb_attributes(pf_obj)
external_ids = {
ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY:
pf_const.PORT_FORWARDING_PLUGIN,
ovn_const.OVN_FIP_EXT_ID_KEY: pf_obj.floatingip_id,
ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY: rtr_name,
}
ovn_txn.add(
nb_ovn.lb_add(lb_name, vip, internal_ips,
self._get_lb_protocol(pf_obj), may_exist=True,
external_ids=external_ids))
# Ensure logical router has load balancer configured.
ovn_txn.add(nb_ovn.lr_lb_add(rtr_name, lb_name, may_exist=True))
def port_forwarding_created(self, ovn_txn, nb_ovn, pf_obj):
LOG.info("CREATE for port-forwarding %s vip %s:%s to %s:%s",
pf_obj.protocol,
pf_obj.floating_ip_address, pf_obj.external_port,
pf_obj.internal_ip_address, pf_obj.internal_port)
self._port_forwarding_created(ovn_txn, nb_ovn, pf_obj)
def port_forwarding_updated(self, ovn_txn, nb_ovn, pf_obj, orig_pf_obj):
LOG.info("UPDATE for port-forwarding %s vip %s:%s to %s:%s",
pf_obj.protocol,
pf_obj.floating_ip_address, pf_obj.external_port,
pf_obj.internal_ip_address, pf_obj.internal_port)
self._port_forwarding_deleted(ovn_txn, nb_ovn, orig_pf_obj)
self._port_forwarding_created(ovn_txn, nb_ovn, pf_obj)
def _port_forwarding_deleted(self, ovn_txn, nb_ovn, pf_obj):
# NOTE: load balancer instance is expected to be removed by api once
# last vip is removed.
# Since router has weak ref to the lb, that gets taken care
# automatically, but that it is not best practice to rely on
# that. Unfortunately, we would need to add extra logic that
# ensures that the lr_lb_del is invoked only after the last
# vip was removed. So...
# TODO(flaviof): see about enhancing lb_del so that removal of lb
# can optionally take a logical router, which explicitly dissociates
# router from removed lb.
lb_name, vip, _internal_ips, _rtr = self._get_lb_attributes(pf_obj)
ovn_txn.add(nb_ovn.lb_del(lb_name, vip, if_exists=True))
def port_forwarding_deleted(self, ovn_txn, nb_ovn, pf_obj):
LOG.info("DELETE for port-forwarding %s vip %s:%s to %s:%s",
pf_obj.protocol,
pf_obj.floating_ip_address, pf_obj.external_port,
pf_obj.internal_ip_address, pf_obj.internal_port)
self._port_forwarding_deleted(ovn_txn, nb_ovn, pf_obj)
@registry.has_registry_receivers
class OVNPortForwarding(object):
def __init__(self, l3_plugin):
self._l3_plugin = l3_plugin
self._pf_plugin_property = None
self._handler = OVNPortForwardingHandler()
@property
def _pf_plugin(self):
if self._pf_plugin_property is None:
self._pf_plugin_property = directory.get_plugin(
plugin_constants.PORTFORWARDING)
if not self._pf_plugin_property:
self._pf_plugin_property = (
manager.NeutronManager.load_class_for_provider(
'neutron.service_plugins', 'port_forwarding')())
return self._pf_plugin_property
def _get_pf_objs(self, context, fip_id):
pf_dicts = self._pf_plugin.get_floatingip_port_forwardings(
context, fip_id)
return [port_forwarding_obj.PortForwarding(context=context, **pf_dict)
for pf_dict in pf_dicts]
def _get_fip_objs(self, context, payload):
floatingip_ids = set()
# Note on floatingip_id from payload: depending on the event that
# generated the payload provided, expect pf_payload.current_pf (in
# DELETE events) or pf_payload.original_pf (CREATE events) to be None.
# To be agnostic of what event this is, simply build a set from both.
for pf_payload in payload:
if pf_payload.current_pf:
floatingip_ids.add(pf_payload.current_pf.floatingip_id)
if pf_payload.original_pf:
floatingip_ids.add(pf_payload.original_pf.floatingip_id)
return {fip_id: self._l3_plugin.get_floatingip(context, fip_id)
for fip_id in floatingip_ids}
def _add_check_rev(self, ovn_txn, ovn_nb, fip_id, fip_obj):
"""Updating revision number of OVN lb entries based on floatingip id
A single floating ip maps to 1 or 2 OVN load balancer entries,
because while multiple vips can exist in a single OVN LB row,
they represent one protocol. So, to handle all port forwardings
for a given floating ip, OVN will have up to two LB entries: one
for udp and one for tcp. These 2 LB entries are expected to have
the same revision number, in sync with the revision of the floating
ip. And that is set via this function.
"""
check_rev_tuples = []
for lb_name in self._handler.lb_names(fip_id):
check_rev_cmd = ovn_nb.check_revision_number(lb_name, fip_obj,
ovn_const.TYPE_FLOATINGIPS, if_exists=True)
ovn_txn.add(check_rev_cmd)
check_rev_tuples.append((check_rev_cmd, fip_obj))
return check_rev_tuples
def _do_db_rev_bump_revision(self, context, check_rev_tuples):
if not all(check_rev_cmd.result == ovn_const.TXN_COMMITTED
for check_rev_cmd, _fip_obj in check_rev_tuples):
return
for _check_rev_cmd, fip_obj in check_rev_tuples:
db_rev.bump_revision(context, fip_obj, ovn_const.TYPE_FLOATINGIPS)
def _handle_notification(self, _resource, event_type, _pf_plugin, payload):
if not payload:
return
context = payload[0].context
ovn_nb = self._l3_plugin._ovn
with ovn_nb.transaction(check_error=True) as ovn_txn:
if event_type == events.AFTER_CREATE:
for pf_payload in payload:
self._handler.port_forwarding_created(ovn_txn, ovn_nb,
pf_payload.current_pf)
elif event_type == events.AFTER_UPDATE:
for pf_payload in payload:
self._handler.port_forwarding_updated(ovn_txn, ovn_nb,
pf_payload.current_pf, pf_payload.original_pf)
elif event_type == events.AFTER_DELETE:
for pf_payload in payload:
self._handler.port_forwarding_deleted(ovn_txn, ovn_nb,
pf_payload.original_pf)
# Collect the revision numbers of all floating ips visited and
# update the corresponding load balancer entries affected.
# Note that there may be 2 entries for a given floatingip_id;
# one for each protocol.
fip_objs = self._get_fip_objs(context, payload)
if not fip_objs:
return
for floatingip_id, fip_obj in fip_objs.items():
check_rev_tuples = self._add_check_rev(
ovn_txn, ovn_nb, floatingip_id, fip_obj)
# Update revision of affected floating ips. Note that even in
# cases where port forwarding is deleted, floating ip remains.
self._do_db_rev_bump_revision(context, check_rev_tuples)
def _maintenance_create_update(self, context, fip_id):
# NOTE: Since the maintenance callback is not granular to the level
# of the affected pfs AND the fact that pfs are all vips
# in a load balancer entry, it is cheap enough to simply rebuild.
pf_objs = self._get_pf_objs(context, fip_id)
LOG.debug("Maintenance port forwarding under fip %s : %s",
fip_id, pf_objs)
ovn_nb = self._l3_plugin._ovn
with ovn_nb.transaction(check_error=True) as ovn_txn:
for lb_name in self._handler.lb_names(fip_id):
ovn_txn.add(ovn_nb.lb_del(lb_name, vip=None, if_exists=True))
for pf_obj in pf_objs:
self._handler.port_forwarding_created(
ovn_txn, ovn_nb, pf_obj)
fip_obj = self._l3_plugin.get_floatingip(context, fip_id)
check_rev_tuples = self._add_check_rev(
ovn_txn, ovn_nb, fip_id, fip_obj)
self._do_db_rev_bump_revision(context, check_rev_tuples)
def maintenance_create(self, context, floatingip):
fip_id = floatingip['id']
LOG.info("Maintenance CREATE port-forwarding entries under fip %s",
fip_id)
self._maintenance_create_update(context, fip_id)
def maintenance_update(self, context, floatingip):
fip_id = floatingip['id']
LOG.info("Maintenance UPDATE port-forwarding entries under fip %s",
fip_id)
self._maintenance_create_update(context, fip_id)
def maintenance_delete(self, _context, fip_id):
LOG.info("Maintenance DELETE port-forwarding entries under fip %s",
fip_id)
ovn_nb = self._l3_plugin._ovn
with ovn_nb.transaction(check_error=True) as ovn_txn:
for lb_name in self._handler.lb_names(fip_id):
ovn_txn.add(ovn_nb.lb_del(lb_name, vip=None, if_exists=True))
def db_sync_create_or_update(self, context, fip_id, ovn_txn):
LOG.info("db_sync UPDATE entries under fip %s", fip_id)
# NOTE: Since the db_sync callback is not granular to the level
# of the affected pfs AND the fact that pfs are all vips
# in a load balancer entry, it is cheap enough to simply rebuild.
ovn_nb = self._l3_plugin._ovn
pf_objs = self._get_pf_objs(context, fip_id)
LOG.debug("Db sync port forwarding under fip %s : %s", fip_id, pf_objs)
for lb_name in self._handler.lb_names(fip_id):
ovn_txn.add(ovn_nb.lb_del(lb_name, vip=None, if_exists=True))
for pf_obj in pf_objs:
self._handler.port_forwarding_created(ovn_txn, ovn_nb, pf_obj)
fip_obj = self._l3_plugin.get_floatingip(context, fip_id)
self._add_check_rev(ovn_txn, ovn_nb, fip_id, fip_obj)
def db_sync_delete(self, context, fip_id, ovn_txn):
LOG.info("db_sync DELETE entries under fip %s", fip_id)
ovn_nb = self._l3_plugin._ovn
for lb_name in self._handler.lb_names(fip_id):
ovn_txn.add(ovn_nb.lb_del(lb_name, vip=None, if_exists=True))
@staticmethod
def ovn_lb_protocol(pf_protocol):
return pf_const.LB_PROTOCOL_MAP.get(
pf_protocol, ovsdbapp_const.PROTO_TCP)
@registry.receives(pf_const.PORT_FORWARDING_PLUGIN, [events.AFTER_INIT])
def register(self, resource, event, trigger, payload=None):
for event_type in (events.AFTER_CREATE, events.AFTER_UPDATE,
events.AFTER_DELETE):
registry.subscribe(self._handle_notification,
pf_const.PORT_FORWARDING, event_type)

View File

@ -40,6 +40,7 @@ from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
# Load all the models to register them into SQLAlchemy metadata before using
# the SqlFixture
from neutron.db import models # noqa
from neutron import manager
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import worker
from neutron.plugins.ml2.drivers import type_geneve # noqa
from neutron.tests import base
@ -183,6 +184,10 @@ class TestOVNFunctionalBase(test_plugin.Ml2PluginV2TestCase,
self.mech_driver = mm.mech_drivers['ovn'].obj
self.l3_plugin = directory.get_plugin(constants.L3)
self.segments_plugin = directory.get_plugin('segments')
# OVN does not use RPC: disable it for port-forwarding tests
self.pf_plugin = manager.NeutronManager.load_class_for_provider(
'neutron.service_plugins', 'port_forwarding')()
self.pf_plugin._rpc_notifications_required = False
self.ovsdb_server_mgr = None
self.ovn_northd_mgr = None
self.maintenance_worker = maintenance_worker

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from unittest import mock
import fixtures
@ -95,6 +96,51 @@ class TestUtils(base.BaseTestCase):
self.assertFalse(utils.is_security_groups_enabled(
{}))
def test_parse_ovn_lb_port_forwarding(self):
TC = namedtuple('TC', 'input output description')
fake_ovn_lb = namedtuple('fake_ovn_lb', 'external_ids protocol vips')
test_cases = [
TC([], {}, "empty"),
TC([{'external_ids': {'neutron:fip_id': 'fip1'},
'protocol': None,
'vips': {'172.24.4.8:2020': '10.0.0.10:22'}}],
{'fip1': {'tcp': {'172.24.4.8:2020 10.0.0.10:22'}}},
"simple"),
TC([{'external_ids': {'neutron:fip_id': 'fip1'},
'protocol': [],
'vips': {'172.24.4.8:2020': '10.0.0.10:22',
'172.24.4.8:2021': '10.0.0.11:22',
'172.24.4.8:8080': '10.0.0.10:80'}}],
{'fip1': {'tcp': {'172.24.4.8:8080 10.0.0.10:80',
'172.24.4.8:2021 10.0.0.11:22',
'172.24.4.8:2020 10.0.0.10:22'}}},
"multiple vips"),
TC([{'external_ids': {'neutron:fip_id': 'fip1'},
'protocol': ['tcp'],
'vips': {'ext_ip:ext_port1': 'int_ip1:int_port1'}},
{'external_ids': {'neutron:fip_id': 'fip1'},
'protocol': ['udp'],
'vips': {'ext_ip:ext_port1': 'int_ip1:int_port1'}}],
{'fip1': {'tcp': {'ext_ip:ext_port1 int_ip1:int_port1'},
'udp': {'ext_ip:ext_port1 int_ip1:int_port1'}}},
"2 protocols"),
TC([{'external_ids': {'neutron:fip_id': 'fip1'},
'protocol': ['tcp'],
'vips': {'ext_ip:ext_port1': 'int_ip1:int_port1'}},
{'external_ids': {'neutron:fip_id': 'fip2'},
'protocol': ['tcp'],
'vips': {'ext_ip:ext_port1': 'int_ip1:int_port1'}}],
{'fip1': {'tcp': {'ext_ip:ext_port1 int_ip1:int_port1'}},
'fip2': {'tcp': {'ext_ip:ext_port1 int_ip1:int_port1'}}},
"2 fips"),
]
for tc in test_cases:
tc_lbs = [
fake_ovn_lb(lb['external_ids'], lb['protocol'], lb['vips'])
for lb in tc.input]
rc = utils.parse_ovn_lb_port_forwarding(tc_lbs)
self.assertEqual(rc, tc.output, tc.description)
class TestGateWayChassisValidity(base.BaseTestCase):

View File

@ -113,6 +113,8 @@ class FakeOvsdbNbOvnIdl(object):
# remove it in the Rocky release.
self.get_floatingip_by_ips = mock.Mock()
self.get_floatingip_by_ips.return_value = None
self.get_router_floatingip_lbs = mock.Mock()
self.get_router_floatingip_lbs.return_value = []
self.is_col_present = mock.Mock()
self.is_col_present.return_value = False
self.get_lrouter = mock.Mock()
@ -137,6 +139,10 @@ class FakeOvsdbNbOvnIdl(object):
self.db_remove = mock.Mock()
self.set_lswitch_port_to_virtual_type = mock.Mock()
self.unset_lswitch_port_to_virtual_type = mock.Mock()
self.update_lb_external_ids = mock.Mock()
self.lb_add = mock.Mock()
self.lb_del = mock.Mock()
self.lr_lb_add = mock.Mock()
self.ls_get = mock.Mock()
self.check_liveness = mock.Mock()
self.ha_chassis_group_get = mock.Mock()

View File

@ -0,0 +1,381 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from unittest import mock
from neutron.common.ovn import constants as ovn_const
from neutron.services.portforwarding.constants import PORT_FORWARDING
from neutron.services.portforwarding.constants import PORT_FORWARDING_PLUGIN
from neutron.services.portforwarding.drivers.ovn import driver \
as port_forwarding
from neutron.tests import base
from neutron.tests.unit import fake_resources
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.plugins import constants as plugin_constants
from oslo_utils import uuidutils
from ovsdbapp import constants as ovsdbapp_const
class TestOVNPortForwardingBase(base.BaseTestCase):
def setUp(self):
super(TestOVNPortForwardingBase, self).setUp()
self.context = mock.Mock()
self.l3_plugin = mock.Mock()
self.l3_plugin._ovn = fake_resources.FakeOvsdbNbOvnIdl()
self.txn = self.l3_plugin._ovn.transaction
def _fake_pf_obj(self, **kwargs):
pf_obj_defaults_dict = {
'floatingip_id': 'fip_id',
'protocol': 'udp',
'floating_ip_address': 'fip_addr',
'external_port': 'ext_port',
'internal_ip_address': 'internal_addr',
'internal_port': 'internal_port',
'router_id': 'rtr_id'
}
pf_obj_dict = {**pf_obj_defaults_dict, **kwargs}
return mock.Mock(**pf_obj_dict)
def _fake_pf_payload_entry(self, curr_pf_id, orig_pf_id=None, **kwargs):
mock_pf_payload = mock.Mock()
fake_pf_obj = self._fake_pf_obj(**kwargs)
if 'context' not in kwargs:
mock_pf_payload.context = self.context
if curr_pf_id:
mock_pf_payload.current_pf = fake_pf_obj
mock_pf_payload.current_pf.floatingip_id = curr_pf_id
else:
mock_pf_payload.current_pf = None
if orig_pf_id:
mock_pf_payload.original_pf = fake_pf_obj
mock_pf_payload.original_pf.floatingip_id = orig_pf_id
else:
mock_pf_payload.original_pf = None
return mock_pf_payload
class TestOVNPortForwardingHandler(TestOVNPortForwardingBase):
def setUp(self):
super(TestOVNPortForwardingHandler, self).setUp()
self.handler = port_forwarding.OVNPortForwardingHandler()
def test_get_lb_protocol(self):
fake_pf_obj = self._fake_pf_obj(protocol='udp')
self.assertEqual(ovsdbapp_const.PROTO_UDP,
self.handler._get_lb_protocol(fake_pf_obj))
fake_pf_obj = self._fake_pf_obj(protocol='tcp')
self.assertEqual(ovsdbapp_const.PROTO_TCP,
self.handler._get_lb_protocol(fake_pf_obj))
fake_pf_obj = self._fake_pf_obj(protocol='xxx')
self.assertRaises(KeyError, self.handler._get_lb_protocol,
fake_pf_obj)
def test_lb_names(self):
expected_names = ['pf-floatingip-id-udp', 'pf-floatingip-id-tcp']
names = self.handler.lb_names('id')
self.assertEqual(names, expected_names)
def test_get_lb_attributes(self):
fake_pf_obj = self._fake_pf_obj()
lb_name, vip, internal_ip, rtr_name = self.handler._get_lb_attributes(
fake_pf_obj)
self.assertEqual(lb_name, 'pf-floatingip-fip_id-udp')
self.assertEqual(vip, 'fip_addr:ext_port')
self.assertEqual(internal_ip, ['internal_addr:internal_port'])
self.assertEqual(rtr_name, 'neutron-rtr_id')
@mock.patch.object(port_forwarding.LOG, 'info')
def test_port_forwarding_created(self, m_info):
fake_pf_obj = self._fake_pf_obj()
exp_lb_name, exp_vip, exp_internal_ips, exp_rtr_name = (self.handler.
_get_lb_attributes(fake_pf_obj))
exp_protocol = self.handler._get_lb_protocol(fake_pf_obj)
self.handler.port_forwarding_created(
self.txn, self.l3_plugin._ovn, fake_pf_obj)
info_args, _info_kwargs = m_info.call_args_list[0]
self.assertIn('CREATE for port-forwarding', info_args[0])
self.assertEqual(2, len(self.txn.add.call_args_list))
exp_external_ids = {
ovn_const.OVN_DEVICE_OWNER_EXT_ID_KEY: PORT_FORWARDING_PLUGIN,
ovn_const.OVN_FIP_EXT_ID_KEY: fake_pf_obj.floatingip_id,
ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY: exp_rtr_name,
}
self.l3_plugin._ovn.lb_add.assert_called_once_with(
exp_lb_name, exp_vip, exp_internal_ips, exp_protocol,
may_exist=True, external_ids=exp_external_ids)
self.l3_plugin._ovn.lr_lb_add.assert_called_once_with(
exp_rtr_name, exp_lb_name, may_exist=True)
@mock.patch.object(port_forwarding.LOG, 'info')
@mock.patch.object(
port_forwarding.OVNPortForwardingHandler, '_port_forwarding_deleted')
@mock.patch.object(
port_forwarding.OVNPortForwardingHandler, '_port_forwarding_created')
def test_port_forwarding_updated(self, m_created, m_deleted, m_info):
fake_pf_obj = self._fake_pf_obj(protocol='udp')
fake_orig_pf_obj = self._fake_pf_obj(protocol='tcp')
self.handler.port_forwarding_updated(
self.txn, self.l3_plugin._ovn, fake_pf_obj, fake_orig_pf_obj)
info_args, _info_kwargs = m_info.call_args_list[0]
self.assertIn('UPDATE for port-forwarding', info_args[0])
m_deleted.assert_called_once_with(self.txn, self.l3_plugin._ovn,
fake_orig_pf_obj)
m_created.assert_called_once_with(self.txn, self.l3_plugin._ovn,
fake_pf_obj)
@mock.patch.object(port_forwarding.LOG, 'info')
def test_port_forwarding_deleted(self, m_info):
fake_pf_obj = self._fake_pf_obj()
exp_lb_name, exp_vip, _, _ = self.handler._get_lb_attributes(
fake_pf_obj)
self.handler.port_forwarding_deleted(
self.txn, self.l3_plugin._ovn, fake_pf_obj)
info_args, _info_kwargs = m_info.call_args_list[0]
self.assertIn('DELETE for port-forwarding', info_args[0])
self.assertEqual(1, len(self.txn.add.call_args_list))
self.l3_plugin._ovn.lb_del.assert_called_once_with(
exp_lb_name, exp_vip, if_exists=mock.ANY)
class TestOVNPortForwarding(TestOVNPortForwardingBase):
def setUp(self):
super(TestOVNPortForwarding, self).setUp()
self.pf_plugin = mock.Mock()
self.handler = mock.Mock()
get_mock_pf_plugin = lambda alias: self.pf_plugin if (
alias == plugin_constants.PORTFORWARDING) else None
self.fake_get_dir_object = mock.patch(
"neutron_lib.plugins.directory.get_plugin",
side_effect=get_mock_pf_plugin).start()
self.fake_handler_object = mock.patch(
"neutron.services.portforwarding.drivers.ovn.driver."
"OVNPortForwardingHandler",
return_value=self.handler).start()
self._ovn_pf = port_forwarding.OVNPortForwarding(self.l3_plugin)
self.mock_pf_revs = mock.Mock()
self.fake_check_rev = mock.patch.object(
self._ovn_pf, '_add_check_rev',
return_value=self.mock_pf_revs).start()
self.fake_db_rev = mock.patch.object(
self._ovn_pf, '_do_db_rev_bump_revision').start()
def test_init(self):
self.assertIsNotNone(self._ovn_pf)
self.assertEqual(self._ovn_pf._l3_plugin, self.l3_plugin)
self.assertEqual(self._ovn_pf._handler, self.handler)
self.assertEqual(self._ovn_pf._pf_plugin, self.pf_plugin)
def test_register(self):
with mock.patch.object(registry, 'subscribe') as mock_subscribe:
self._ovn_pf.register(mock.ANY, mock.ANY, mock.Mock())
calls = [mock.call.mock_subscribe(mock.ANY,
PORT_FORWARDING,
events.AFTER_CREATE),
mock.call.mock_subscribe(mock.ANY,
PORT_FORWARDING,
events.AFTER_UPDATE),
mock.call.mock_subscribe(mock.ANY,
PORT_FORWARDING,
events.AFTER_DELETE)]
mock_subscribe.assert_has_calls(calls)
def test_get_pf_objs(self):
_uuid = uuidutils.generate_uuid
fip_id = _uuid()
fake_pf_dicts = [{'id': _uuid(),
'floatingip_id': fip_id,
'external_port': pf_port,
'protocol': 'tcp',
'internal_port_id': _uuid(),
'internal_ip_address': '1.1.1.2',
'internal_port': pf_port,
'floating_ip_address': '111.111.111.111',
'router_id': _uuid()} for pf_port in range(22, 32)]
self.pf_plugin.get_floatingip_port_forwardings = mock.Mock(
return_value=fake_pf_dicts)
pf_objs = self._ovn_pf._get_pf_objs(self.context, fip_id)
self.pf_plugin.get_floatingip_port_forwardings.assert_called_once_with(
self.context, fip_id)
for index, fake_pf_dict in enumerate(fake_pf_dicts):
self.assertEqual(pf_objs[index].id, fake_pf_dict['id'])
self.assertEqual(pf_objs[index].floatingip_id,
fake_pf_dict['floatingip_id'])
self.assertEqual(pf_objs[index].external_port,
fake_pf_dict['external_port'])
self.assertEqual(pf_objs[index].internal_port_id,
fake_pf_dict['internal_port_id'])
self.assertEqual(pf_objs[index].router_id,
fake_pf_dict['router_id'])
def test_get_fip_objs(self):
pf_payload = [self._fake_pf_payload_entry(1),
self._fake_pf_payload_entry(2),
self._fake_pf_payload_entry(None, 1),
self._fake_pf_payload_entry(1, 3)]
self.l3_plugin.get_floatingip = lambda _, fip_id: fip_id * 10
fip_objs = self._ovn_pf._get_fip_objs(self.context, pf_payload)
self.assertEqual(fip_objs, {3: 30, 2: 20, 1: 10})
def _handle_notification_common(self, event_type, payload=None,
fip_objs=None):
if not payload:
payload = []
if not fip_objs:
fip_objs = {}
with mock.patch.object(self._ovn_pf, '_get_fip_objs',
return_value=fip_objs) as mock_get_fip_objs:
self._ovn_pf._handle_notification(None, event_type, None, payload)
self.assertTrue(self.fake_db_rev.called or not fip_objs)
if not payload:
return
mock_get_fip_objs.assert_called_once_with(self.context, payload)
if fip_objs:
calls = [
mock.call(mock.ANY, self.l3_plugin._ovn, fip_id, fip_obj)
for fip_id, fip_obj in fip_objs.items()]
self.fake_check_rev.assert_has_calls(calls)
self.fake_db_rev.assert_called_once_with(
self.context, self.mock_pf_revs)
def test_handle_notification_noop(self):
self._handle_notification_common(events.AFTER_CREATE)
weird_event_type = 666
fake_payload = [self._fake_pf_payload_entry(None)]
self._handle_notification_common(weird_event_type, fake_payload)
def test_handle_notification_basic(self):
fake_payload_entry = self._fake_pf_payload_entry(1)
self._handle_notification_common(events.AFTER_CREATE,
[fake_payload_entry])
self.handler.port_forwarding_created.assert_called_once_with(
mock.ANY, self.l3_plugin._ovn, fake_payload_entry.current_pf)
def test_handle_notification_create(self):
fip_objs = {1: {'description': 'one'},
3: {'description': 'three', 'revision_number': '321'}}
fake_payload = [self._fake_pf_payload_entry(id) for id in range(1, 4)]
self._handle_notification_common(events.AFTER_CREATE, fake_payload,
fip_objs)
calls = [mock.call(mock.ANY, self.l3_plugin._ovn, entry.current_pf)
for entry in fake_payload]
self.handler.port_forwarding_created.assert_has_calls(calls)
def test_handle_notification_update(self):
fip_objs = {100: {'description': 'hundred'}, 101: {}}
fake_payload = [self._fake_pf_payload_entry(100, 100),
self._fake_pf_payload_entry(101, 101)]
self._handle_notification_common(events.AFTER_UPDATE, fake_payload,
fip_objs)
calls = [mock.call(mock.ANY, self.l3_plugin._ovn, entry.current_pf,
entry.original_pf) for entry in fake_payload]
self.handler.port_forwarding_updated.assert_has_calls(calls)
def test_handle_notification_delete(self):
fip_objs = {1: {'description': 'one'},
2: {'description': 'two', 'revision_number': '222'}}
fake_payload = [self._fake_pf_payload_entry(None, id)
for id in range(1, 4)]
self._handle_notification_common(events.AFTER_DELETE, fake_payload,
fip_objs)
calls = [mock.call(mock.ANY, self.l3_plugin._ovn, entry.original_pf)
for entry in fake_payload]
self.handler.port_forwarding_deleted.assert_has_calls(calls)
def test_maintenance_create_or_update(self):
pf_objs = [self._fake_pf_obj()]
fip_id = pf_objs[0].floatingip_id
fake_fip_obj = {'floatingip_id': fip_id}
fake_lb_names = ['lb1', 'lb2']
self.handler.lb_names = mock.Mock(return_value=fake_lb_names)
self.handler.port_forwarding_created = mock.Mock()
self.l3_plugin.get_floatingip = mock.Mock(return_value=fake_fip_obj)
with mock.patch.object(self._ovn_pf, '_get_pf_objs',
return_value=pf_objs) as mock_get_pf_objs:
self._ovn_pf._maintenance_create_update(self.context, fip_id)
self.l3_plugin._ovn.transaction.assert_called_once_with(
check_error=True)
calls = [mock.call(lb_name, vip=None, if_exists=True)
for lb_name in fake_lb_names]
self.l3_plugin._ovn.lb_del.assert_has_calls(calls)
calls = [mock.call(mock.ANY, self.l3_plugin._ovn, pf_obj)
for pf_obj in pf_objs]
self.handler.port_forwarding_created.assert_has_calls(calls)
mock_get_pf_objs.assert_called_once_with(self.context, fip_id)
self.l3_plugin.get_floatingip.assert_called_once_with(
self.context, fip_id)
self.fake_db_rev.assert_called_once_with(
self.context, self.mock_pf_revs)
def test_maintenance_delete(self):
pf_objs = [self._fake_pf_obj()]
fip_id = pf_objs[0].floatingip_id
fake_fip_obj = {'floatingip_id': fip_id}
fake_lb_names = ['lb1', 'lb2']
self.handler.lb_names = mock.Mock(return_value=fake_lb_names)
self.handler.port_forwarding_created = mock.Mock()
self.l3_plugin.get_floatingip = mock.Mock(return_value=fake_fip_obj)
with mock.patch.object(self._ovn_pf, '_get_pf_objs',
return_value=pf_objs) as mock_get_pf_objs:
self._ovn_pf.maintenance_delete(self.context, fip_id)
self.l3_plugin._ovn.transaction.assert_called_once_with(
check_error=True)
calls = [mock.call(lb_name, vip=None, if_exists=True)
for lb_name in fake_lb_names]
self.l3_plugin._ovn.lb_del.assert_has_calls(calls)
self.handler.port_forwarding_created.assert_not_called()
mock_get_pf_objs.assert_not_called()
self.l3_plugin.get_floatingip.assert_not_called()
self.fake_db_rev.assert_not_called()
@mock.patch.object(port_forwarding.LOG, 'info')
def test_db_sync_create_or_update(self, m_info):
pf_objs = [self._fake_pf_obj()]
fip_id = pf_objs[0].floatingip_id
fake_fip_obj = {'floatingip_id': fip_id, 'revision_number': 123456789}
fake_lb_names = ['lb1', 'lb2']
self.handler.lb_names = mock.Mock(return_value=fake_lb_names)
self.handler.port_forwarding_created = mock.Mock()
self.l3_plugin.get_floatingip = mock.Mock(return_value=fake_fip_obj)
with mock.patch.object(self._ovn_pf, '_get_pf_objs',
return_value=pf_objs) as mock_get_pf_objs:
self._ovn_pf.db_sync_create_or_update(
self.context, fip_id, self.txn)
info_args, _info_kwargs = m_info.call_args_list[0]
self.assertIn('db_sync UPDATE entries', info_args[0])
mock_get_pf_objs.assert_called_once_with(self.context, fip_id)
calls = [mock.call(lb_name, vip=None, if_exists=True)
for lb_name in fake_lb_names]
self.l3_plugin._ovn.lb_del.assert_has_calls(calls)
calls = [mock.call(mock.ANY, self.l3_plugin._ovn, pf_obj)
for pf_obj in pf_objs]
self.handler.port_forwarding_created.assert_has_calls(calls)
self.l3_plugin.get_floatingip.assert_called_once_with(
self.context, fip_id)
self.fake_check_rev.assert_called_once_with(
self.txn, self.l3_plugin._ovn, fip_id, fake_fip_obj)
@mock.patch.object(port_forwarding.LOG, 'info')
def test_db_sync_delete(self, m_info):
fip_id = 'fip_id'
fake_lb_names = ['lb1', 'lb2', 'lb3', 'lb4', 'lb5']
self.handler.lb_names = mock.Mock(return_value=fake_lb_names)
self._ovn_pf.db_sync_delete(self.context, fip_id, self.txn)
info_args, _info_kwargs = m_info.call_args_list[0]
self.assertIn('db_sync DELETE entries', info_args[0])
calls = [mock.call(lb_name, vip=None, if_exists=True)
for lb_name in fake_lb_names]
self.l3_plugin._ovn.lb_del.assert_has_calls(calls)

View File

@ -0,0 +1,6 @@
---
prelude: >
Added support for floating IPs port forwarding in OVN.
features:
- |
Support for floating IPs port forwarding has been added to OVN backend.

View File

@ -172,6 +172,7 @@
tls-proxy: true
q-qos: true
neutron-segments: True
q-port-forwarding: true
group-vars:
subnode:
devstack_services:

View File

@ -282,6 +282,7 @@
s-proxy: false
tls-proxy: true
q-qos: true
q-port-forwarding: true
- job:
name: neutron-ovn-tempest-ovs-master