266246fd84
The neutron.common.rpc and exceptions modules were rehomed into neutron-lib and are currently shimmed in neutron [1]. This patch consumes those modules from neutron-lib by using lib's modules rather than neutron's. [1] https://review.openstack.org/#/c/586525/ Change-Id: Ic95e9ab17fa6ba3b12cea7837322f113d0084aeb
713 lines
31 KiB
Python
713 lines
31 KiB
Python
# Copyright 2016 Huawei Technologies India Pvt. Ltd.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import collections
|
|
|
|
from neutron_lib.agent import constants as agent_consts
|
|
from neutron_lib.agent import topics
|
|
from neutron_lib import context
|
|
from neutron_lib import rpc as n_rpc
|
|
from neutron_lib.utils import runtime
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
import oslo_messaging
|
|
from oslo_service import loopingcall
|
|
from oslo_service import periodic_task
|
|
from oslo_utils import importutils
|
|
|
|
from neutron.agent import rpc as agent_rpc
|
|
from neutron.common import utils
|
|
from neutron import manager
|
|
|
|
from neutron_dynamic_routing.extensions import bgp as bgp_ext
|
|
from neutron_dynamic_routing._i18n import _, _LE, _LI, _LW
|
|
from neutron_dynamic_routing.services.bgp.agent.driver import exceptions as driver_exc # noqa
|
|
from neutron_dynamic_routing.services.bgp.common import constants as bgp_consts # noqa
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class BgpDrAgent(manager.Manager):
    """BGP Dynamic Routing agent service manager.

    Note that the public methods of this class are exposed as the server side
    of an rpc interface.  The neutron server uses
    api.rpc.agentnotifiers.bgp_dr_rpc_agent_api.BgpDrAgentNotifyApi as the
    client side to execute the methods here.  For more information about
    changing rpc interfaces, see
    https://docs.openstack.org/neutron/latest/contributor/internals/rpc_api.html.

    API version history:
        1.0 initial Version
    """
    target = oslo_messaging.Target(version='1.0')

    def __init__(self, host, conf=None):
        """Load the speaker driver and set up the cache and plugin RPC client.

        :param host: host name passed to the plugin RPC client.
        :param conf: optional configuration object; when falsy,
                     ``cfg.CONF.BGP`` is used instead.
        """
        super(BgpDrAgent, self).__init__()
        self.initialize_driver(conf)
        # speaker_id -> list of reasons a resync was requested for it.
        self.needs_resync_reasons = collections.defaultdict(list)
        # Reason for a pending resync of *all* speakers, or None.
        self.needs_full_sync_reason = None

        self.cache = BgpSpeakerCache()
        self.context = context.get_admin_context_without_session()
        self.plugin_rpc = BgpDrPluginApi(bgp_consts.BGP_PLUGIN,
                                         self.context, host)

    def initialize_driver(self, conf):
        """Instantiate the configured BGP speaker driver.

        Despite its name, ``dr_driver_cls`` holds the driver *object*
        returned by ``importutils.import_object``.

        :raises SystemExit: if the driver cannot be imported.
        """
        self.conf = conf or cfg.CONF.BGP
        try:
            self.dr_driver_cls = (
                importutils.import_object(self.conf.bgp_speaker_driver,
                                          self.conf))
        except ImportError:
            LOG.exception(_LE("Error while importing BGP speaker driver %s"),
                          self.conf.bgp_speaker_driver)
            raise SystemExit(1)

    def _handle_driver_failure(self, bgp_speaker_id, method, driver_exec):
        """Schedule a resync for the speaker and log the driver exception."""
        self.schedule_resync(reason=driver_exec,
                             speaker_id=bgp_speaker_id)
        LOG.error(_LE('Call to driver for BGP Speaker %(bgp_speaker)s '
                      '%(method)s has failed with exception '
                      '%(driver_exec)s.'),
                  {'bgp_speaker': bgp_speaker_id,
                   'method': method,
                   'driver_exec': driver_exec})

    def after_start(self):
        self.run()
        LOG.info(_LI("BGP Dynamic Routing agent started"))

    def run(self):
        """Activate BGP Dynamic Routing agent."""
        self.sync_state(self.context)
        self.periodic_resync(self.context)

    # NOTE(review): this lock name ('bgp-dragent') differs from the one used
    # by the notification handlers below ('bgp-dr-agent'), so sync_state is
    # not serialized against them. Left unchanged; confirm whether that is
    # intentional.
    @runtime.synchronized('bgp-dragent')
    def sync_state(self, context, full_sync=None, bgp_speakers=None):
        """Reconcile the local cache/driver with the speakers hosted here.

        Speakers that are cached but no longer reported by the plugin are
        removed.  Hosted speakers are then (re)configured: all of them when
        ``full_sync`` is truthy or ``bgp_speakers`` is empty, otherwise only
        those whose id is in ``bgp_speakers``.  Any exception results in a
        full resync being scheduled instead of propagating.

        :param context: request context passed to the plugin RPC call.
        :param full_sync: truthy to force a resync of every hosted speaker.
        :param bgp_speakers: iterable of speaker ids to resync selectively.
        """
        try:
            hosted_bgp_speakers = self.plugin_rpc.get_bgp_speakers(context)
            hosted_bgp_speaker_ids = {bgp_speaker['id']
                                      for bgp_speaker in hosted_bgp_speakers}
            # Take a snapshot of the cached ids: removing a speaker mutates
            # the cache, and iterating a live dict view while deleting from
            # the dict raises RuntimeError on Python 3.
            cached_bgp_speakers = list(self.cache.get_bgp_speaker_ids())
            for bgp_speaker_id in cached_bgp_speakers:
                if bgp_speaker_id not in hosted_bgp_speaker_ids:
                    self.remove_bgp_speaker_from_dragent(bgp_speaker_id)

            resync_all = not bgp_speakers or full_sync
            only_bs = set() if resync_all else set(bgp_speakers)
            for hosted_bgp_speaker in hosted_bgp_speakers:
                hosted_bs_id = hosted_bgp_speaker['id']
                if resync_all or hosted_bs_id in only_bs:
                    if not self.cache.is_bgp_speaker_added(hosted_bs_id):
                        self.safe_configure_dragent_for_bgp_speaker(
                            hosted_bgp_speaker)
                        continue
                    self.sync_bgp_speaker(hosted_bgp_speaker)
                    resync_reason = "Periodic route cache refresh"
                    self.schedule_resync(speaker_id=hosted_bs_id,
                                         reason=resync_reason)
        except Exception as e:
            self.schedule_full_resync(reason=e)
            LOG.error(_LE('Unable to sync BGP speaker state.'))

    def sync_bgp_speaker(self, bgp_speaker):
        """Bring cached peers and routes in line with ``bgp_speaker``.

        :param bgp_speaker: speaker dict with 'id', 'local_as', 'peers' and
                            'advertised_routes' keys.
        """
        # sync BGP Speakers
        bgp_peer_ips = set(
            [bgp_peer['peer_ip'] for bgp_peer in bgp_speaker['peers']])
        # get_bgp_peer_ips() returns None for an unknown speaker.
        cached_bgp_peer_ips = set(
            self.cache.get_bgp_peer_ips(bgp_speaker['id']) or ())
        removed_bgp_peer_ips = cached_bgp_peer_ips - bgp_peer_ips

        for bgp_peer_ip in removed_bgp_peer_ips:
            self.remove_bgp_peer_from_bgp_speaker(bgp_speaker['id'],
                                                  bgp_peer_ip)
        if bgp_peer_ips:
            self.add_bgp_peers_to_bgp_speaker(bgp_speaker)

        # sync advertise routes
        cached_adv_routes = self.cache.get_adv_routes(bgp_speaker['id'])
        adv_routes = bgp_speaker['advertised_routes']
        if cached_adv_routes == adv_routes:
            return

        for cached_route in cached_adv_routes:
            if cached_route not in adv_routes:
                self.withdraw_route_via_bgp_speaker(bgp_speaker['id'],
                                                    bgp_speaker['local_as'],
                                                    cached_route)

        self.advertise_routes_via_bgp_speaker(bgp_speaker)

    @utils.exception_logger()
    def _periodic_resync_helper(self, context):
        """Resync the BgpDrAgent state at the configured interval."""
        if self.needs_resync_reasons or self.needs_full_sync_reason:
            full_sync = self.needs_full_sync_reason
            reasons = self.needs_resync_reasons
            # Reset old reasons
            self.needs_full_sync_reason = None
            self.needs_resync_reasons = collections.defaultdict(list)
            if full_sync:
                LOG.debug("resync all: %(reason)s", {"reason": full_sync})
            for bgp_speaker, reason in reasons.items():
                LOG.debug("resync (%(bgp_speaker)s): %(reason)s",
                          {"reason": reason, "bgp_speaker": bgp_speaker})
            self.sync_state(
                context, full_sync=full_sync, bgp_speakers=reasons.keys())

    # NOTE: spacing is set 1 sec. The actual interval is controlled
    # by neutron/service.py which defaults to CONF.periodic_interval
    @periodic_task.periodic_task(spacing=1)
    def periodic_resync(self, context):
        LOG.debug("Started periodic resync.")
        self._periodic_resync_helper(context)

    @runtime.synchronized('bgp-dr-agent')
    def bgp_speaker_create_end(self, context, payload):
        """Handle bgp_speaker_create_end notification event."""
        bgp_speaker_id = payload['bgp_speaker']['id']
        LOG.debug('Received BGP speaker create notification for '
                  'speaker_id=%(speaker_id)s from the neutron server.',
                  {'speaker_id': bgp_speaker_id})
        self.add_bgp_speaker_helper(bgp_speaker_id)

    @runtime.synchronized('bgp-dr-agent')
    def bgp_speaker_remove_end(self, context, payload):
        """Handle bgp_speaker_remove_end notification event."""

        bgp_speaker_id = payload['bgp_speaker']['id']
        LOG.debug('Received BGP speaker remove notification for '
                  'speaker_id=%(speaker_id)s from the neutron server.',
                  {'speaker_id': bgp_speaker_id})
        self.remove_bgp_speaker_from_dragent(bgp_speaker_id)

    @runtime.synchronized('bgp-dr-agent')
    def bgp_peer_association_end(self, context, payload):
        """Handle bgp_peer_association_end notification event."""

        bgp_peer_id = payload['bgp_peer']['peer_id']
        bgp_speaker_id = payload['bgp_peer']['speaker_id']
        LOG.debug('Received BGP peer associate notification for '
                  'speaker_id=%(speaker_id)s peer_id=%(peer_id)s '
                  'from the neutron server.',
                  {'speaker_id': bgp_speaker_id,
                   'peer_id': bgp_peer_id})
        self.add_bgp_peer_helper(bgp_speaker_id, bgp_peer_id)

    @runtime.synchronized('bgp-dr-agent')
    def bgp_peer_disassociation_end(self, context, payload):
        """Handle bgp_peer_disassociation_end notification event."""

        bgp_peer_ip = payload['bgp_peer']['peer_ip']
        bgp_speaker_id = payload['bgp_peer']['speaker_id']
        LOG.debug('Received BGP peer disassociate notification for '
                  'speaker_id=%(speaker_id)s peer_ip=%(peer_ip)s '
                  'from the neutron server.',
                  {'speaker_id': bgp_speaker_id,
                   'peer_ip': bgp_peer_ip})
        self.remove_bgp_peer_from_bgp_speaker(bgp_speaker_id, bgp_peer_ip)

    @runtime.synchronized('bgp-dr-agent')
    def bgp_routes_advertisement_end(self, context, payload):
        """Handle bgp_routes_advertisement_end notification event."""

        bgp_speaker_id = payload['advertise_routes']['speaker_id']
        LOG.debug('Received routes advertisement end notification '
                  'for speaker_id=%(speaker_id)s from the neutron server.',
                  {'speaker_id': bgp_speaker_id})
        routes = payload['advertise_routes']['routes']
        self.add_routes_helper(bgp_speaker_id, routes)

    @runtime.synchronized('bgp-dr-agent')
    def bgp_routes_withdrawal_end(self, context, payload):
        """Handle bgp_routes_withdrawal_end notification event."""

        bgp_speaker_id = payload['withdraw_routes']['speaker_id']
        LOG.debug('Received route withdrawal notification for '
                  'speaker_id=%(speaker_id)s from the neutron server.',
                  {'speaker_id': bgp_speaker_id})
        routes = payload['withdraw_routes']['routes']
        self.withdraw_routes_helper(bgp_speaker_id, routes)

    def add_bgp_speaker_helper(self, bgp_speaker_id):
        """Add BGP speaker."""
        bgp_speaker = self.safe_get_bgp_speaker_info(bgp_speaker_id)
        if bgp_speaker:
            self.add_bgp_speaker_on_dragent(bgp_speaker)

    def add_bgp_peer_helper(self, bgp_speaker_id, bgp_peer_id):
        """Add BGP peer."""
        # Ideally BGP Speaker must be added by now, If not then let's
        # re-sync.
        if not self.cache.is_bgp_speaker_added(bgp_speaker_id):
            self.schedule_resync(speaker_id=bgp_speaker_id,
                                 reason="BGP Speaker Out-of-sync")
            return

        bgp_peer = self.safe_get_bgp_peer_info(bgp_speaker_id,
                                               bgp_peer_id)
        if bgp_peer:
            bgp_speaker_as = self.cache.get_bgp_speaker_local_as(
                bgp_speaker_id)
            self.add_bgp_peer_to_bgp_speaker(bgp_speaker_id,
                                             bgp_speaker_as,
                                             bgp_peer)

    def add_routes_helper(self, bgp_speaker_id, routes):
        """Advertise routes to BGP speaker."""
        # Ideally BGP Speaker must be added by now, If not then let's
        # re-sync.
        if not self.cache.is_bgp_speaker_added(bgp_speaker_id):
            self.schedule_resync(speaker_id=bgp_speaker_id,
                                 reason="BGP Speaker Out-of-sync")
            return

        bgp_speaker_as = self.cache.get_bgp_speaker_local_as(bgp_speaker_id)
        for route in routes:
            self.advertise_route_via_bgp_speaker(bgp_speaker_id,
                                                 bgp_speaker_as,
                                                 route)
            # A driver failure schedules a resync; stop and wait for it.
            if self.is_resync_scheduled(bgp_speaker_id):
                break

    def withdraw_routes_helper(self, bgp_speaker_id, routes):
        """Withdraw routes advertised by BGP speaker."""
        # Ideally BGP Speaker must be added by now, If not then let's
        # re-sync.
        if not self.cache.is_bgp_speaker_added(bgp_speaker_id):
            self.schedule_resync(speaker_id=bgp_speaker_id,
                                 reason="BGP Speaker Out-of-sync")
            return

        bgp_speaker_as = self.cache.get_bgp_speaker_local_as(bgp_speaker_id)
        for route in routes:
            self.withdraw_route_via_bgp_speaker(bgp_speaker_id,
                                                bgp_speaker_as,
                                                route)
            # A driver failure schedules a resync; stop and wait for it.
            if self.is_resync_scheduled(bgp_speaker_id):
                break

    def safe_get_bgp_speaker_info(self, bgp_speaker_id):
        """Fetch speaker info from the plugin without raising.

        :returns: the value returned by the plugin (falsy if the speaker
                  is gone), or None if the RPC call failed, in which case a
                  resync is scheduled for the speaker.
        """
        try:
            bgp_speaker = self.plugin_rpc.get_bgp_speaker_info(self.context,
                                                               bgp_speaker_id)
            if not bgp_speaker:
                LOG.warning(_LW('BGP Speaker %s has been deleted.'),
                            bgp_speaker_id)
            return bgp_speaker
        except Exception as e:
            self.schedule_resync(speaker_id=bgp_speaker_id,
                                 reason=e)
            LOG.error(_LE('BGP Speaker %(bgp_speaker)s info call '
                          'failed with reason=%(e)s.'),
                      {'bgp_speaker': bgp_speaker_id, 'e': e})

    def safe_get_bgp_peer_info(self, bgp_speaker_id, bgp_peer_id):
        """Fetch peer info from the plugin without raising.

        :returns: the value returned by the plugin (falsy if the peer is
                  gone), or None if the RPC call failed, in which case a
                  resync is scheduled for the owning speaker.
        """
        try:
            bgp_peer = self.plugin_rpc.get_bgp_peer_info(self.context,
                                                         bgp_peer_id)
            if not bgp_peer:
                # Log the requested id; bgp_peer itself is falsy here.
                LOG.warning(_LW('BGP Peer %s has been deleted.'),
                            bgp_peer_id)
            return bgp_peer
        except Exception as e:
            self.schedule_resync(speaker_id=bgp_speaker_id,
                                 reason=e)
            LOG.error(_LE('BGP peer %(bgp_peer)s info call '
                          'failed with reason=%(e)s.'),
                      {'bgp_peer': bgp_peer_id, 'e': e})

    @utils.exception_logger()
    def safe_configure_dragent_for_bgp_speaker(self, bgp_speaker):
        """Add the speaker, tolerating it having been deleted meanwhile."""
        try:
            self.add_bgp_speaker_on_dragent(bgp_speaker)
        except (bgp_ext.BgpSpeakerNotFound, RuntimeError):
            LOG.warning(_LW('BGP speaker %s may have been deleted and its '
                            'resources may have already been disposed.'),
                        bgp_speaker['id'])

    def add_bgp_speaker_on_dragent(self, bgp_speaker):
        """Cache the speaker and configure it, its peers and routes."""
        # Caching BGP speaker details in BGPSpeakerCache. Will be used
        # during smooth.
        self.cache.put_bgp_speaker(bgp_speaker)

        LOG.debug('Calling driver for adding BGP speaker %(speaker_id)s,'
                  ' speaking for local_as %(local_as)s',
                  {'speaker_id': bgp_speaker['id'],
                   'local_as': bgp_speaker['local_as']})
        try:
            self.dr_driver_cls.add_bgp_speaker(bgp_speaker['local_as'])
        except driver_exc.BgpSpeakerAlreadyScheduled:
            return
        except Exception as e:
            # NOTE(review): execution continues after a driver failure; the
            # loops below stop after their first item because a resync is
            # now scheduled. Confirm whether an early return is intended.
            self._handle_driver_failure(bgp_speaker['id'],
                                        'add_bgp_speaker', e)

        # Add peer and route information to the driver.
        self.add_bgp_peers_to_bgp_speaker(bgp_speaker)
        self.advertise_routes_via_bgp_speaker(bgp_speaker)
        self.schedule_resync(speaker_id=bgp_speaker['id'],
                             reason="Periodic route cache refresh")

    def remove_bgp_speaker_from_dragent(self, bgp_speaker_id):
        """Drop the speaker from the cache and delete it from the driver."""
        if self.cache.is_bgp_speaker_added(bgp_speaker_id):
            bgp_speaker_as = self.cache.get_bgp_speaker_local_as(
                bgp_speaker_id)
            self.cache.remove_bgp_speaker_by_id(bgp_speaker_id)

            LOG.debug('Calling driver for removing BGP speaker %(speaker_as)s',
                      {'speaker_as': bgp_speaker_as})
            try:
                self.dr_driver_cls.delete_bgp_speaker(bgp_speaker_as)
            except Exception as e:
                self._handle_driver_failure(bgp_speaker_id,
                                            'remove_bgp_speaker', e)
            return

        # Ideally, only the added speakers can be removed by the neutron
        # server. Looks like there might be some synchronization
        # issue between the server and the agent. Let's initiate a re-sync
        # to resolve the issue.
        self.schedule_resync(speaker_id=bgp_speaker_id,
                             reason="BGP Speaker Out-of-sync")

    def add_bgp_peers_to_bgp_speaker(self, bgp_speaker):
        """Add every peer of the speaker, stopping once a resync is due."""
        for bgp_peer in bgp_speaker['peers']:
            self.add_bgp_peer_to_bgp_speaker(bgp_speaker['id'],
                                             bgp_speaker['local_as'],
                                             bgp_peer)
            if self.is_resync_scheduled(bgp_speaker['id']):
                break

    def add_bgp_peer_to_bgp_speaker(self, bgp_speaker_id,
                                    bgp_speaker_as, bgp_peer):
        """Cache the peer and add it to the driver unless already cached."""
        if self.cache.get_bgp_peer_by_ip(bgp_speaker_id, bgp_peer['peer_ip']):
            return

        self.cache.put_bgp_peer(bgp_speaker_id, bgp_peer)

        LOG.debug('Calling driver interface for adding BGP peer %(peer_ip)s '
                  'remote_as=%(remote_as)s to BGP Speaker running for '
                  'local_as=%(local_as)d '
                  'auth_type=%(auth_type)s',
                  {'peer_ip': bgp_peer['peer_ip'],
                   'remote_as': bgp_peer['remote_as'],
                   'local_as': bgp_speaker_as,
                   'auth_type': bgp_peer['auth_type']})
        try:
            self.dr_driver_cls.add_bgp_peer(bgp_speaker_as,
                                            bgp_peer['peer_ip'],
                                            bgp_peer['remote_as'],
                                            bgp_peer['auth_type'],
                                            bgp_peer['password'])
        except Exception as e:
            self._handle_driver_failure(bgp_speaker_id,
                                        'add_bgp_peer', e)

    def remove_bgp_peer_from_bgp_speaker(self, bgp_speaker_id, bgp_peer_ip):
        """Drop the peer from the cache and delete it from the driver."""
        # Ideally BGP Speaker must be added by now, If not then let's
        # re-sync.
        if not self.cache.is_bgp_speaker_added(bgp_speaker_id):
            self.schedule_resync(speaker_id=bgp_speaker_id,
                                 reason="BGP Speaker Out-of-sync")
            return

        if self.cache.is_bgp_peer_added(bgp_speaker_id, bgp_peer_ip):
            self.cache.remove_bgp_peer_by_ip(bgp_speaker_id, bgp_peer_ip)

            bgp_speaker_as = self.cache.get_bgp_speaker_local_as(
                bgp_speaker_id)

            LOG.debug('Calling driver interface to remove BGP peer '
                      '%(peer_ip)s from BGP Speaker running for '
                      'local_as=%(local_as)d',
                      {'peer_ip': bgp_peer_ip, 'local_as': bgp_speaker_as})
            try:
                self.dr_driver_cls.delete_bgp_peer(bgp_speaker_as,
                                                   bgp_peer_ip)
            except Exception as e:
                self._handle_driver_failure(bgp_speaker_id,
                                            'remove_bgp_peer', e)
            return

        # Ideally, only the added peers can be removed by the neutron
        # server. Looks like there might be some synchronization
        # issue between the server and the agent. Let's initiate a re-sync
        # to resolve the issue.
        self.schedule_resync(speaker_id=bgp_speaker_id,
                             reason="BGP Peer Out-of-sync")

    def advertise_routes_via_bgp_speaker(self, bgp_speaker):
        """Advertise every route of the speaker until a resync is due."""
        for route in bgp_speaker['advertised_routes']:
            self.advertise_route_via_bgp_speaker(bgp_speaker['id'],
                                                 bgp_speaker['local_as'],
                                                 route)
            if self.is_resync_scheduled(bgp_speaker['id']):
                break

    def advertise_route_via_bgp_speaker(self, bgp_speaker_id,
                                        bgp_speaker_as, route):
        """Cache the route and advertise it unless already advertised."""
        if self.cache.is_route_advertised(bgp_speaker_id, route):
            # Requested route already advertised. Hence, Nothing to be done.
            return
        self.cache.put_adv_route(bgp_speaker_id, route)

        LOG.debug('Calling driver for advertising prefix: %(cidr)s, '
                  'next_hop: %(nexthop)s',
                  {'cidr': route['destination'],
                   'nexthop': route['next_hop']})
        try:
            self.dr_driver_cls.advertise_route(bgp_speaker_as,
                                               route['destination'],
                                               route['next_hop'])
        except Exception as e:
            self._handle_driver_failure(bgp_speaker_id,
                                        'advertise_route', e)

    def withdraw_route_via_bgp_speaker(self, bgp_speaker_id,
                                       bgp_speaker_as, route):
        """Drop the route from the cache and withdraw it via the driver."""
        if self.cache.is_route_advertised(bgp_speaker_id, route):
            self.cache.remove_adv_route(bgp_speaker_id, route)
            LOG.debug('Calling driver for withdrawing prefix: %(cidr)s, '
                      'next_hop: %(nexthop)s',
                      {'cidr': route['destination'],
                       'nexthop': route['next_hop']})
            try:
                self.dr_driver_cls.withdraw_route(bgp_speaker_as,
                                                  route['destination'],
                                                  route['next_hop'])
            except Exception as e:
                self._handle_driver_failure(bgp_speaker_id,
                                            'withdraw_route', e)
            return

        # Ideally, only the advertised routes can be withdrawn by the
        # neutron server. Looks like there might be some synchronization
        # issue between the server and the agent. Let's initiate a re-sync
        # to resolve the issue.
        self.schedule_resync(speaker_id=bgp_speaker_id,
                             reason="Advertised routes Out-of-sync")

    def schedule_full_resync(self, reason):
        """Record that every BGP Speaker must be resynced, and why."""
        LOG.debug('Recording full resync request for all BGP Speakers '
                  'with reason=%s', reason)
        self.needs_full_sync_reason = reason

    def schedule_resync(self, reason, speaker_id):
        """Record a resync request for the given BGP Speaker.

        The reason is appended to the list kept for ``speaker_id``; use
        schedule_full_resync() to request a resync of all BGP Speakers.
        """
        LOG.debug('Recording resync request for BGP Speaker %s '
                  'with reason=%s', speaker_id, reason)
        self.needs_resync_reasons[speaker_id].append(reason)

    def is_resync_scheduled(self, bgp_speaker_id):
        """Return True if a resync is pending for the given BGP Speaker."""
        if bgp_speaker_id not in self.needs_resync_reasons:
            return False

        reason = self.needs_resync_reasons[bgp_speaker_id]
        # Re-sync scheduled for the queried BGP speaker. No point
        # continuing further. Let's stop processing and wait for
        # re-sync to happen.
        LOG.debug('Re-sync already scheduled for BGP Speaker %s '
                  'with reason=%s', bgp_speaker_id, reason)
        return True
