Support ovn-bgp agent with DVR disabled

The patch adds events that match on gateway chassis hosting the cr lrp
ports for the given floating ips. If there is a failover detected and
the CR LRP moves to a different chassis, all the floating IPs are moved
to with it.

It also adds functional tests for agent and its watchers. The watchers
part can be improved and the functional framework can be used further to
extend testing coverage.

Closes-Bug: #2056477

Change-Id: Ia857df921eb32e5e822fc790064179b07351d8d3
Signed-off-by: Jakub Libosvar <libosvar@redhat.com>
(cherry picked from commit 89421f84beb9cbd11fef1d3d73445b79966688c9)
This commit is contained in:
Jakub Libosvar 2024-10-04 18:19:23 +00:00
parent ed9cd177f8
commit 165c2aa147
14 changed files with 967 additions and 46 deletions

View File

@ -42,6 +42,7 @@ OVN_LB_VIP_IP_EXT_ID_KEY = 'neutron:vip'
OVN_LB_VIP_FIP_EXT_ID_KEY = 'neutron:vip_fip'
OVN_LB_VIP_PORT_EXT_ID_KEY = 'neutron:vip_port_id'
OVN_LB_LR_REF_EXT_ID_KEY = 'lr_ref'
OVN_FIP_DISTRIBUTED = 'neutron:fip-distributed'
OVN_LB_EXT_ID_ROUTER_KEY = [
OVN_LB_LR_REF_EXT_ID_KEY,

View File

@ -15,3 +15,7 @@
class NATNotFound(Exception):
pass
class ChassisNotFound(Exception):
pass

View File

@ -25,6 +25,7 @@ from ovn_bgp_agent.drivers import driver_api
from ovn_bgp_agent.drivers.openstack import nb_exceptions
from ovn_bgp_agent.drivers.openstack.utils import bgp as bgp_utils
from ovn_bgp_agent.drivers.openstack.utils import driver_utils
from ovn_bgp_agent.drivers.openstack.utils import nat as nat_utils
from ovn_bgp_agent.drivers.openstack.utils import ovn
from ovn_bgp_agent.drivers.openstack.utils import ovs
from ovn_bgp_agent.drivers.openstack.utils import port as port_utils
@ -40,7 +41,8 @@ LOG = logging.getLogger(__name__)
# logging.basicConfig(level=logging.DEBUG)
OVN_TABLES = ['Logical_Switch_Port', 'NAT', 'Logical_Switch', 'Logical_Router',
'Logical_Router_Port', 'Load_Balancer', 'DHCP_Options']
'Logical_Router_Port', 'Load_Balancer', 'DHCP_Options',
'NB_Global']
LOCAL_CLUSTER_OVN_TABLES = ['Logical_Switch', 'Logical_Switch_Port',
'Logical_Router', 'Logical_Router_Port',
'Logical_Router_Policy',
@ -48,6 +50,67 @@ LOCAL_CLUSTER_OVN_TABLES = ['Logical_Switch', 'Logical_Switch_Port',
'Static_MAC_Binding']
def _validate_ovn_version(distributed, idl):
if not distributed and 'gateway_port' not in idl.tables['NAT'].columns:
raise RuntimeError(
"Centralized routing requires gateway_port column in the "
"OVN_Northbound schema. Please update OVN to 23.09.0 or later.")
class NATExposer:
def __init__(self, agent):
self.agent = agent
def expose_fip_from_nat(self, nat):
raise RuntimeError(
"The exposer does not have distributed flag set yet")
def withdraw_fip_from_nat(self, nat):
raise RuntimeError(
"The exposer does not have distributed flag set yet")
@property
def distributed(self):
pass
@distributed.setter
def distributed(self, value):
if value:
self.expose_fip_from_nat = self._expose_nat_distributed
self.withdraw_fip_from_nat = self._withdraw_nat_distributed
else:
self.expose_fip_from_nat = self._expose_nat_centralized
self.withdraw_fip_from_nat = self._withdraw_nat_centralized
def _expose_nat_distributed(self, nat):
raise NotImplementedError("Distributed NAT is not implemented yet.")
def _expose_nat_centralized(self, nat):
net_id = nat.external_ids[constants.OVN_FIP_NET_EXT_ID_KEY]
ls_name = "neutron-{}".format(net_id)
try:
mac = nat_utils.get_gateway_lrp(nat).mac
except IndexError:
LOG.error("Gateway port for NAT entry %s has no MAC address set",
nat.uuid)
return
# nat has the logical port and its in the db, that was checked in
# match_fn
lsp = self.agent.nb_idl.lsp_get(
nat.logical_port[0]).execute(check_error=True)
self.agent.expose_fip(nat.external_ip, mac, ls_name, lsp)
def _withdraw_nat_distributed(self, nat):
raise NotImplementedError("Distributed NAT is not implemented yet.")
def _withdraw_nat_centralized(self, nat):
lsp = self.agent.nb_idl.lsp_get(nat.logical_port[0]).execute()
self.agent.withdraw_fip(nat.external_ip, lsp)
class NBOVNBGPDriver(driver_api.AgentDriverBase):
def __init__(self):
@ -58,6 +121,20 @@ class NBOVNBGPDriver(driver_api.AgentDriverBase):
self._nb_idl = None
self._local_nb_idl = None
self._post_start_event = threading.Event()
self.nat_exposer = NATExposer(self)
self.__d_events = {
True: [
watcher.NATMACAddedEvent(self),
watcher.LogicalSwitchPortFIPCreateEvent(self),
watcher.LogicalSwitchPortFIPDeleteEvent(self),
],
False: [
watcher.ExposeFIPOnCRLRP(self),
watcher.WithdrawFIPOnCRLRP(self),
watcher.CrLrpChassisChangeExposeEvent(self),
watcher.CrLrpChassisChangeWithdrawEvent(self),
]}
@property
def _expose_tenant_networks(self):
@ -84,6 +161,30 @@ class NBOVNBGPDriver(driver_api.AgentDriverBase):
def local_nb_idl(self, val):
self._local_nb_idl = val
@property
def distributed(self):
if not hasattr(self, '_distributed'):
self._distributed = self.nb_idl.get_distributed_flag()
_validate_ovn_version(self._distributed, self.nb_idl.idl)
self.nat_exposer.distributed = self._distributed
return self._distributed
@distributed.setter
def distributed(self, value):
_validate_ovn_version(value, self.nb_idl.idl)
self._distributed = value
self.nat_exposer.distributed = value
self._switch_distributed_events(value)
def _switch_distributed_events(self, distributed):
to_unwatch = self._get_additional_events(not distributed)
to_watch = self._get_additional_events(distributed)
self.nb_idl.ovsdb_connection.idl.notify_handler.unwatch_events(
to_unwatch)
self.nb_idl.ovsdb_connection.idl.notify_handler.watch_events(
to_watch)
def _init_vars(self):
self.ovn_bridge_mappings = {} # {'public': 'br-ex'}
self.ovs_flows = {}
@ -128,7 +229,7 @@ class NBOVNBGPDriver(driver_api.AgentDriverBase):
self._post_start_event.clear()
events = self._get_events()
events = self._get_base_events()
self.nb_idl = ovn.OvnNbIdl(
self.ovn_remote,
tables=OVN_TABLES,
@ -142,21 +243,23 @@ class NBOVNBGPDriver(driver_api.AgentDriverBase):
events=[],
leader_only=True).start()
self.nb_idl.ovsdb_connection.idl.notify_handler.watch_events(
self._get_additional_events(self.distributed))
# Now IDL connections can be safely used
self._post_start_event.set()
def _get_events(self):
def _get_base_events(self):
events = {watcher.LogicalSwitchPortProviderCreateEvent(self),
watcher.LogicalSwitchPortProviderDeleteEvent(self),
watcher.LogicalSwitchPortFIPCreateEvent(self),
watcher.LogicalSwitchPortFIPDeleteEvent(self),
watcher.OVNLBCreateEvent(self),
watcher.OVNLBDeleteEvent(self),
watcher.OVNPFCreateEvent(self),
watcher.OVNPFDeleteEvent(self),
watcher.ChassisRedirectCreateEvent(self),
watcher.ChassisRedirectDeleteEvent(self),
watcher.NATMACAddedEvent(self)}
watcher.DistributedFlagChangedEvent(self),
}
if CONF.exposing_method == constants.EXPOSE_METHOD_VRF:
# For vrf we require more information on the logical_switch
@ -175,6 +278,9 @@ class NBOVNBGPDriver(driver_api.AgentDriverBase):
})
return events
def _get_additional_events(self, distributed):
return self.__d_events[distributed]
@lockutils.synchronized('nbbgp')
def frr_sync(self):
LOG.debug("Ensuring VRF configuration for advertising routes")
@ -211,8 +317,14 @@ class NBOVNBGPDriver(driver_api.AgentDriverBase):
'address_scopes': driver_utils.get_addr_scopes(port)}
self._expose_subnet(ips, subnet_info)
# add missing routes/ips for IPs on provider network
# add missing routes/ips for IPs on provider network and FIPs
ports = self.nb_idl.get_active_lsp_on_chassis(self.chassis)
if not self.distributed:
# expose all FIPs if this chassis hosts the gateway port
lsp_with_fips = ovn.GetLSPsForGwChassisCommand(
self.nb_idl, self.chassis_id).execute(check_error=True)
for lsp_data in lsp_with_fips:
self._expose_fip(*lsp_data)
for port in ports:
if port.type not in [constants.OVN_VM_VIF_PORT_TYPE,
constants.OVN_VIRTUAL_VIF_PORT_TYPE]:

