bgp: Introduce service plugin

The service plugin registers a worker to the neutron-periodic-workers
process. A new IDL specific for the BGP use case is introduced.

Related-Bug: #2111276
Assisted-By: claude-4-sonnet

Change-Id: I163dbbde776914a8cc9bc301399ab240f77d93fe
Signed-off-by: Jakub Libosvar <jlibosva@redhat.com>
This commit is contained in:
Jakub Libosvar
2025-08-27 21:26:27 +00:00
committed by Jakub Libosvar
parent a991dd1d6b
commit 7fe015c871
11 changed files with 499 additions and 0 deletions

View File

@@ -0,0 +1,46 @@
# Copyright 2025 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from neutron._i18n import _
bgp_opts = [
cfg.StrOpt('main_router_name',
default='bgp-lr-main',
help=_('Name of the main BGP router.')),
cfg.StrOpt('bgp_router_tunnel_key',
default='42',
help=_('Tunnel key for the main BGP router.')),
cfg.StrOpt('bgp_mac_base',
default='00:96',
help=_('Base MAC address for BGP router ports.')),
]
def get_main_router_name():
return cfg.CONF.bgp.main_router_name
def get_bgp_router_tunnel_key():
return cfg.CONF.bgp.bgp_router_tunnel_key
def get_bgp_mac_base():
return cfg.CONF.bgp.bgp_mac_base
def register_opts(conf):
conf.register_opts(bgp_opts, group='bgp')

View File

View File

@@ -0,0 +1,88 @@
# Copyright 2025 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from ovsdbapp.backend.ovs_idl import connection
from ovsdbapp.backend.ovs_idl import idlutils
from ovsdbapp import event
from ovsdbapp.schema.ovn_northbound import impl_idl as nb_impl_idl
from ovsdbapp.schema.ovn_southbound import impl_idl as sb_impl_idl
from neutron.plugins.ml2.drivers.ovn.mech_driver.ovsdb import ovsdb_monitor
LOG = log.getLogger(__name__)
OVN_NB_TABLES = (
'Logical_Switch', 'Logical_Switch_Port',
'Logical_Router', 'Logical_Router_Port',
'HA_Chassis_Group', 'HA_Chassis',
'Logical_Router_Static_Route', 'Logical_Router_Policy',
)
OVN_SB_TABLES = ('Chassis', 'Chassis_Private')
class OvnIdl(connection.OvsdbIdl):
LEADER_ONLY = False
def __init__(self, connection_string):
if connection_string.startswith("ssl"):
ovsdb_monitor._check_and_set_ssl_files(self.SCHEMA)
helper = idlutils.get_schema_helper(connection_string, self.SCHEMA)
for table in self.tables:
helper.register_table(table)
self.notify_handler = event.RowEventHandler()
super().__init__(
connection_string, helper, leader_only=self.LEADER_ONLY)
def notify(self, event, row, updates=None):
self.notify_handler.notify(event, row, updates)
def start(self, timeout):
conn = connection.Connection(self, timeout=timeout)
return self.api_cls(conn)
class BgpOvnNbIdl(nb_impl_idl.OvnNbApiIdlImpl):
LOCK_NAME = 'bgp_topology_lock'
def set_lock(self):
LOG.debug("Setting lock for BGP topology")
self.ovsdb_connection.idl.set_lock(self.LOCK_NAME)
self.ovsdb_connection.txns.put(None)
@property
def has_lock(self):
return not self.ovsdb_connection.idl.is_lock_contended
def register_events(self, events):
self.ovsdb_connection.idl.notify_handler.watch_events(events)
class BgpOvnSbIdl(sb_impl_idl.OvnSbApiIdlImpl):
def register_events(self, events):
self.ovsdb_connection.idl.notify_handler.watch_events(events)
class OvnNbIdl(OvnIdl):
LEADER_ONLY = True
SCHEMA = 'OVN_Northbound'
tables = OVN_NB_TABLES
api_cls = BgpOvnNbIdl
class OvnSbIdl(OvnIdl):
SCHEMA = 'OVN_Southbound'
tables = OVN_SB_TABLES
api_cls = BgpOvnSbIdl

View File

@@ -0,0 +1,45 @@
# Copyright 2025 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.callbacks import registry
from neutron_lib.services import base as service_base
from oslo_config import cfg
from oslo_log import log
from neutron.conf.services import bgp as bgp_config
from neutron.services.bgp import worker
LOG = log.getLogger(__name__)
@registry.has_registry_receivers
class BGPServicePlugin(service_base.ServicePluginBase):
supported_extension_aliases = []
def __init__(self):
LOG.info("Starting BGP Service Plugin")
super().__init__()
bgp_config.register_opts(cfg.CONF)
def get_workers(self):
return [worker.BGPWorker()]
def get_plugin_description(self):
return "BGP service plugin for OVN"
@classmethod
def get_plugin_type(cls):
return "bgp-service"