|
|
|
|
|
|
class BgpDrPluginApi(object):
    """Agent side of BgpDrAgent RPC API.

    This is the client half of an rpc interface; the server half lives in
    api.rpc.handlers.bgp_speaker_rpc.BgpSpeakerRpcCallback.  See
    doc/source/devref/rpc_api.rst before changing rpc interfaces.

    API version history:
        1.0 - Initial version.
    """

    def __init__(self, topic, context, host):
        """Create an RPC client targeting ``topic`` at API version 1.0."""
        self.context = context
        self.host = host
        self.client = n_rpc.get_client(
            oslo_messaging.Target(topic=topic, version='1.0'))

    def get_bgp_speakers(self, context):
        """Make a remote process call to retrieve all BGP speakers info."""
        return self.client.prepare().call(
            context, 'get_bgp_speakers', host=self.host)

    def get_bgp_speaker_info(self, context, bgp_speaker_id):
        """Make a remote process call to retrieve a BGP speaker info."""
        return self.client.prepare().call(
            context, 'get_bgp_speaker_info', bgp_speaker_id=bgp_speaker_id)

    def get_bgp_peer_info(self, context, bgp_peer_id):
        """Make a remote process call to retrieve a BGP peer info."""
        return self.client.prepare().call(
            context, 'get_bgp_peer_info', bgp_peer_id=bgp_peer_id)
