# neutron-dynamic-routing/neutron_dynamic_routing/services/bgp/agent/l3/bgp_extension.py
#
# 253 lines
# 12 KiB
# Python

# Copyright 2021 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
from neutron.api.rpc.callbacks.consumer import registry
from neutron.api.rpc.callbacks import events as rpc_events
from neutron.api.rpc.callbacks import resources
from neutron.api.rpc.handlers import resources_rpc
from neutron_lib.agent import l3_extension
from neutron_lib import rpc as n_rpc
from oslo_concurrency import lockutils
from oslo_log import log as logging
from neutron_dynamic_routing.objects import bgp_associations as assoc_objects
from neutron_dynamic_routing.privileged import driver as priv_driver
from neutron_dynamic_routing.services.bgp.agent.l3 import bgp_rpc_api
from neutron_dynamic_routing.services.bgp.common import constants as bgp_consts
# Module-level logger used by the cache and the agent extension below.
LOG = logging.getLogger(__name__)
# Object names of the two association objects. The agent extension uses
# them as the resource types it registers push-notification callbacks for
# and as the basis of the versioned RPC topics it consumes from.
BGP_ROUTER_ASSOC = assoc_objects.BgpSpeakerRouterAssociation.obj_name()
BGP_PEER_ASSOC = assoc_objects.BgpSpeakerPeerAssociation.obj_name()
class BGPSpeakerRoutesCache():
    """Cache of the routes that each BGP speaker has to advertise.

    Routes are stored in one set per speaker, keyed by the BGP speaker id.
    All four accessors are serialized through the same
    'speaker-routes-cache' lock.

    Version history:
        1.0 - Initial version for caching multiple BGP speaker and advertised
              routes information.
    """

    def __init__(self):
        # Layout of the cache:
        #
        # self.speaker_routes = {
        #     speaker_id_1: {route_1, route_2},
        #     speaker_id_2: {route_3},
        # }
        self.speaker_routes = collections.defaultdict(set)

    @lockutils.synchronized('speaker-routes-cache')
    def create_speaker_routes(self, bgp_speaker_id, routes):
        """Record ``routes`` as advertised by ``bgp_speaker_id``.

        :param bgp_speaker_id: Key of the speaker in the cache.
        :param routes: Iterable of hashable routes to add.
        """
        self.speaker_routes[bgp_speaker_id].update(routes)

    @lockutils.synchronized('speaker-routes-cache')
    def update_speaker_routes(self, bgp_speaker_id, routes):
        """Add ``routes`` to the set cached for ``bgp_speaker_id``.

        :param bgp_speaker_id: Key of the speaker in the cache.
        :param routes: Iterable of hashable routes to add.
        """
        self.speaker_routes[bgp_speaker_id].update(routes)

    @lockutils.synchronized('speaker-routes-cache')
    def remove_speaker_routes(self, bgp_speaker_id, routes):
        """Drop ``routes`` from the set cached for ``bgp_speaker_id``.

        Routes that are not cached are ignored. A speaker that is not in
        the cache is left absent instead of getting an empty entry.

        :param bgp_speaker_id: Key of the speaker in the cache.
        :param routes: Iterable of hashable routes to remove.
        """
        cached = self.speaker_routes.get(bgp_speaker_id)
        if cached is not None:
            cached.difference_update(routes)

    @lockutils.synchronized('speaker-routes-cache')
    def get_speaker_routes(self, bgp_speaker_id):
        """Return a snapshot of the routes cached for ``bgp_speaker_id``.

        A copy is returned so that callers can iterate or diff the result
        after the lock has been released without racing against writers,
        and so that mutating the result cannot corrupt the cache. Looking
        up an unknown speaker does not insert an entry for it.

        :param bgp_speaker_id: Key of the speaker in the cache.
        :returns: A new ``set`` of routes; empty when the speaker is unknown.
        """
        return set(self.speaker_routes.get(bgp_speaker_id, ()))