View File

@@ -0,0 +1,62 @@
# Copyright 2025 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
LOG = log.getLogger(__name__)
class BGPTopologyReconciler:
def __init__(self, nb_ovn, sb_ovn):
self.nb_ovn = nb_ovn
self.sb_ovn = sb_ovn
# We are doing full sync when the extension is started so we don't
# need to process all events when IDLs connect.
self.register_events()
def register_events(self):
self.nb_ovn.register_events(self.nb_events)
self.sb_ovn.register_events(self.sb_events)
@property
def resource_map(self):
return {
}
@property
def nb_events(self):
return [
]
@property
def sb_events(self):
return [
]
def full_sync(self):
if not self.nb_ovn.ovsdb_connection.idl.is_lock_contended:
LOG.info("Full BGP topology synchronization started")
# TODO(jlibosva): Implement full sync
LOG.info(
"Full BGP topology synchronization completed successfully")
else:
LOG.info("Full BGP topology synchronization already in progress")
def reconcile(self, resource, uuid):
try:
self.resource_map[resource](uuid)
except KeyError:
LOG.error("Resource %s not found in reconciler resource map",
resource)

View File

@@ -0,0 +1,47 @@
# Copyright 2025 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
from neutron.services.bgp import ovn
from neutron.services.bgp import reconciler
from neutron import worker
class BGPWorker(worker.NeutronBaseWorker):
def __init__(self):
super().__init__(worker_process_count=0)
def start(self):
super().start(desc="bgp worker")
self.nb_api = ovn.OvnNbIdl(ovn_conf.get_ovn_nb_connection()).start(
timeout=ovn_conf.get_ovn_ovsdb_timeout())
self.nb_api.set_lock()
self.sb_api = ovn.OvnSbIdl(ovn_conf.get_ovn_sb_connection()).start(
timeout=ovn_conf.get_ovn_ovsdb_timeout())
self._reconciler = reconciler.BGPTopologyReconciler(
self.nb_api,
self.sb_api)
def wait(self):
self._reconciler.full_sync()
def stop(self):
pass
def reset(self):
self.nb_api.restart_connection()
self.sb_api.restart_connection()

View File