|
|
|
|
|
|
class BgpSpeakerCache(object):
    """Agent cache of the current BGP speaker state.

    This class is designed to support the advertisement for
    multiple BGP speaker via a single driver interface.

    ``self.cache`` maps a speaker id to a dict with three keys:
    'bgp_speaker' (the speaker dict as given to put_bgp_speaker), 'peers'
    (peer_ip -> peer dict) and 'advertised_routes' (list of route dicts).

    Version history:
        1.0 - Initial version for caching the state of BGP speaker.
    """

    def __init__(self):
        self.cache = {}

    def get_bgp_speaker_ids(self):
        """Return a snapshot list of the cached speaker ids.

        A list (not a live dict view) is returned so callers can remove
        speakers while iterating over the result.
        """
        return list(self.cache)

    def put_bgp_speaker(self, bgp_speaker):
        """Cache ``bgp_speaker``, resetting any previous entry for its id."""
        speaker_id = bgp_speaker['id']
        # Pass the id (not the cached entry) so the stale entry is removed;
        # passing the entry dict raised TypeError (unhashable type).
        self.remove_bgp_speaker_by_id(speaker_id)
        self.cache[speaker_id] = {'bgp_speaker': bgp_speaker,
                                  'peers': {},
                                  'advertised_routes': []}

    def get_bgp_speaker_by_id(self, bgp_speaker_id):
        """Return the cached speaker dict, or None if it is not cached."""
        if bgp_speaker_id in self.cache:
            return self.cache[bgp_speaker_id]['bgp_speaker']

    def get_bgp_speaker_local_as(self, bgp_speaker_id):
        """Return the speaker's 'local_as', or None if it is not cached."""
        bgp_speaker = self.get_bgp_speaker_by_id(bgp_speaker_id)
        if bgp_speaker:
            return bgp_speaker['local_as']

    def is_bgp_speaker_added(self, bgp_speaker_id):
        """Return the cached speaker dict (truthy) or None (falsy)."""
        return self.get_bgp_speaker_by_id(bgp_speaker_id)

    def remove_bgp_speaker_by_id(self, bgp_speaker_id):
        """Remove the speaker entry; a no-op if it is not cached."""
        if bgp_speaker_id in self.cache:
            del self.cache[bgp_speaker_id]

    def put_bgp_peer(self, bgp_speaker_id, bgp_peer):
        """Cache ``bgp_peer`` under the speaker, keyed by its 'peer_ip'.

        :raises KeyError: if the speaker is not cached.
        """
        peers = self.cache[bgp_speaker_id]['peers']
        # Drop any previous entry first so the peer is (re)inserted last.
        peers.pop(bgp_peer['peer_ip'], None)
        peers[bgp_peer['peer_ip']] = bgp_peer

    def is_bgp_peer_added(self, bgp_speaker_id, bgp_peer_ip):
        """Return the cached peer dict (truthy) or None (falsy)."""
        return self.get_bgp_peer_by_ip(bgp_speaker_id, bgp_peer_ip)

    def get_bgp_peer_ips(self, bgp_speaker_id):
        """Return the cached peer IPs, or None if the speaker is unknown."""
        bgp_speaker = self.get_bgp_speaker_by_id(bgp_speaker_id)
        if bgp_speaker:
            return self.cache[bgp_speaker_id]['peers'].keys()

    def get_bgp_peer_by_ip(self, bgp_speaker_id, bgp_peer_ip):
        """Return the cached peer dict, or None if speaker/peer is unknown."""
        bgp_speaker = self.get_bgp_speaker_by_id(bgp_speaker_id)
        if bgp_speaker:
            return self.cache[bgp_speaker_id]['peers'].get(bgp_peer_ip)

    def remove_bgp_peer_by_ip(self, bgp_speaker_id, bgp_peer_ip):
        """Remove the peer; a no-op if the speaker or peer is not cached."""
        peer_ips = self.get_bgp_peer_ips(bgp_speaker_id)
        # get_bgp_peer_ips() returns None for an unknown speaker; guard it
        # instead of letting ``in None`` raise TypeError.
        if peer_ips is not None and bgp_peer_ip in peer_ips:
            del self.cache[bgp_speaker_id]['peers'][bgp_peer_ip]

    def put_adv_route(self, bgp_speaker_id, route):
        """Append ``route`` to the speaker's advertised routes."""
        self.cache[bgp_speaker_id]['advertised_routes'].append(route)

    def is_route_advertised(self, bgp_speaker_id, route):
        """Return True if a matching route is cached for the speaker.

        Destinations must be equal; a ``route['next_hop']`` of None matches
        any cached next hop.
        """
        routes = self.cache[bgp_speaker_id]['advertised_routes']
        return any(
            r['destination'] == route['destination'] and (
                route['next_hop'] is None or
                r['next_hop'] == route['next_hop'])
            for r in routes)

    def remove_adv_route(self, bgp_speaker_id, route):
        """Remove every cached route with the same destination as ``route``.

        The cached list is replaced by a new list rather than mutated.
        """
        routes = self.cache[bgp_speaker_id]['advertised_routes']
        updated_routes = [r for r in routes if (
            r['destination'] != route['destination'])]
        self.cache[bgp_speaker_id]['advertised_routes'] = updated_routes

    def get_adv_routes(self, bgp_speaker_id):
        """Return the list of routes cached for the speaker."""
        return self.cache[bgp_speaker_id]['advertised_routes']

    def get_state(self):
        """Return counts of cached speakers, peers and advertised routes."""
        bgp_speaker_ids = self.get_bgp_speaker_ids()
        num_bgp_speakers = len(bgp_speaker_ids)
        num_bgp_peers = 0
        num_advertised_routes = 0
        for bgp_speaker_id in bgp_speaker_ids:
            # NOTE(review): the counts come from the speaker dict given to
            # put_bgp_speaker, not from this cache's own 'peers' /
            # 'advertised_routes' entries -- confirm this is intended.
            bgp_speaker = self.get_bgp_speaker_by_id(bgp_speaker_id)
            num_bgp_peers += len(bgp_speaker['peers'])
            num_advertised_routes += len(bgp_speaker['advertised_routes'])
        return {'bgp_speakers': num_bgp_speakers,
                'bgp_peers': num_bgp_peers,
                'advertise_routes': num_advertised_routes}