View File

@ -0,0 +1,33 @@
# Copyright 2024 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ovn_bgp_agent import constants
from ovn_bgp_agent.drivers.openstack import nb_exceptions
def get_gateway_lrp(nat):
try:
return nat.gateway_port[0]
except IndexError:
raise nb_exceptions.NATNotFound("Port %s has no NAT entry" % nat.uuid)
def get_chassis_hosting_crlrp(nat):
gateway_lrp = get_gateway_lrp(nat)
try:
return gateway_lrp.status[constants.OVN_STATUS_CHASSIS]
except KeyError:
raise nb_exceptions.ChassisNotFound(
"Gateway port %s has no chassis set in its status column.",
gateway_lrp)

View File

@ -375,6 +375,46 @@ class StaticMACBindingDelCommand(command.BaseCommand):
"exist" % (self.port, self.ip))
class GetLSPsForGwChassisCommand(command.ReadOnlyCommand):
"""Get all LSPs associated with a NAT entry.
(FIPs) which its CR-LRP ports (gateway ports) are hosted on the given
chassis.
The resulted format is in tuple that can be consumed by the `wire` module.
"""
def __init__(self, api, chassis_id):
super().__init__(api)
self.chassis_id = chassis_id
def run_idl(self, txn):
self.result = []
for nat in self.api.tables['NAT'].rows.values():
if nat.type != 'dnat_and_snat':
continue
try:
gw_chassis = nat.gateway_port[0].status.get(
constants.OVN_STATUS_CHASSIS)
except IndexError:
continue
if gw_chassis == self.chassis_id:
ls_name = "neutron-{}".format(
nat.external_ids[constants.OVN_FIP_NET_EXT_ID_KEY])
try:
lsp = self.api.lookup(
'Logical_Switch_Port', nat.logical_port[0])
except IndexError:
LOG.debug("NAT %s does not have a logical_port set", nat)
continue
self.result.append(
(nat.external_ip,
nat.gateway_port[0].mac,
ls_name,
lsp))
class OvsdbNbOvnIdl(nb_impl_idl.OvnNbApiIdlImpl, Backend):
def __init__(self, connection):
super(OvsdbNbOvnIdl, self).__init__(connection)
@ -532,6 +572,34 @@ class OvsdbNbOvnIdl(nb_impl_idl.OvnNbApiIdlImpl, Backend):
router_name = 'neutron-' + router
return self.lr_get(router_name).execute(check_error=True)
def get_distributed_flag(self):
"""Return distributed flag set by Neutron.
The method returns True by default if the flag is not set in Neutron.
"""
nb_global_ext_ids = self.db_get(
'NB_Global', '.', 'external_ids').execute(check_error=True)
try:
distributed = nb_global_ext_ids[constants.OVN_FIP_DISTRIBUTED]
except KeyError:
LOG.warning(
"Agent did not find %s configuration in the OVN Northbound "
"NB_Global table. This option determines if the traffic is "
"centralized or distributed. The agent default to "
"distributed traffic but be aware Neutron by default works "
"in centralized manner. Please make sure Neutron has "
"the enable_distributed_floating_ip config option set "
"to True." % constants.OVN_FIP_DISTRIBUTED)
distributed = "True"
return True if distributed == "True" else False
def get_nats_by_lrp(self, lrp):
return self.db_find_rows(
'NAT',
('gateway_port', '=', lrp.uuid),
('type', '=', constants.OVN_DNAT_AND_SNAT)
).execute(check_error=True)
class OvsdbSbOvnIdl(sb_impl_idl.OvnSbApiIdlImpl, Backend):
def __init__(self, connection):