@@ -0,0 +1,166 @@
# Copyright 2025 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from neutron.common import utils as common_utils
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
from neutron.services.bgp import ovn as bgp_ovn
from neutron.tests.functional import base
class OvnNbIdlWithUniqueConnection(bgp_ovn.BgpOvnNbIdl):
# because the ovsdb connection is a class attribute, we cannot have
# two connections in the same process
@property
def ovsdb_connection(self):
return self._ovsdb_connection
@ovsdb_connection.setter
def ovsdb_connection(self, value):
self._ovsdb_connection = value
class OvnNbIdlTest(bgp_ovn.OvnNbIdl):
api_cls = OvnNbIdlWithUniqueConnection
class TestOvnNbIdl(base.TestOVNFunctionalBase):
"""Test OvnNbIdl read-write operations."""
def setUp(self):
super().setUp()
self.nb_connection = ovn_conf.get_ovn_nb_connection()
self.nb_idl = bgp_ovn.OvnNbIdl(self.nb_connection)
self.addCleanup(self._cleanup)
self.nb_bgp_api = self.nb_idl.start(timeout=10)
def _cleanup(self):
if hasattr(self, 'nb_bgp_api') and self.nb_bgp_api:
try:
self.nb_bgp_api.ovsdb_connection.stop(timeout=5)
self.nb_bgp_api.__class__._ovsdb_connection = None
except Exception:
pass
def test_read_write_operations(self):
"""Test NB IDL read and write operations."""
# Test write: create logical switch
ls_name = f"test_ls_{uuidutils.generate_uuid()}"
ls = self.nb_bgp_api.ls_add(ls_name).execute(check_error=True)
self.assertEqual(ls.name, ls_name)
# Test read: list logical switches
ls_list = self.nb_bgp_api.ls_list().execute(check_error=True)
ls_names = [switch.name for switch in ls_list]
self.assertIn(ls_name, ls_names)
def test_leader_only_is_true(self):
self.assertTrue(self.nb_idl.leader_only)
class TestOvnSbIdl(base.TestOVNFunctionalBase):
"""Test OvnSbIdl read-write operations."""
def setUp(self):
super().setUp()
self.sb_connection = ovn_conf.get_ovn_sb_connection()
# Register Encap table too so we can create a chassis
self.addCleanup(self._cleanup)
bgp_ovn.OvnSbIdl.tables = ('Chassis', 'Encap')
self.sb_idl = bgp_ovn.OvnSbIdl(self.sb_connection)
self.sb_bgp_api = self.sb_idl.start(timeout=10)
def _cleanup(self):
if hasattr(self, 'sb_bgp_api') and self.sb_bgp_api:
try:
self.sb_bgp_api.ovsdb_connection.stop(timeout=5)
self.sb_bgp_api.__class__._ovsdb_connection = None
except Exception:
pass
bgp_ovn.OvnSbIdl.tables = bgp_ovn.OVN_SB_TABLES
def test_read_write_operations(self):
"""Test SB IDL read and write operations."""
# Test write: add chassis
chassis_name = f"test_chassis_{uuidutils.generate_uuid()}"
hostname = f"{chassis_name}.example.com"
self.sb_bgp_api.chassis_add(
chassis_name, ['geneve'], '192.168.1.100',
hostname=hostname).execute(check_error=True)
# Test read: list chassis
chassis_list = self.sb_bgp_api.chassis_list().execute(check_error=True)
self.assertIn(chassis_name, [chassis.name for chassis in chassis_list])
self.assertEqual(len(chassis_list), 1)
def test_leader_only_is_false(self):
self.assertFalse(self.sb_idl.leader_only)
class TestBgpOvnLocking(base.TestOVNFunctionalBase):
"""Test BGP OVN locking mechanism."""
def setUp(self):
super().setUp()
self.nb_connection = ovn_conf.get_ovn_nb_connection()
self.addCleanup(self._cleanup)
# Create two IDL instances to test locking
self.nb_idl1 = OvnNbIdlTest(self.nb_connection)
self.nb_bgp_api1 = self.nb_idl1.start(timeout=10)
self.nb_idl2 = OvnNbIdlTest(self.nb_connection)
self.nb_bgp_api2 = self.nb_idl2.start(timeout=10)
def _cleanup(self):
for api in [getattr(self, 'nb_bgp_api1', None),
getattr(self, 'nb_bgp_api2', None)]:
if api:
try:
api.ovsdb_connection.stop(timeout=5)
except Exception:
pass
def test_locking_mechanism(self):
"""Test BGP topology locking mechanism."""
# No lock is active, both APIs should have access
self.assertTrue(self.nb_bgp_api1.has_lock)
self.assertTrue(self.nb_bgp_api2.has_lock)
# First API acquires the lock
self.nb_bgp_api1.set_lock()
self.nb_bgp_api2.set_lock()
# Second API should lose the lock
common_utils.wait_until_true(
lambda: not self.nb_bgp_api2.has_lock,
timeout=5,
exception=AssertionError("Second API did not lose the lock")
)
# First API should still have the lock
self.assertTrue(self.nb_bgp_api1.has_lock)
# Disconnect first API and check that second API can acquire the lock
self.nb_bgp_api1.ovsdb_connection.stop(timeout=5)
common_utils.wait_until_true(
lambda: self.nb_bgp_api2.has_lock,
timeout=5,
exception=AssertionError("Second API did not acquire lock")
)

View File

@@ -0,0 +1,44 @@
# Copyright 2025 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ovs import stream
from unittest import mock
from neutron.conf.plugins.ml2.drivers.ovn import ovn_conf
from neutron.services.bgp import ovn
from neutron.tests import base
class OvnNbIdlTestCase(base.BaseTestCase):
def setUp(self):
super().setUp()
ovn_conf.register_opts()
ovn_conf.cfg.CONF.set_override(
'ovn_nb_private_key', 'nb-private-key', 'ovn')
ovn_conf.cfg.CONF.set_override(
'ovn_nb_certificate', 'nb-certificate', 'ovn')
ovn_conf.cfg.CONF.set_override(
'ovn_nb_ca_cert', 'nb-ca-cert', 'ovn')
mock.patch(
'ovsdbapp.backend.ovs_idl.idlutils.get_schema_helper').start()
mock.patch('ovs.db.idl.Idl.__init__').start()
def test_init_with_ssl(self):
"""Check the SSL is configured correctly"""
connection = "ssl:127.0.0.1:6640"
ovn.OvnNbIdl(connection)
self.assertEqual('nb-ca-cert', stream.Stream._SSL_ca_cert_file)
self.assertEqual('nb-certificate', stream.Stream._SSL_certificate_file)
self.assertEqual('nb-private-key', stream.Stream._SSL_private_key_file)

View File

@@ -101,6 +101,7 @@ conntrack_helper = "neutron.services.conntrack_helper.plugin:Plugin"
ovn-router = "neutron.services.ovn_l3.plugin:OVNL3RouterPlugin"
local_ip = "neutron.services.local_ip.local_ip_plugin:LocalIPPlugin"
ndp_proxy = "neutron.services.ndp_proxy.plugin:NDPProxyPlugin"
ovn-bgp = "neutron.services.bgp.plugin:BGPServicePlugin"
[project.entry-points."neutron.ml2.type_drivers"]
flat = "neutron.plugins.ml2.drivers.type_flat:FlatTypeDriver"