|
|
|
|
|
|
class BgpDrAgentWithStateReport(BgpDrAgent):
    """BgpDrAgent that also reports its state to the neutron server."""

    def __init__(self, host, conf=None):
        """Build the state payload and start the heartbeat if configured."""
        super(BgpDrAgentWithStateReport, self).__init__(host, conf)
        self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
        self.agent_state = dict(
            agent_type=bgp_consts.AGENT_TYPE_BGP_ROUTING,
            binary='neutron-bgp-dragent',
            configurations={},
            host=host,
            topic=bgp_consts.BGP_DRAGENT,
            start_flag=True,
        )
        interval = cfg.CONF.AGENT.report_interval
        if not interval:
            # No heartbeat is started for a falsy report interval.
            return
        self.heartbeat = loopingcall.FixedIntervalLoopingCall(
            self._report_state)
        self.heartbeat.start(interval=interval)

    def _report_state(self):
        """Send the agent state; activate the agent on the first success."""
        LOG.debug("Report state task started")
        try:
            configurations = self.agent_state.get('configurations')
            configurations.update(self.cache.get_state())
            admin_ctx = context.get_admin_context_without_session()
            status = self.state_rpc.report_state(
                admin_ctx, self.agent_state, True)
            if status == agent_consts.AGENT_REVIVED:
                LOG.info(_LI("Agent has just been revived. "
                             "Scheduling full sync"))
                self.schedule_full_resync(
                    reason=_("Agent has just been revived"))
        except AttributeError:
            # This means the server does not support report_state
            LOG.warning(_LW("Neutron server does not support state report. "
                            "State report for this agent will be disabled."))
            self.heartbeat.stop()
            self.run()
            return
        except Exception:
            LOG.exception(_LE("Failed reporting state!"))
            return
        # 'start_flag' is only present until the first successful report.
        first_report = self.agent_state.pop('start_flag', None)
        if first_report:
            self.run()

    def agent_updated(self, context, payload):
        """Handle the agent_updated notification event."""
        resync_reason = _("BgpDrAgent updated: %s") % payload
        self.schedule_full_resync(reason=resync_reason)
        LOG.info(_LI("agent_updated by server side %s!"), payload)

    def after_start(self):
        # Unlike the base class this only logs; run() is invoked from
        # _report_state instead.
        LOG.info(_LI("BGP dynamic routing agent started"))
|