View File

@ -17,6 +17,7 @@ from ovsdbapp.backend.ovs_idl import event as row_event
from ovn_bgp_agent import constants
from ovn_bgp_agent.drivers.openstack.utils import driver_utils
from ovn_bgp_agent.drivers.openstack.utils import nat as nat_utils
LOG = logging.getLogger(__name__)
@ -101,9 +102,48 @@ class ChassisPrivateCreateEvent(ChassisCreateEventBase):
table = 'Chassis_Private'
class DnatSnatUpdatedBaseEvent(Event):
class DnatSnatBaseEvent(Event):
events = None
def __init__(self, bgp_agent):
events = (self.ROW_UPDATE)
table = 'NAT'
super().__init__(
bgp_agent, events, table, (('type', '=', 'dnat_and_snat'),))
bgp_agent,
self.__class__.events,
table,
(('type', '=', 'dnat_and_snat'),))
class FipOnCRLRPBaseEvent(DnatSnatBaseEvent):
"""Base class for NAT event.
It matches if the associated LSP is either virtual port or a normal port
and if the gateway chassis port is on this chassis.
"""
def match_fn(self, event, row, old):
try:
lsp_uuid = row.logical_port[0]
except IndexError:
LOG.warning("NAT entry %s has no associated logical port",
row.uuid)
return False
lsp = self.agent.nb_idl.lsp_get(
lsp_uuid).execute(check_error=True)
if not lsp:
LOG.warning("Switch Port %s cannot be found" % lsp_uuid)
return False
if lsp.type not in [constants.OVN_VM_VIF_PORT_TYPE,
constants.OVN_VIRTUAL_VIF_PORT_TYPE]:
return False
if constants.OVN_FIP_NET_EXT_ID_KEY not in row.external_ids:
LOG.error("NAT entry %(nat)s does not have %(fip_net)s set in "
"external_ids", {
'nat': row.uuid,
'fip_net': constants.OVN_FIP_NET_EXT_ID_KEY})
return False
gw_chassis = nat_utils.get_chassis_hosting_crlrp(row)
return self.agent.chassis_id == gw_chassis

View File