class BgpAgentExtension(l3_extension.L3AgentExtension):
    """L3 agent extension that manages BGP speakers for routers.

    The extension reacts to two kinds of input:

    * push notifications for BGP speaker/router and BGP speaker/peer
      associations (``_handle_router_notification`` and
      ``_handle_peer_notification``);
    * the router life-cycle hooks of the L3 agent (``add_router``,
      ``update_router``, ``delete_router``, ``ha_state_change``).

    Speakers, neighbors and route advertisements are applied through
    ``priv_driver``; data is fetched from the plugin through
    ``self.rpc_plugin``; the routes known per speaker are tracked in
    ``self.routes_cache``.
    """

    SUPPORTED_RESOURCE_TYPES = [BGP_ROUTER_ASSOC, BGP_PEER_ASSOC]

    def initialize(self, connection, driver_type):
        """Set up the extension state and start consuming notifications.

        :param connection: Not used by this extension.
        :param driver_type: Not used by this extension.
        """
        LOG.debug("Initializing BgpL3AgentExtension")
        resources.register_resource_class(
            assoc_objects.BgpSpeakerRouterAssociation)
        resources.register_resource_class(
            assoc_objects.BgpSpeakerPeerAssociation)
        # The notification handlers read rpc_plugin and routes_cache, so
        # both must exist before the consumers start running in threads.
        self.rpc_plugin = bgp_rpc_api.BgpL3ExtPluginApi(
            bgp_consts.BGP_PLUGIN)
        self.routes_cache = BGPSpeakerRoutesCache()
        self._register_rpc_consumers()
        LOG.debug("Initialized BgpaasAgentExtension")

    def _register_rpc_consumers(self):
        """Register the push callbacks and start both RPC consumers."""
        LOG.debug("Reg RPC consumers BgpaasAgentExtension")
        registry.register(self._handle_router_notification, BGP_ROUTER_ASSOC)
        registry.register(self._handle_peer_notification, BGP_PEER_ASSOC)
        self._register_router_consumers()
        self._register_peer_consumers()
        LOG.debug("Registered RPC consumers BgpaasAgentExtension")

    def _register_router_consumers(self):
        """Consume router association pushes on a dedicated connection."""
        self._connection = n_rpc.Connection()
        endpoints = [resources_rpc.ResourcesPushRpcCallback()]
        topic = resources_rpc.resource_type_versioned_topic(BGP_ROUTER_ASSOC)
        self._connection.create_consumer(topic, endpoints, fanout=True)
        self._connection.consume_in_threads()

    def _register_peer_consumers(self):
        """Consume peer association pushes on a dedicated connection."""
        self._connection_peer = n_rpc.Connection()
        endpoints = [resources_rpc.ResourcesPushRpcCallback()]
        topic_peer = resources_rpc.resource_type_versioned_topic(
            BGP_PEER_ASSOC)
        self._connection_peer.create_consumer(topic_peer, endpoints,
                                              fanout=True)
        self._connection_peer.consume_in_threads()

    def consume_api(self, agent_api):
        """Keep the L3 agent API object used to look up router info.

        :param agent_api: Object providing ``get_router_info(router_id)``.
        """
        self.agent_api = agent_api

    def _handle_router_notification(self, context, resource_type,
                                    router_assocs, event_type):
        """Apply pushed BGP speaker/router association events.

        * CREATED: start the speaker in the router namespace, cache the
          routes it has to advertise and mark the association active.
        * UPDATED: advertise or withdraw routes depending on
          ``advertise_extra_routes`` and keep the cache in sync.
        * DELETED: remove the speaker from the router namespace.

        :param context: RPC context.
        :param resource_type: Resource type of the pushed objects.
        :param router_assocs: Iterable of router association objects.
        :param event_type: One of the ``rpc_events`` event types.
        """
        LOG.debug('Received router associated push event %s for resource %s,'
                  'with router associations %s', event_type, resource_type,
                  router_assocs)
        for assoc in router_assocs:
            router_info = self.agent_api.get_router_info(assoc.router_id)
            if not router_info:
                # The consumers are fanout, so presumably events also
                # arrive for routers this agent does not host. Without
                # router info there is no namespace to act in.
                LOG.debug("No router info for router %s on this agent, "
                          "ignoring BGP speaker association %s",
                          assoc.router_id, assoc.id)
                continue
            if event_type == rpc_events.CREATED:
                # Speaker details are only needed to start the speaker.
                sp = self.rpc_plugin.get_bgp_speaker_info(
                    context, assoc.bgp_speaker_id)
                # NOTE(review): the third argument here is '127.0.0.1'
                # while add_router() passes '1.1.1.1' in the same
                # position - confirm which value is intended.
                priv_driver.add_bgp_speaker(assoc.bgp_speaker_id,
                                            sp['local_as'], '127.0.0.1',
                                            router_info.ns_name,
                                            sp['ip_version'])
                routes = self.rpc_plugin.get_routes_to_advertise(
                    context, assoc.bgp_speaker_id, assoc.router_id)
                # Routes are only cached here; they are advertised once a
                # peer is added (see _handle_peer_notification).
                self.routes_cache.create_speaker_routes(assoc.bgp_speaker_id,
                                                        routes)
                self.rpc_plugin.update_speaker_router_association_status(
                    context, assoc.id, bgp_consts.STATUS_ACTIVE)
            elif event_type == rpc_events.UPDATED:
                if assoc.advertise_extra_routes:
                    routes = self.rpc_plugin.get_routes_to_advertise(
                        context, assoc.bgp_speaker_id, assoc.router_id)
                    priv_driver.advertise_routes(tuple(routes))
                    self.routes_cache.update_speaker_routes(
                        assoc.bgp_speaker_id, routes)
                else:
                    routes = self.rpc_plugin.get_routes_to_withdraw(
                        context, assoc.bgp_speaker_id, assoc.router_id)
                    priv_driver.withdraw_routes(tuple(routes))
                    self.routes_cache.remove_speaker_routes(
                        assoc.bgp_speaker_id, routes)
            elif event_type == rpc_events.DELETED:
                # NOTE(review): the routes cached for this speaker are not
                # dropped here - confirm whether they should be.
                priv_driver.del_bgp_speaker(assoc.bgp_speaker_id,
                                            router_info.ns_name)

    def _handle_peer_notification(self, context, resource_type,
                                  peer_assocs, event_type):
        """Apply pushed BGP speaker/peer association events.

        For every router associated with the speaker of a peer
        association:

        * CREATED: add the peer as neighbor, advertise the routes cached
          for the speaker and mark the peer association active.
        * DELETED: remove the neighbor.

        :param context: RPC context.
        :param resource_type: Resource type of the pushed objects.
        :param peer_assocs: Iterable of peer association objects.
        :param event_type: One of the ``rpc_events`` event types.
        """
        LOG.debug('Received peer associated push event %s for resource %s,'
                  'with peer associations %s', event_type, resource_type,
                  peer_assocs)
        for peer_assoc in peer_assocs:
            sp = self.rpc_plugin.get_bgp_speaker_info(
                context,
                peer_assoc.bgp_speaker_id)
            peer = self.rpc_plugin.get_bgp_peer_info(context,
                                                     peer_assoc.peer_id)
            for router_assoc in sp['router_associations']:
                router_id = router_assoc['router_id']
                router_info = self.agent_api.get_router_info(router_id)
                if not router_info:
                    # Same guard as add_router()/update_router(): without
                    # router info there is no namespace to act in.
                    LOG.debug("No router info for router %s on this "
                              "agent, ignoring BGP peer association %s",
                              router_id, peer_assoc.id)
                    continue
                if event_type == rpc_events.CREATED:
                    priv_driver.add_bgp_neighbor(peer_assoc.bgp_speaker_id,
                                                 peer['peer_ip'],
                                                 peer['remote_as'],
                                                 router_info.ns_name)
                    priv_driver.advertise_routes(
                        tuple(self.routes_cache.get_speaker_routes(
                            peer_assoc.bgp_speaker_id)))
                    self.rpc_plugin.update_speaker_peer_association_status(
                        context, peer_assoc.id, bgp_consts.STATUS_ACTIVE)
                elif event_type == rpc_events.DELETED:
                    priv_driver.del_bgp_neighbor(peer_assoc.bgp_speaker_id,
                                                 peer['peer_ip'],
                                                 router_info.ns_name)

    def add_router(self, context, data):
        """Handle a router add event.

        Called on router create. For every BGP speaker associated with
        the router that has no running process yet, the speaker is
        started in the router namespace and its peers are added as
        neighbors.

        :param context: RPC context.
        :param data: Router data.
        """
        LOG.debug("ADD ROUTER %s", data)
        router_info = self.agent_api.get_router_info(data['id'])
        if not router_info:
            return
        router_assocs = self.rpc_plugin.get_speaker_associated_to_router(
            context, data['id'])
        for router_assoc in router_assocs:
            bgp_speaker_id = router_assoc['bgp_speaker_id']
            # Check for a running process first so that no RPC round trip
            # is spent on speakers that are skipped anyway.
            if priv_driver.PROCESS_CACHE.get_bgp_speaker_process(
                    bgp_speaker_id):
                LOG.debug("BGP process already running for speaker %s",
                          bgp_speaker_id)
                continue
            sp = self.rpc_plugin.get_bgp_speaker_info(context,
                                                      bgp_speaker_id)
            priv_driver.add_bgp_speaker(bgp_speaker_id,
                                        sp['local_as'], '1.1.1.1',
                                        router_info.ns_name,
                                        sp['ip_version'])
            LOG.debug("ADD ROUTER PEER ASSOC %s", sp['peer_associations'])
            for peer_assoc in sp['peer_associations']:
                peer = self.rpc_plugin.get_bgp_peer_info(
                    context, peer_assoc['peer_id'])
                LOG.debug("ADD ROUTER PEER %s", peer)
                priv_driver.add_bgp_neighbor(peer_assoc['bgp_speaker_id'],
                                             peer['peer_ip'],
                                             peer['remote_as'],
                                             router_info.ns_name)

    def update_router(self, context, data):
        """Handle a router update event.

        Called on router update. For every BGP speaker associated with
        the router, the routes to advertise are fetched from the plugin
        and compared with the cached ones: new routes are advertised,
        routes that disappeared are withdrawn, and the cache is updated
        accordingly.

        :param context: RPC context.
        :param data: Router data.
        """
        router_info = self.agent_api.get_router_info(data['id'])
        if not router_info:
            return
        router_assocs = self.rpc_plugin.get_speaker_associated_to_router(
            context, data['id'])
        for router_assoc in router_assocs:
            bgp_speaker_id = router_assoc['bgp_speaker_id']
            new_routes = set(self.rpc_plugin.get_routes_to_advertise(
                context, bgp_speaker_id, router_assoc['router_id']))
            old_routes = self.routes_cache.get_speaker_routes(
                bgp_speaker_id)
            # Both differences are computed before the cache is touched.
            routes_to_advertise = new_routes - old_routes
            routes_to_withdraw = old_routes - new_routes
            priv_driver.advertise_routes(tuple(routes_to_advertise))
            self.routes_cache.update_speaker_routes(bgp_speaker_id,
                                                    routes_to_advertise)
            priv_driver.withdraw_routes(tuple(routes_to_withdraw))
            self.routes_cache.remove_speaker_routes(bgp_speaker_id,
                                                    routes_to_withdraw)

    def delete_router(self, context, data):
        """Handle a router delete event.

        Nothing is done here.

        :param context: RPC context.
        :param data: Router data.
        """
        pass

    def ha_state_change(self, context, data):
        """Handle a router HA state change event.

        Nothing is done here.

        :param context: RPC context.
        :param data: Event data.
        """
        pass