Merge "[ovn]: port forwarding -- feature support under ovn_db_sync"
This commit is contained in:
commit
239f7b500a
@ -191,7 +191,9 @@ def main():
|
||||
cfg.CONF.set_override('mechanism_drivers', ['ovn-sync'], 'ml2')
|
||||
conf.service_plugins = [
|
||||
'neutron.services.ovn_l3.plugin.OVNL3RouterPlugin',
|
||||
'neutron.services.segments.plugin.Plugin']
|
||||
'neutron.services.segments.plugin.Plugin',
|
||||
'port_forwarding',
|
||||
]
|
||||
else:
|
||||
LOG.error('Invalid core plugin : ["%s"].', cfg.CONF.core_plugin)
|
||||
return
|
||||
|
@ -71,6 +71,11 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
|
||||
core_plugin, ovn_api, ovn_driver)
|
||||
self.mode = mode
|
||||
self.l3_plugin = directory.get_plugin(plugin_constants.L3)
|
||||
self.pf_plugin = directory.get_plugin(plugin_constants.PORTFORWARDING)
|
||||
if not self.pf_plugin:
|
||||
self.pf_plugin = (
|
||||
manager.NeutronManager.load_class_for_provider(
|
||||
'neutron.service_plugins', 'port_forwarding')())
|
||||
self._ovn_client = ovn_client.OVNClient(ovn_api, sb_ovn)
|
||||
self.segments_plugin = directory.get_plugin('segments')
|
||||
if not self.segments_plugin:
|
||||
@ -325,6 +330,62 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
|
||||
|
||||
return to_add, to_remove
|
||||
|
||||
def _calculate_fip_pfs_differences(self, ovn_rtr_lb_pfs, db_pfs):
|
||||
to_add_or_update = set()
|
||||
to_remove = []
|
||||
ovn_pfs = utils.parse_ovn_lb_port_forwarding(ovn_rtr_lb_pfs)
|
||||
|
||||
# check that all pfs are accounted for in ovn_pfs by building
|
||||
# a set for each protocol and then comparing it with ovn_pfs
|
||||
db_mapped_pfs = {}
|
||||
for db_pf in db_pfs:
|
||||
fip_id = db_pf.get('floatingip_id')
|
||||
protocol = self.l3_plugin.port_forwarding.ovn_lb_protocol(
|
||||
db_pf.get('protocol'))
|
||||
db_vip = "{}:{} {}:{}".format(
|
||||
db_pf.get('floating_ip_address'), db_pf.get('external_port'),
|
||||
db_pf.get('internal_ip_address'), db_pf.get('internal_port'))
|
||||
|
||||
fip_dict = db_mapped_pfs.get(fip_id, {})
|
||||
fip_dict_proto = fip_dict.get(protocol, set())
|
||||
fip_dict_proto.add(db_vip)
|
||||
if protocol not in fip_dict:
|
||||
fip_dict[protocol] = fip_dict_proto
|
||||
if fip_id not in db_mapped_pfs:
|
||||
db_mapped_pfs[fip_id] = fip_dict
|
||||
for fip_id in db_mapped_pfs:
|
||||
ovn_pfs_fip_id = ovn_pfs.get(fip_id, {})
|
||||
# check for cases when ovn has lbs for protocols that are not in
|
||||
# neutron db
|
||||
if len(db_mapped_pfs[fip_id]) != len(ovn_pfs_fip_id):
|
||||
to_add_or_update.add(fip_id)
|
||||
continue
|
||||
# check that vips in each protocol are an exact match
|
||||
for protocol in db_mapped_pfs[fip_id]:
|
||||
ovn_fip_dict_proto = ovn_pfs_fip_id.get(protocol)
|
||||
if db_mapped_pfs[fip_id][protocol] != ovn_fip_dict_proto:
|
||||
to_add_or_update.add(fip_id)
|
||||
|
||||
# remove pf entries that exist in ovn lb but have no fip in
|
||||
# neutron db.
|
||||
for fip_id in ovn_pfs:
|
||||
for db_pf in db_pfs:
|
||||
pf_fip_id = db_pf.get('floatingip_id')
|
||||
if pf_fip_id == fip_id:
|
||||
break
|
||||
else:
|
||||
to_remove.append(fip_id)
|
||||
|
||||
return list(to_add_or_update), to_remove
|
||||
|
||||
def _create_or_update_floatingip_pfs(self, context, fip_id, txn):
|
||||
self.l3_plugin.port_forwarding.db_sync_create_or_update(
|
||||
context, fip_id, txn)
|
||||
|
||||
def _delete_floatingip_pfs(self, context, fip_id, txn):
|
||||
self.l3_plugin.port_forwarding.db_sync_delete(
|
||||
context, fip_id, txn)
|
||||
|
||||
def sync_routers_and_rports(self, ctx):
|
||||
"""Sync Routers between neutron and NB.
|
||||
|
||||
@ -358,6 +419,7 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
|
||||
db_extends[router['id']]['routes'] = []
|
||||
db_extends[router['id']]['snats'] = []
|
||||
db_extends[router['id']]['fips'] = []
|
||||
db_extends[router['id']]['fips_pfs'] = []
|
||||
if not router.get(l3.EXTERNAL_GW_INFO):
|
||||
continue
|
||||
gateways = self._ovn_client._get_gw_info(ctx, router)
|
||||
@ -385,6 +447,11 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
|
||||
ctx, {'router_id': list(db_routers.keys())})
|
||||
for fip in fips:
|
||||
db_extends[fip['router_id']]['fips'].append(fip)
|
||||
if self.pf_plugin:
|
||||
fip_pfs = self.pf_plugin.get_floatingip_port_forwardings(
|
||||
ctx, fip['id'])
|
||||
for fip_pf in fip_pfs:
|
||||
db_extends[fip['router_id']]['fips_pfs'].append(fip_pf)
|
||||
interfaces = self.l3_plugin._get_sync_interfaces(
|
||||
ctx, list(db_routers.keys()),
|
||||
[constants.DEVICE_OWNER_ROUTER_INTF,
|
||||
@ -403,6 +470,7 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
|
||||
update_lrport_list = []
|
||||
update_snats_list = []
|
||||
update_fips_list = []
|
||||
update_pfs_list = []
|
||||
for lrouter in lrouters:
|
||||
ovn_rtr_lb_pfs = self.ovn_api.get_router_floatingip_lbs(
|
||||
utils.ovn_name(lrouter['name']))
|
||||
@ -438,6 +506,12 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
|
||||
update_fips_list.append({'id': lrouter['name'],
|
||||
'add': add_fips,
|
||||
'del': del_fips})
|
||||
db_fips_pfs = db_extends[lrouter['name']]['fips_pfs']
|
||||
add_fip_pfs, del_fip_pfs = self._calculate_fip_pfs_differences(
|
||||
ovn_rtr_lb_pfs, db_fips_pfs)
|
||||
update_pfs_list.append({'id': lrouter['name'],
|
||||
'add': add_fip_pfs,
|
||||
'del': del_fip_pfs})
|
||||
ovn_nats = lrouter['snats']
|
||||
db_snats = db_extends[lrouter['name']]['snats']
|
||||
add_snats, del_snats = helpers.diff_list_of_dict(
|
||||
@ -479,6 +553,14 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
|
||||
{'id': router['id'],
|
||||
'add': db_extends[router['id']]['fips'],
|
||||
'del': []})
|
||||
if 'fips_pfs' in db_extends[router['id']]:
|
||||
add_fip_pfs = {
|
||||
db_pf['floatingip_id'] for
|
||||
db_pf in db_extends[router['id']]['fips_pfs']}
|
||||
update_pfs_list.append(
|
||||
{'id': router['id'],
|
||||
'add': list(add_fip_pfs),
|
||||
'del': []})
|
||||
except RuntimeError:
|
||||
LOG.warning("Create router in OVN NB failed for router %s",
|
||||
router['id'])
|
||||
@ -578,6 +660,32 @@ class OvnNbSynchronizer(OvnDbSynchronizer):
|
||||
for nat in fip['add']:
|
||||
self._ovn_client._create_or_update_floatingip(
|
||||
nat, txn=txn)
|
||||
|
||||
for pf in update_pfs_list:
|
||||
if pf['del']:
|
||||
LOG.warning("Router %(id)s port forwarding for floating "
|
||||
"ips %(fip)s found in OVN but not in Neutron",
|
||||
{'id': pf['id'], 'fip': pf['del']})
|
||||
if self.mode == SYNC_MODE_REPAIR:
|
||||
LOG.warning(
|
||||
"Delete port forwarding for fips %s from "
|
||||
"OVN NB DB",
|
||||
pf['del'])
|
||||
for pf_id in pf['del']:
|
||||
self._delete_floatingip_pfs(ctx, pf_id, txn)
|
||||
if pf['add']:
|
||||
LOG.warning("Router %(id)s port forwarding for floating "
|
||||
"ips %(fip)s Neutron out of sync or missing "
|
||||
"in OVN",
|
||||
{'id': pf['id'], 'fip': pf['add']})
|
||||
if self.mode == SYNC_MODE_REPAIR:
|
||||
LOG.warning("Add port forwarding for fips %s "
|
||||
"to OVN NB DB",
|
||||
pf['add'])
|
||||
for pf_fip_id in pf['add']:
|
||||
self._create_or_update_floatingip_pfs(
|
||||
ctx, pf_fip_id, txn)
|
||||
|
||||
for snat in update_snats_list:
|
||||
if snat['del']:
|
||||
LOG.warning("Router %(id)s snat %(snat)s "
|
||||
|
@ -12,23 +12,30 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from collections import namedtuple
|
||||
|
||||
from neutron.common.ovn import acl as acl_utils
|
||||
from neutron.common.ovn import constants as ovn_const
|
||||
from neutron.common.ovn import utils
|
||||
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf as ovn_config
|
||||
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovn_db_sync
|
||||
from neutron.services.portforwarding.drivers.ovn.driver import \
|
||||
OVNPortForwarding as ovn_pf
|
||||
from neutron.services.segments import db as segments_db
|
||||
from neutron.tests.functional import base
|
||||
from neutron.tests.unit.api import test_extensions
|
||||
from neutron.tests.unit.extensions import test_extraroute
|
||||
from neutron.tests.unit.extensions import test_securitygroup
|
||||
from neutron_lib.api.definitions import dns as dns_apidef
|
||||
from neutron_lib.api.definitions import fip_pf_description as ext_pf_def
|
||||
from neutron_lib.api.definitions import floating_ip_port_forwarding as pf_def
|
||||
from neutron_lib.api.definitions import l3
|
||||
from neutron_lib.api.definitions import port_security as ps
|
||||
from neutron_lib import constants
|
||||
from neutron_lib import context
|
||||
from oslo_utils import uuidutils
|
||||
from ovsdbapp.backend.ovs_idl import idlutils
|
||||
from ovsdbapp import constants as ovsdbapp_const
|
||||
|
||||
|
||||
class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
@ -56,6 +63,8 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
self.delete_lrouter_ports = []
|
||||
self.delete_lrouter_routes = []
|
||||
self.delete_lrouter_nats = []
|
||||
self.create_fip_fws = []
|
||||
self.delete_fip_fws = []
|
||||
self.delete_acls = []
|
||||
self.create_port_groups = []
|
||||
self.delete_port_groups = []
|
||||
@ -136,6 +145,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
update_port_ids_v4 = []
|
||||
update_port_ids_v6 = []
|
||||
n1_port_dict = {}
|
||||
n1_port_details_dict = {}
|
||||
for p in ['p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7']:
|
||||
if p in ['p1', 'p5']:
|
||||
port_kwargs = {
|
||||
@ -152,6 +162,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
**port_kwargs)
|
||||
port = self.deserialize(self.fmt, res)
|
||||
n1_port_dict[p] = port['port']['id']
|
||||
n1_port_details_dict[p] = port['port']
|
||||
lport_name = port['port']['id']
|
||||
|
||||
lswitch_name = 'neutron-' + n1['network']['id']
|
||||
@ -432,6 +443,45 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
self.context, r1_f2['id'], {'floatingip': {
|
||||
'port_id': n1_port_dict['p2']}})
|
||||
|
||||
# Floating ip used for exercising port forwarding (via ovn lb)
|
||||
r1_f3 = self.l3_plugin.create_floatingip(
|
||||
self.context, {'floatingip': {
|
||||
'tenant_id': self._tenant_id,
|
||||
'floating_network_id': e1['network']['id'],
|
||||
'floating_ip_address': '100.0.0.22',
|
||||
'subnet_id': None,
|
||||
'port_id': None}})
|
||||
|
||||
p5_ip = n1_port_details_dict['p5']['fixed_ips'][0]['ip_address']
|
||||
fip_pf_args = {
|
||||
pf_def.EXTERNAL_PORT: 2222,
|
||||
pf_def.INTERNAL_PORT: 22,
|
||||
pf_def.INTERNAL_PORT_ID: n1_port_dict['p5'],
|
||||
pf_def.PROTOCOL: "tcp",
|
||||
ext_pf_def.DESCRIPTION_FIELD: 'PortFwd r1_f3_p5:22 tcp',
|
||||
pf_def.INTERNAL_IP_ADDRESS: p5_ip}
|
||||
fip_args = {pf_def.RESOURCE_NAME: {pf_def.RESOURCE_NAME: fip_pf_args}}
|
||||
self.pf_plugin.create_floatingip_port_forwarding(
|
||||
self.context, r1_f3['id'], **fip_args)
|
||||
|
||||
# Add port forwarding with same external and internal value
|
||||
fip_pf_args[pf_def.EXTERNAL_PORT] = 80
|
||||
fip_pf_args[pf_def.INTERNAL_PORT] = 80
|
||||
fip_pf_args[ext_pf_def.DESCRIPTION_FIELD] = 'PortFwd r1_f3_p5:80 tcp'
|
||||
self.pf_plugin.create_floatingip_port_forwarding(
|
||||
self.context, r1_f3['id'], **fip_args)
|
||||
|
||||
fip_pf_args = {
|
||||
pf_def.EXTERNAL_PORT: 5353,
|
||||
pf_def.INTERNAL_PORT: 53,
|
||||
pf_def.INTERNAL_PORT_ID: n1_port_dict['p5'],
|
||||
pf_def.PROTOCOL: "udp",
|
||||
ext_pf_def.DESCRIPTION_FIELD: 'PortFwd r1_f3_p5:53 udp',
|
||||
pf_def.INTERNAL_IP_ADDRESS: p5_ip}
|
||||
fip_args = {pf_def.RESOURCE_NAME: {pf_def.RESOURCE_NAME: fip_pf_args}}
|
||||
self.pf_plugin.create_floatingip_port_forwarding(
|
||||
self.context, r1_f3['id'], **fip_args)
|
||||
|
||||
# update External subnet gateway ip to test function _subnet_update
|
||||
# of L3 OVN plugin.
|
||||
data = {'subnet': {'gateway_ip': '100.0.0.1'}}
|
||||
@ -480,6 +530,21 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
'logical_ip':
|
||||
r1_f1['fixed_ip_address'],
|
||||
'type': 'dnat_and_snat'}))
|
||||
# Floating IP Port Forwardings
|
||||
self.create_fip_fws.append(('pf-floatingip-{}-tcp'.format(r1_f3['id']),
|
||||
{'vip': '{}:8080'.format(
|
||||
r1_f3['floating_ip_address']),
|
||||
'ips': ['{}:80'.format(p5_ip)],
|
||||
'protocol': 'tcp',
|
||||
'may_exist': False},
|
||||
'neutron-' + r1['id'],))
|
||||
self.delete_fip_fws.append(('pf-floatingip-{}-udp'.format(r1_f3['id']),
|
||||
{'vip': '{}:5353'.format(
|
||||
r1_f3['floating_ip_address']),
|
||||
'if_exists': False}))
|
||||
self.delete_fip_fws.append(('pf-floatingip-{}-tcp'.format(r1_f3['id']),
|
||||
{'vip': '100.9.0.99:9999',
|
||||
'if_exists': True}))
|
||||
|
||||
res = self._create_network(self.fmt, 'n4', True, **net_kwargs)
|
||||
n4 = self.deserialize(self.fmt, res)
|
||||
@ -736,6 +801,14 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
txn.add(self.nb_api.delete_nat_rule_in_lrouter(
|
||||
lrouter_name, if_exists=True, **nat_dict))
|
||||
|
||||
for lb_name, lb_dict, lrouter_name in self.create_fip_fws:
|
||||
txn.add(self.nb_api.lb_add(lb_name, **lb_dict))
|
||||
txn.add(self.nb_api.lr_lb_add(lrouter_name, lb_name,
|
||||
may_exist=True))
|
||||
|
||||
for lb_name, lb_dict in self.delete_fip_fws:
|
||||
txn.add(self.nb_api.lb_del(lb_name, **lb_dict))
|
||||
|
||||
for acl in self.create_acls:
|
||||
txn.add(self.nb_api.add_acl(**acl))
|
||||
|
||||
@ -1139,6 +1212,9 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
fip_port = ''
|
||||
if fip['id'] in fip_macs:
|
||||
fip_port = fip['port_id']
|
||||
# Fips that do not have fip_port are used as port forwarding,
|
||||
# and we shall skip those in this iteration
|
||||
if fip_port:
|
||||
db_nats[fip['router_id']].append(
|
||||
fip['floating_ip_address'] + fip['fixed_ip_address'] +
|
||||
'dnat_and_snat' + mac_address + fip_port)
|
||||
@ -1307,6 +1383,57 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
AssertionError, self.assertItemsEqual, r_nats,
|
||||
monitor_nats)
|
||||
|
||||
def _validate_fip_port_forwarding(self, should_match=True):
|
||||
fip_pf_cmp = namedtuple(
|
||||
'fip_pf_cmp',
|
||||
'fip_id proto rtr_name ext_ip ext_port int_ip int_port')
|
||||
|
||||
# Helper function to break a single ovn lb entry into multiple
|
||||
# floating ip port forwarding entries.
|
||||
def _parse_ovn_lb_pf(ovn_lb):
|
||||
protocol = (ovn_lb.protocol[0]
|
||||
if ovn_lb.protocol else ovsdbapp_const.PROTO_TCP)
|
||||
ext_ids = ovn_lb.external_ids
|
||||
fip_id = ext_ids[ovn_const.OVN_FIP_EXT_ID_KEY]
|
||||
router_name = ext_ids[ovn_const.OVN_ROUTER_NAME_EXT_ID_KEY]
|
||||
for vip, ips in ovn_lb.vips.items():
|
||||
ext_ip, ext_port = vip.split(':')
|
||||
for ip in ips.split(','):
|
||||
int_ip, int_port = ip.split(':')
|
||||
yield fip_pf_cmp(fip_id, protocol, router_name,
|
||||
ext_ip, int(ext_port),
|
||||
int_ip, int(int_port))
|
||||
|
||||
_plugin_nb_ovn = self.mech_driver._nb_ovn
|
||||
db_pfs = []
|
||||
fips = self._list('floatingips')
|
||||
for fip in fips['floatingips']:
|
||||
for pf in self.pf_plugin.get_floatingip_port_forwardings(
|
||||
self.ctx, floatingip_id=fip['id']):
|
||||
db_pfs.append(fip_pf_cmp(
|
||||
fip['id'],
|
||||
ovn_pf.ovn_lb_protocol(pf['protocol']),
|
||||
utils.ovn_name(pf['router_id']),
|
||||
pf['floating_ip_address'],
|
||||
pf['external_port'],
|
||||
pf['internal_ip_address'],
|
||||
pf['internal_port'],
|
||||
))
|
||||
|
||||
nb_pfs = []
|
||||
rtr_names = [row.name for row in _plugin_nb_ovn._tables[
|
||||
'Logical_Router'].rows.values()]
|
||||
for rtr_name in rtr_names:
|
||||
for ovn_lb in _plugin_nb_ovn.get_router_floatingip_lbs(rtr_name):
|
||||
for pf in _parse_ovn_lb_pf(ovn_lb):
|
||||
nb_pfs.append(pf)
|
||||
|
||||
if should_match:
|
||||
self.assertItemsEqual(nb_pfs, db_pfs)
|
||||
else:
|
||||
self.assertRaises(AssertionError, self.assertItemsEqual,
|
||||
nb_pfs, db_pfs)
|
||||
|
||||
def _validate_port_groups(self, should_match=True):
|
||||
_plugin_nb_ovn = self.mech_driver._nb_ovn
|
||||
|
||||
@ -1381,6 +1508,7 @@ class TestOvnNbSync(base.TestOVNFunctionalBase):
|
||||
self._validate_dhcp_opts(should_match=should_match)
|
||||
self._validate_acls(should_match=should_match)
|
||||
self._validate_routers_and_router_ports(should_match=should_match)
|
||||
self._validate_fip_port_forwarding(should_match=should_match)
|
||||
self._validate_port_groups(should_match=should_match)
|
||||
self._validate_dns_records(should_match=should_match)
|
||||
|
||||
|
@ -373,6 +373,7 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
|
||||
ovn_api = ovn_nb_synchronizer.ovn_api
|
||||
ovn_driver = ovn_nb_synchronizer.ovn_driver
|
||||
l3_plugin = ovn_nb_synchronizer.l3_plugin
|
||||
pf_plugin = ovn_nb_synchronizer.pf_plugin
|
||||
segments_plugin = ovn_nb_synchronizer.segments_plugin
|
||||
|
||||
core_plugin.get_networks = mock.Mock()
|
||||
@ -445,6 +446,7 @@ class TestOvnNbSyncML2(test_mech_driver.OVNMechanismDriverTestCase):
|
||||
# end of router-sync block
|
||||
l3_plugin.get_floatingips = mock.Mock()
|
||||
l3_plugin.get_floatingips.return_value = self.floating_ips
|
||||
pf_plugin.get_floatingip_port_forwardings = mock.Mock(return_value=[])
|
||||
ovn_api.get_all_logical_switches_with_ports = mock.Mock()
|
||||
ovn_api.get_all_logical_switches_with_ports.return_value = (
|
||||
self.lswitches_with_ports)
|
||||
|
Loading…
x
Reference in New Issue
Block a user