@ -879,7 +879,9 @@ class OVNPFDeleteEvent(OVNPFBaseEvent):
self.agent.withdraw_ovn_pf_lb_fip(row)
class NATMACAddedEvent(base_watcher.DnatSnatUpdatedBaseEvent):
class NATMACAddedEvent(base_watcher.DnatSnatBaseEvent):
events = (base_watcher.DnatSnatBaseEvent.ROW_UPDATE,)
def match_fn(self, event, row, old):
try:
lsp_id = row.logical_port[0]
@ -934,3 +936,141 @@ class NATMACAddedEvent(base_watcher.DnatSnatUpdatedBaseEvent):
with _SYNC_STATE_LOCK.read_lock():
self.agent.expose_fip(
row.external_ip, row.external_mac[0], ls_name, lsp)
class ExposeFIPOnCRLRP(base_watcher.FipOnCRLRPBaseEvent):
"""Expose floating IP on the gateway chassis hosting the gateway port.
This event happens when NAT entry is created. It exposes the floating IP on
the gateway chassis hosting the gateway router.
"""
events = (base_watcher.DnatSnatBaseEvent.ROW_CREATE,)
def run(self, event, row, old):
with _SYNC_STATE_LOCK.read_lock():
self.agent.nat_exposer.expose_fip_from_nat(row)
class WithdrawFIPOnCRLRP(base_watcher.FipOnCRLRPBaseEvent):
"""Withdraw floating IP from the gateway chassis hosting the gateway port.
This event happens when NAT entry is deleted. It withdraws the floating IP
from the gateway chassis hosting the gateway router.
"""
events = (base_watcher.DnatSnatBaseEvent.ROW_DELETE,)
def run(self, event, row, old):
self.agent.nat_exposer.withdraw_fip_from_nat(row)
class CrLrpChassisChangeBaseEvent(base_watcher.LRPChassisEvent):
"""Base class for case when gateway port moves.
It matches if the hosting-chassis in status column of the gateway port has
changed.
"""
def __init__(self, bgp_agent):
super().__init__(bgp_agent, (self.ROW_UPDATE,))
def match_fn(self, event, row, old):
new_chassis = row.status.get(constants.OVN_STATUS_CHASSIS)
try:
old_chassis = old.status.get(constants.OVN_STATUS_CHASSIS)
except AttributeError:
return False
# Match only if the port was moved
return new_chassis != old_chassis
class CrLrpChassisChangeExposeEvent(CrLrpChassisChangeBaseEvent):
"""A LRP event to expose floating IPs on centralized node.
Expose all floating IPs hosted by a router with this gateway port hosted on
this chassis. It matches in case of gateway port changes its hosting
chassis to this chassis.
"""
def match_fn(self, event, row, old):
if not super().match_fn(event, row, old):
return False
if constants.OVN_STATUS_CHASSIS not in row.status:
return False
if row.status[constants.OVN_STATUS_CHASSIS] != self.agent.chassis_id:
return False
return True
def run(self, event, row, old):
nats = self.agent.nb_idl.get_nats_by_lrp(row)
with _SYNC_STATE_LOCK.read_lock():
for nat in nats:
self.agent.nat_exposer.expose_fip_from_nat(nat)
class CrLrpChassisChangeWithdrawEvent(CrLrpChassisChangeBaseEvent):
"""A LRP event to expose floating IPs on centralized node.
Expose all floating IPs hosted by a router with this gateway port hosted on
this chassis. It matches in case of gateway port changes its hosting
chassis to this chassis.
"""
def match_fn(self, event, row, old):
if not super().match_fn(event, row, old):
return False
# if old does not have status, it would have failed in
# super().match_fn()
if constants.OVN_STATUS_CHASSIS not in old.status:
return False
if old.status[constants.OVN_STATUS_CHASSIS] != self.agent.chassis_id:
return False
return True
def run(self, event, row, old):
nats = self.agent.nb_idl.get_nats_by_lrp(row)
with _SYNC_STATE_LOCK.read_lock():
for nat in nats:
self.agent.nat_exposer.withdraw_fip_from_nat(nat)
class DistributedFlagChangedEvent(base_watcher.Event):
"""Re-register events if Neutron changed the distributed flag.
The event matches if distributed flag was switched in OVN. Then it
re-registers the events to react on the right events and does a full
re-sync to withdraw or expose IPs.
"""
def __init__(self, bgp_agent):
table = 'NB_Global'
events = (self.ROW_UPDATE,)
super().__init__(bgp_agent, events, table)
self.event_name = self.__class__.__name__
def match_fn(self, event, row, old):
try:
if (old.external_ids.get(constants.OVN_FIP_DISTRIBUTED) ==
row.external_ids[constants.OVN_FIP_DISTRIBUTED]):
return False
except KeyError:
# Distributed flag was deleted, behave like distributed agent
pass
except AttributeError:
return False
return True
def run(self, event, row, old):
if row.external_ids.get(constants.OVN_FIP_DISTRIBUTED) == "True":
self.agent.distributed = True
elif row.external_ids.get(constants.OVN_FIP_DISTRIBUTED) == "False":
self.agent.distributed = False
else:
# Default to True
self.agent.distributed = True
self.agent.sync()
self.agent.frr_sync()

View File

@ -18,16 +18,21 @@ import functools
import inspect
import os
import sys
from unittest import mock
import eventlet.timeout
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import fileutils
from oslo_utils import uuidutils
from oslotest import base
from ovsdbapp.backend.ovs_idl import connection
from ovsdbapp.tests.functional import base as ovsdbapp_base
import ovn_bgp_agent
from ovn_bgp_agent import config
from ovn_bgp_agent.drivers.openstack import nb_ovn_bgp_driver
from ovn_bgp_agent.drivers.openstack.utils import ovn
from ovn_bgp_agent.tests.functional import fixtures
@ -78,35 +83,8 @@ def sanitize_log_path(path):
return path.replace(' ', '-').replace('(', '_').replace(')', '_')
# Test worker cannot survive eventlet's Timeout exception, which effectively
# kills the whole worker, with all test cases scheduled to it. This metaclass
# makes all test cases convert Timeout exceptions into unittest friendly
# failure mode (self.fail).
class BaseFunctionalTestCase(base.BaseTestCase,
metaclass=_CatchTimeoutMetaclass):
"""Base class for functional tests."""
COMPONENT_NAME = 'ovn_bgp_agent'
PRIVILEGED_GROUP = 'privsep'
def setUp(self):
super(BaseFunctionalTestCase, self).setUp()
logging.register_options(CONF)
setup_logging(self.COMPONENT_NAME)
fileutils.ensure_tree(DEFAULT_LOG_DIR, mode=0o755)
log_file = sanitize_log_path(
os.path.join(DEFAULT_LOG_DIR, "%s.txt" % self.id()))
self.flags(log_file=log_file)
config.register_opts()
config.setup_privsep()
privsep_helper = os.path.join(
os.getenv('VIRTUAL_ENV', os.path.dirname(sys.executable)[:-4]),
'bin', 'privsep-helper')
self.flags(
helper_command=' '.join(['sudo', '-E', privsep_helper]),
group=self.PRIVILEGED_GROUP)
def flags(self, **kw):
def configure_functional_test(id_):
def flags(**kw):
"""Override some configuration values.
The keyword arguments are the names of configuration options to
@ -122,10 +100,111 @@ class BaseFunctionalTestCase(base.BaseTestCase,
for k, v in kw.items():
CONF.set_override(k, v, group)
COMPONENT_NAME = 'ovn_bgp_agent'
PRIVILEGED_GROUP = 'privsep'
logging.register_options(CONF)
setup_logging(COMPONENT_NAME)
fileutils.ensure_tree(DEFAULT_LOG_DIR, mode=0o755)
log_file = sanitize_log_path(
os.path.join(DEFAULT_LOG_DIR, "%s.txt" % id_))
config.register_opts()
flags(log_file=log_file)
config.setup_privsep()
privsep_helper = os.path.join(
os.getenv('VIRTUAL_ENV', os.path.dirname(sys.executable)[:-4]),
'bin', 'privsep-helper')
flags(
helper_command=' '.join(['sudo', '-E', privsep_helper]),
group=PRIVILEGED_GROUP)
# Test worker cannot survive eventlet's Timeout exception, which effectively
# kills the whole worker, with all test cases scheduled to it. This metaclass
# makes all test cases convert Timeout exceptions into unittest friendly
# failure mode (self.fail).
class BaseFunctionalTestCase(base.BaseTestCase,
metaclass=_CatchTimeoutMetaclass):
"""Base class for functional tests."""
def setUp(self):
super(BaseFunctionalTestCase, self).setUp()
configure_functional_test(self.id())
class BaseFunctionalNorthboundTestCase(ovsdbapp_base.FunctionalTestCase):
schemas = ['OVN_Northbound']
schemas = ['OVN_Northbound', 'Open_vSwitch']
COMPONENT_NAME = 'ovn_bgp_agent'
PRIVILEGED_GROUP = 'privsep'
def setUp(self):
super().setUp()
self.api = self.useFixture(fixtures.NbApiFixture(self.connection)).obj
self.nb_api = self.useFixture(
fixtures.NbApiFixture(self.connection['OVN_Northbound'])).obj
class BaseFunctionalNBAgentTestCase(BaseFunctionalNorthboundTestCase):
@classmethod
def create_connection(cls, schema):
if schema == 'OVN_Northbound':
idl = ovn.OvnNbIdl.from_server(cls.schema_map[schema], schema)
return connection.Connection(idl, timeout=5)
else:
return super().create_connection(schema)
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.agent_config = {
None: {
'exposing_method': 'vrf',
'ovsdb_connection': cls.schema_map['Open_vSwitch'],
},
'ovn': {
'ovn_nb_connection': cls.schema_map['OVN_Northbound'],
},
}
def setUp(self):
super().setUp()
configure_functional_test(self.id())
# TODO(jlibosva): Find a way to isolate vrf and frr processes
self.bgp_utils = mock.patch.object(
nb_ovn_bgp_driver, 'bgp_utils').start()
self.ovs_api = self.configure_local_ovs()
self.set_agent()
self.agent = nb_ovn_bgp_driver.NBOVNBGPDriver()
self.agent.start()
def set_agent(self):
for group, options in self.__class__.agent_config.items():
for key, value in options.items():
CONF.set_override(key, value, group)
# We do not want to interfere with the syncs
self.agent_sync = mock.patch.object(
nb_ovn_bgp_driver.NBOVNBGPDriver, 'sync').start()
self.agent_frr_sync = mock.patch.object(
nb_ovn_bgp_driver.NBOVNBGPDriver, 'frr_sync').start()
def configure_local_ovs(self):
ovs_api = self.useFixture(
fixtures.OvsApiFixture(self.connection['Open_vSwitch'])).obj
system_id = uuidutils.generate_uuid()
ovs_config_external_ids = {
'system-id': system_id,
'hostname': f'func-{system_id}',
'ovn-nb-remote': self.schema_map['OVN_Northbound'],
}
ovs_api.db_set(
'Open_vSwitch', '.', external_ids=ovs_config_external_ids
).execute(check_error=True)
return ovs_api

View File

@ -13,16 +13,17 @@
# limitations under the License.
from ovn_bgp_agent import constants
from ovn_bgp_agent.drivers.openstack.utils import ovn as ovn_utils
from ovn_bgp_agent.tests.functional import base
class OvsdbNbOvnIdl(base.BaseFunctionalNorthboundTestCase):
def _lsp_add(self, ls_name, lsp_name, type_, tag):
self.api.lsp_add(ls_name, lsp_name, type=type_).execute(
self.nb_api.lsp_add(ls_name, lsp_name, type=type_).execute(
check_error=True)
# lsp_add requires parent to be specified with the tag, work it
# around with the db_set
self.api.db_set(
self.nb_api.db_set(
'Logical_Switch_Port', lsp_name, ('tag', tag)).execute(
check_error=True)
@ -32,7 +33,7 @@ class OvsdbNbOvnIdl(base.BaseFunctionalNorthboundTestCase):
len_tags = len(expected_tags)
for i, tag in enumerate(expected_tags):
self.api.ls_add('ls%d' % i).execute(check_error=True)
self.nb_api.ls_add('ls%d' % i).execute(check_error=True)
ls_name = 'ls%d' % (i % 2)
lsp_name = 'localnetport%d' % i
self._lsp_add(
@ -45,5 +46,182 @@ class OvsdbNbOvnIdl(base.BaseFunctionalNorthboundTestCase):
ls_name, lsp_name,
type_=None, tag=i + len_tags)
tags = self.api.get_network_vlan_tags()
tags = self.nb_api.get_network_vlan_tags()
self.assertCountEqual(expected_tags, tags)
def test_get_distributed_flag_default(self):
self.nb_api.db_remove(
'NB_Global', '.', 'external_ids',
constants.OVN_FIP_DISTRIBUTED, if_exists=True).execute(
check_error=True)
self.assertTrue(self.nb_api.get_distributed_flag())
def test_get_distributed_flag_True(self):
self.nb_api.db_set(
'NB_Global', '.',
('external_ids', {constants.OVN_FIP_DISTRIBUTED: "True"})).execute(
check_error=True)
self.assertTrue(self.nb_api.get_distributed_flag())
def test_get_distributed_flag_False(self):
self.nb_api.db_set(
'NB_Global', '.',
('external_ids', {
constants.OVN_FIP_DISTRIBUTED: "False"})).execute(
check_error=True)
self.assertFalse(self.nb_api.get_distributed_flag())
def test_get_nats_by_lrp(self):
router_name = 'router'
with self.nb_api.transaction(check_error=True) as txn:
txn.add(self.nb_api.lr_add(router_name, nat=[]))
snat = txn.add(self.nb_api.lr_nat_add(
router_name,
constants.OVN_SNAT,
'10.0.0.1',
'192.168.0.1/24'))
dnat_lrp = txn.add(self.nb_api.lr_nat_add(
router_name,
constants.OVN_DNAT_AND_SNAT,
'10.0.0.10',
'192.168.0.10'))
dnat1_lrp = txn.add(self.nb_api.lr_nat_add(
router_name,
constants.OVN_DNAT_AND_SNAT,
'10.0.0.11',
'192.168.0.11'))
dnat_lrp2 = txn.add(self.nb_api.lr_nat_add(
router_name,
constants.OVN_DNAT_AND_SNAT,
'10.0.0.20',
'192.168.0.20'))
lrp1 = txn.add(self.nb_api.lrp_add(
router_name, 'lrp', '00:00:00:00:00:01', ['10.0.0.0/24']))
lrp2 = txn.add(self.nb_api.lrp_add(
router_name, 'lrp2', '00:00:00:00:00:02', ['10.0.2.0/24']))
with self.nb_api.transaction(check_error=True) as txn:
for nat, lrp in [
(snat.result, lrp1.result),
(dnat_lrp.result, lrp1.result),
(dnat1_lrp.result, lrp1.result),
(dnat_lrp2.result, lrp2.result)]:
txn.add(self.nb_api.db_set(
'NAT', nat.uuid, ('gateway_port', lrp.uuid)))
nats = self.nb_api.get_nats_by_lrp(lrp1.result)
self.assertCountEqual(
[dnat_lrp.result.external_ip, dnat1_lrp.result.external_ip],
[nat.external_ip for nat in nats])
class GetLSPsForGwChassisCommandTestCase(
base.BaseFunctionalNorthboundTestCase):
# format is 10.0.<router>.<switch>*10 + <port>
FIP_TEMP = '10.0.%d.%d'
GW_MAC_TEMP = '00:00:00:00:00:%d0'
def setUp(self):
super().setUp()
with self.nb_api.transaction(check_error=True) as txn:
for switch in range(6):
txn.add(self.nb_api.ls_add('ls%d' % switch))
for switch_port in range(4):
txn.add(self.nb_api.lsp_add(
'ls%d' % switch, 'lsp%d-%d' % (switch, switch_port)))
for router in range(3):
router_name = 'lr%d' % router
txn.add(self.nb_api.lr_add(router_name, nat=[]))
# gateway port
txn.add(self.nb_api.lrp_add(
'lr%d' % router,
'lrp%d-0' % router,
self.GW_MAC_TEMP % router,
['10.0.%d.0/24' % router],
status={
constants.OVN_STATUS_CHASSIS: 'chassis%d' % (
router % 2)}))
# connect two switches to the router
for lrp_index, lrp_peer in enumerate([1, 2]):
ls_index = router + lrp_peer * 2
network = '192.168.%d.0/24' % ls_index
txn.add(self.nb_api.lrp_add(
router_name,
'lrp%d-%d' % (router, lrp_peer),
'00:00:00:00:00:%d%d' % (router, lrp_index),
[network],
peer='lsp%d-0' % ls_index))
txn.add(self.nb_api.lr_nat_add(
router_name,
constants.OVN_SNAT,
'10.0.%d.1' % router,
network))
# Add DNAT rules to ports on one switch 2 NATs, then 1 NAT,
# and so on
for router, switch in enumerate(range(0, 6, 2)):
for lsp in range(1, 3):
txn.add(self.nb_api.lr_nat_add(
'lr%d' % router,
constants.OVN_DNAT_AND_SNAT,
self.FIP_TEMP % (router, switch * 10 + lsp),
'192.168.%d.%d' % (switch, lsp),
logical_port='lsp%d-%d' % (switch, lsp),
external_mac='00:00:00:00:0%d:0%d' % (switch, lsp)))
switch += 1
txn.add(self.nb_api.lr_nat_add(
'lr%d' % router,
constants.OVN_DNAT_AND_SNAT,
self.FIP_TEMP % (router, switch * 10 + 1),
'192.168.%d.1' % switch,
logical_port='lsp%d-1' % switch,
external_mac='00:00:00:00:0%d:01' % switch))
# Set the gateway ports for all NATs
for router in range(3):
for nat in self.nb_api.lr_nat_list(
'lr%d' % router).execute(check_error=True):
lrp = self.nb_api.lrp_get(
'lrp%d-0' % router).execute(check_error=True)
self.nb_api.db_set(
'NAT',
nat.uuid,
('gateway_port', lrp),
('external_ids', {
constants.OVN_FIP_NET_EXT_ID_KEY: 'external'})
).execute(check_error=True)
def _make_fip_result(self, router_index, switch_index, lsp_index):
lsp = self.nb_api.lsp_get(
'lsp%d-%d' % (switch_index, lsp_index)).execute(check_error=True)
return (self.FIP_TEMP % (router_index, switch_index * 10 + lsp_index),
self.GW_MAC_TEMP % router_index,
'neutron-external',
lsp)
def _get_expected_fips(self):
expected_found_fips = []
for router in [0, 2]:
switch = router * 2
# 2 FIPs on the first switch
for lsp in [1, 2]:
expected_found_fips.append(
self._make_fip_result(router, switch, lsp))
# 1 FIP on the second switch
switch = router * 2 + 1
expected_found_fips.append(
self._make_fip_result(router, switch, lsp_index=1))
return expected_found_fips
def test_get_lsps(self):
# We expect only LSPs from switches attached to routers 0 and 2
# each router has w switches. The first switch has 2 FIPs and the
# second has one FIP. That is 6 FIPs in total.
expected_found_fips = self._get_expected_fips()
result = ovn_utils.GetLSPsForGwChassisCommand(
self.nb_api, 'chassis0').execute()
self.assertCountEqual(expected_found_fips, result)

View File

@ -0,0 +1,133 @@
# Copyright 2024 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from ovsdbapp.backend.ovs_idl import event
from ovn_bgp_agent import constants
from ovn_bgp_agent.tests.functional import base
from ovn_bgp_agent.tests import utils
class DistributedWaitEvent(event.WaitEvent):
event_name = 'DistributedWaitEvent'
def __init__(self, timeout=5):
table = 'NB_Global'
events = (self.ROW_UPDATE,)
super().__init__(events, table, None, timeout=timeout)
def match_fn(self, event, row, old):
return row.external_ids != getattr(old, 'external_ids')
class DistributedFlagChangedEventTestCase(
base.BaseFunctionalNBAgentTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
cls.agent_config = copy.copy(cls.agent_config)
cls.agent_config.setdefault(None, {})[
'advertisement_method_tenant_networks'] = 'subnet'
def _make_nb_global_event(self):
nb_global_event = DistributedWaitEvent()
self.nb_api.ovsdb_connection.idl.notify_handler.watch_event(
nb_global_event)
return nb_global_event
def _wait_for_events_added(self, events):
def _events_intersect():
registered_events = set(idl.notify_handler._watched_events)
intersection = events & registered_events
return intersection == events
idl = self.agent.nb_idl.ovsdb_connection.idl
utils.wait_until_true(
_events_intersect,
timeout=5,
sleep=0.1,
exception=AssertionError(
"Events %s still not registered in the agent" % events))
def _wait_for_events_removed(self, events):
def _events_disjunctive():
registered_events = set(idl.notify_handler._watched_events)
return not bool(events & registered_events)
idl = self.agent.nb_idl.ovsdb_connection.idl
utils.wait_until_true(
_events_disjunctive,
timeout=5,
sleep=0.1,
exception=AssertionError(
"Events %s still registered in the agent" % events))
def test_distributed_flag_changed(self):
distributed = self.nb_api.db_get(
'NB_Global', '.', 'external_ids').execute(check_error=True).get(
constants.OVN_FIP_DISTRIBUTED)
self.assertIsNone(distributed)
distributed_events = set(self.agent._get_additional_events(
distributed=True))
centralized_events = set(self.agent._get_additional_events(
distributed=False))
# At start there is no distributed flag but the agent should default to
# distributed
self._wait_for_events_added(distributed_events)
self._wait_for_events_removed(centralized_events)
nb_global_event = self._make_nb_global_event()
self.nb_api.db_set('NB_Global', '.', external_ids={
constants.OVN_FIP_DISTRIBUTED: "False"}).execute(check_error=True)
self.assertTrue(nb_global_event.wait())
self._wait_for_events_added(centralized_events)
self._wait_for_events_removed(distributed_events)
nb_global_event = self._make_nb_global_event()
self.nb_api.db_set('NB_Global', '.', external_ids={
constants.OVN_FIP_DISTRIBUTED: "True"}).execute(check_error=True)
self.assertTrue(nb_global_event.wait())
self._wait_for_events_added(distributed_events)
self._wait_for_events_removed(centralized_events)
nb_global_event = self._make_nb_global_event()
self.nb_api.db_set('NB_Global', '.', external_ids={
constants.OVN_FIP_DISTRIBUTED: "False"}).execute(check_error=True)
self.assertTrue(nb_global_event.wait())
self._wait_for_events_added(centralized_events)
self._wait_for_events_removed(distributed_events)
nb_global_event = self._make_nb_global_event()
self.nb_api.db_remove(
'NB_Global', '.', 'external_ids',
constants.OVN_FIP_DISTRIBUTED).execute(check_error=True)
self.assertTrue(nb_global_event.wait())
self._wait_for_events_added(distributed_events)
self._wait_for_events_removed(centralized_events)

View File

@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ovsdbapp.schema.open_vswitch import impl_idl
from ovsdbapp.tests.functional.schema import fixtures
from ovn_bgp_agent.drivers.openstack.utils import ovn
@ -19,3 +20,7 @@ from ovn_bgp_agent.drivers.openstack.utils import ovn
class NbApiFixture(fixtures.ApiImplFixture):
api_cls = ovn.OvsdbNbOvnIdl
class OvsApiFixture(fixtures.ApiImplFixture):
api_cls = impl_idl.OvsdbIdl

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import functools
from unittest import mock
from oslo_config import cfg
@ -88,6 +89,8 @@ class TestNBOVNBGPDriver(test_base.TestCase):
self.mock_ovs_idl.get_own_chassis_id.return_value = 'chassis-id'
self.mock_ovs_idl.get_ovn_remote.return_value = (
self.conf_ovsdb_connection)
nb_idl = self.mock_nbdb.return_value.start.return_value
nb_idl.get_distributed_flag.return_value = True
self.nb_bgp_driver.start()
@ -1806,3 +1809,77 @@ class TestNBOVNBGPDriver(test_base.TestCase):
self.nb_bgp_driver.withdraw_ovn_pf_lb_fip(lb)
mock_withdraw_provider_port.assert_not_called()
def set_distributed(distributed):
def outer(f):
@functools.wraps(f)
def inner(self, *args, **kwargs):
self.exposer.distributed = distributed
return f(self, *args, **kwargs)
return inner
return outer
class NATExposerTestCase(test_base.TestCase):
def setUp(self):
super().setUp()
self.fake_agent = mock.Mock()
self.exposer = nb_ovn_bgp_driver.NATExposer(self.fake_agent)
self.lsp = (
self.fake_agent.nb_idl.lsp_get.return_value.execute.return_value)
def test_expose_uninitialized(self):
self.assertRaises(
RuntimeError, self.exposer.expose_fip_from_nat, mock.ANY)
def test_withdraw_uninitialized(self):
self.assertRaises(
RuntimeError, self.exposer.withdraw_fip_from_nat, mock.ANY)
def test_expose_distributed(self):
with mock.patch.object(self.exposer, '_expose_nat_distributed') as c:
self.exposer.distributed = True
self.exposer.expose_fip_from_nat(mock.ANY)
self.assertTrue(c.called)
def test_withdraw_distributed(self):
with mock.patch.object(self.exposer, '_withdraw_nat_distributed') as c:
self.exposer.distributed = True
self.exposer.withdraw_fip_from_nat(mock.ANY)
self.assertTrue(c.called)
def test_expose_centralized(self):
with mock.patch.object(self.exposer, '_expose_nat_centralized') as c:
self.exposer.distributed = False
self.exposer.expose_fip_from_nat(mock.ANY)
self.assertTrue(c.called)
def test_withdraw_centralized(self):
with mock.patch.object(self.exposer, '_withdraw_nat_centralized') as c:
self.exposer.distributed = False
self.exposer.withdraw_fip_from_nat(mock.ANY)
self.assertTrue(c.called)
@set_distributed(False)
def test__expose_nat_centralized(self):
gw_port = utils.create_row(mac='mac')
nat = utils.create_row(
logical_port=['lp'],
external_ip='ext-ip',
external_ids={constants.OVN_FIP_NET_EXT_ID_KEY: 'foo'},
gateway_port=[gw_port])
self.exposer.expose_fip_from_nat(nat)
self.fake_agent.expose_fip.assert_called_once_with(
'ext-ip', 'mac', 'neutron-foo', self.lsp)
@set_distributed(False)
def test__withdraw_nat_centralized(self):
nat = utils.create_row(external_ip='ext-ip', logical_port=['lp'])
self.exposer.withdraw_fip_from_nat(nat)
self.fake_agent.withdraw_fip.assert_called_once_with(
'ext-ip', self.lsp)

View File

@ -0,0 +1,51 @@
# Copyright 2024 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ovn_bgp_agent import constants
from ovn_bgp_agent.drivers.openstack import nb_exceptions
from ovn_bgp_agent.drivers.openstack.utils import nat as nat_utils
from ovn_bgp_agent.tests import base as test_base
from ovn_bgp_agent.tests import utils as test_utils
class TestGetGatewayLrp(test_base.TestCase):
def test_get(self):
port = 'foo'
nat = test_utils.create_row(gateway_port=[port])
observed = nat_utils.get_gateway_lrp(nat)
self.assertEqual(port, observed)
def test_no_gw_port(self):
nat = test_utils.create_row(gateway_port=[])
self.assertRaises(
nb_exceptions.NATNotFound, nat_utils.get_gateway_lrp, nat)
class TestGetChassisHostingCrlrp(test_base.TestCase):
def test_get_chassis(self):
chassis = 'foo'
port = test_utils.create_row(
status={constants.OVN_STATUS_CHASSIS: chassis})
nat = test_utils.create_row(gateway_port=[port])
observed = nat_utils.get_chassis_hosting_crlrp(nat)
self.assertEqual(chassis, observed)
def test_no_chasssis(self):
port = test_utils.create_row(
status={})
nat = test_utils.create_row(gateway_port=[port])
self.assertRaises(
nb_exceptions.ChassisNotFound,
nat_utils.get_chassis_hosting_crlrp, nat)