neutron-dynamic-routing/neutron_dynamic_routing/services/bgp/agent/bgp_dragent.py

709 lines
31 KiB
Python

# Copyright 2016 Huawei Technologies India Pvt. Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
from neutron_lib import constants as n_const
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_service import loopingcall
from oslo_service import periodic_task
from oslo_utils import importutils
from neutron.agent import rpc as agent_rpc
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.common import utils
from neutron import context
from neutron import manager
from neutron_dynamic_routing.extensions import bgp as bgp_ext
from neutron_dynamic_routing._i18n import _, _LE, _LI, _LW
from neutron_dynamic_routing.services.bgp.agent.driver import exceptions as driver_exc # noqa
from neutron_dynamic_routing.services.bgp.common import constants as bgp_consts # noqa
LOG = logging.getLogger(__name__)
class BgpDrAgent(manager.Manager):
"""BGP Dynamic Routing agent service manager.
Note that the public methods of this class are exposed as the server side
of an rpc interface. The neutron server uses
neutron.api.rpc.agentnotifiers.bgp_dr_rpc_agent_api.
BgpDrAgentNotifyApi as the client side to execute the methods
here. For more information about changing rpc interfaces, see
http://docs.openstack.org/developer/neutron/devref/rpc_api.html.
API version history:
1.0 initial Version
"""
target = oslo_messaging.Target(version='1.0')
def __init__(self, host, conf=None):
super(BgpDrAgent, self).__init__()
self.initialize_driver(conf)
self.needs_resync_reasons = collections.defaultdict(list)
self.needs_full_sync_reason = None
self.cache = BgpSpeakerCache()
self.context = context.get_admin_context_without_session()
self.plugin_rpc = BgpDrPluginApi(bgp_consts.BGP_PLUGIN,
self.context, host)
def initialize_driver(self, conf):
self.conf = conf or cfg.CONF.BGP
try:
self.dr_driver_cls = (
importutils.import_object(self.conf.bgp_speaker_driver,
self.conf))
except ImportError:
LOG.exception(_LE("Error while importing BGP speaker driver %s"),
self.conf.bgp_speaker_driver)
raise SystemExit(1)
def _handle_driver_failure(self, bgp_speaker_id, method, driver_exec):
self.schedule_resync(reason=driver_exec,
speaker_id=bgp_speaker_id)
LOG.error(_LE('Call to driver for BGP Speaker %(bgp_speaker)s '
'%(method)s has failed with exception '
'%(driver_exec)s.'),
{'bgp_speaker': bgp_speaker_id,
'method': method,
'driver_exec': driver_exec})
def after_start(self):
self.run()
LOG.info(_LI("BGP Dynamic Routing agent started"))
def run(self):
"""Activate BGP Dynamic Routing agent."""
self.sync_state(self.context)
self.periodic_resync(self.context)
@utils.synchronized('bgp-dragent')
def sync_state(self, context, full_sync=None, bgp_speakers=None):
try:
hosted_bgp_speakers = self.plugin_rpc.get_bgp_speakers(context)
hosted_bgp_speaker_ids = [bgp_speaker['id']
for bgp_speaker in hosted_bgp_speakers]
cached_bgp_speakers = self.cache.get_bgp_speaker_ids()
for bgp_speaker_id in cached_bgp_speakers:
if bgp_speaker_id not in hosted_bgp_speaker_ids:
self.remove_bgp_speaker_from_dragent(bgp_speaker_id)
resync_all = not bgp_speakers or full_sync
only_bs = set() if resync_all else set(bgp_speakers)
for hosted_bgp_speaker in hosted_bgp_speakers:
hosted_bs_id = hosted_bgp_speaker['id']
if resync_all or hosted_bs_id in only_bs:
if not self.cache.is_bgp_speaker_added(hosted_bs_id):
self.safe_configure_dragent_for_bgp_speaker(
hosted_bgp_speaker)
continue
self.sync_bgp_speaker(hosted_bgp_speaker)
resync_reason = "Periodic route cache refresh"
self.schedule_resync(speaker_id=hosted_bs_id,
reason=resync_reason)
except Exception as e:
self.schedule_full_resync(reason=e)
LOG.error(_LE('Unable to sync BGP speaker state.'))
def sync_bgp_speaker(self, bgp_speaker):
# sync BGP Speakers
bgp_peer_ips = set(
[bgp_peer['peer_ip'] for bgp_peer in bgp_speaker['peers']])
cached_bgp_peer_ips = set(
self.cache.get_bgp_peer_ips(bgp_speaker['id']))
removed_bgp_peer_ips = cached_bgp_peer_ips - bgp_peer_ips
for bgp_peer_ip in removed_bgp_peer_ips:
self.remove_bgp_peer_from_bgp_speaker(bgp_speaker['id'],
bgp_peer_ip)
if bgp_peer_ips:
self.add_bgp_peers_to_bgp_speaker(bgp_speaker)
# sync advertise routes
cached_adv_routes = self.cache.get_adv_routes(bgp_speaker['id'])
adv_routes = bgp_speaker['advertised_routes']
if cached_adv_routes == adv_routes:
return
for cached_route in cached_adv_routes:
if cached_route not in adv_routes:
self.withdraw_route_via_bgp_speaker(bgp_speaker['id'],
bgp_speaker['local_as'],
cached_route)
self.advertise_routes_via_bgp_speaker(bgp_speaker)
@utils.exception_logger()
def _periodic_resync_helper(self, context):
"""Resync the BgpDrAgent state at the configured interval."""
if self.needs_resync_reasons or self.needs_full_sync_reason:
full_sync = self.needs_full_sync_reason
reasons = self.needs_resync_reasons
# Reset old reasons
self.needs_full_sync_reason = None
self.needs_resync_reasons = collections.defaultdict(list)
if full_sync:
LOG.debug("resync all: %(reason)s", {"reason": full_sync})
for bgp_speaker, reason in reasons.items():
LOG.debug("resync (%(bgp_speaker)s): %(reason)s",
{"reason": reason, "bgp_speaker": bgp_speaker})
self.sync_state(
context, full_sync=full_sync, bgp_speakers=reasons.keys())
# NOTE: spacing is set 1 sec. The actual interval is controlled
# by neutron/service.py which defaults to CONF.periodic_interval
@periodic_task.periodic_task(spacing=1)
def periodic_resync(self, context):
LOG.debug("Started periodic resync.")
self._periodic_resync_helper(context)
@utils.synchronized('bgp-dr-agent')
def bgp_speaker_create_end(self, context, payload):
"""Handle bgp_speaker_create_end notification event."""
bgp_speaker_id = payload['bgp_speaker']['id']
LOG.debug('Received BGP speaker create notification for '
'speaker_id=%(speaker_id)s from the neutron server.',
{'speaker_id': bgp_speaker_id})
self.add_bgp_speaker_helper(bgp_speaker_id)
@utils.synchronized('bgp-dr-agent')
def bgp_speaker_remove_end(self, context, payload):
"""Handle bgp_speaker_create_end notification event."""
bgp_speaker_id = payload['bgp_speaker']['id']
LOG.debug('Received BGP speaker remove notification for '
'speaker_id=%(speaker_id)s from the neutron server.',
{'speaker_id': bgp_speaker_id})
self.remove_bgp_speaker_from_dragent(bgp_speaker_id)
@utils.synchronized('bgp-dr-agent')
def bgp_peer_association_end(self, context, payload):
"""Handle bgp_peer_association_end notification event."""
bgp_peer_id = payload['bgp_peer']['peer_id']
bgp_speaker_id = payload['bgp_peer']['speaker_id']
LOG.debug('Received BGP peer associate notification for '
'speaker_id=%(speaker_id)s peer_id=%(peer_id)s '
'from the neutron server.',
{'speaker_id': bgp_speaker_id,
'peer_id': bgp_peer_id})
self.add_bgp_peer_helper(bgp_speaker_id, bgp_peer_id)
@utils.synchronized('bgp-dr-agent')
def bgp_peer_disassociation_end(self, context, payload):
"""Handle bgp_peer_disassociation_end notification event."""
bgp_peer_ip = payload['bgp_peer']['peer_ip']
bgp_speaker_id = payload['bgp_peer']['speaker_id']
LOG.debug('Received BGP peer disassociate notification for '
'speaker_id=%(speaker_id)s peer_ip=%(peer_ip)s '
'from the neutron server.',
{'speaker_id': bgp_speaker_id,
'peer_ip': bgp_peer_ip})
self.remove_bgp_peer_from_bgp_speaker(bgp_speaker_id, bgp_peer_ip)
@utils.synchronized('bgp-dr-agent')
def bgp_routes_advertisement_end(self, context, payload):
"""Handle bgp_routes_advertisement_end notification event."""
bgp_speaker_id = payload['advertise_routes']['speaker_id']
LOG.debug('Received routes advertisement end notification '
'for speaker_id=%(speaker_id)s from the neutron server.',
{'speaker_id': bgp_speaker_id})
routes = payload['advertise_routes']['routes']
self.add_routes_helper(bgp_speaker_id, routes)
@utils.synchronized('bgp-dr-agent')
def bgp_routes_withdrawal_end(self, context, payload):
"""Handle bgp_routes_withdrawal_end notification event."""
bgp_speaker_id = payload['withdraw_routes']['speaker_id']
LOG.debug('Received route withdrawal notification for '
'speaker_id=%(speaker_id)s from the neutron server.',
{'speaker_id': bgp_speaker_id})
routes = payload['withdraw_routes']['routes']
self.withdraw_routes_helper(bgp_speaker_id, routes)
def add_bgp_speaker_helper(self, bgp_speaker_id):
"""Add BGP speaker."""
bgp_speaker = self.safe_get_bgp_speaker_info(bgp_speaker_id)
if bgp_speaker:
self.add_bgp_speaker_on_dragent(bgp_speaker)
def add_bgp_peer_helper(self, bgp_speaker_id, bgp_peer_id):
"""Add BGP peer."""
# Ideally BGP Speaker must be added by now, If not then let's
# re-sync.
if not self.cache.is_bgp_speaker_added(bgp_speaker_id):
self.schedule_resync(speaker_id=bgp_speaker_id,
reason="BGP Speaker Out-of-sync")
return
bgp_peer = self.safe_get_bgp_peer_info(bgp_speaker_id,
bgp_peer_id)
if bgp_peer:
bgp_speaker_as = self.cache.get_bgp_speaker_local_as(
bgp_speaker_id)
self.add_bgp_peer_to_bgp_speaker(bgp_speaker_id,
bgp_speaker_as,
bgp_peer)
def add_routes_helper(self, bgp_speaker_id, routes):
"""Advertise routes to BGP speaker."""
# Ideally BGP Speaker must be added by now, If not then let's
# re-sync.
if not self.cache.is_bgp_speaker_added(bgp_speaker_id):
self.schedule_resync(speaker_id=bgp_speaker_id,
reason="BGP Speaker Out-of-sync")
return
bgp_speaker_as = self.cache.get_bgp_speaker_local_as(bgp_speaker_id)
for route in routes:
self.advertise_route_via_bgp_speaker(bgp_speaker_id,
bgp_speaker_as,
route)
if self.is_resync_scheduled(bgp_speaker_id):
break
def withdraw_routes_helper(self, bgp_speaker_id, routes):
"""Withdraw routes advertised by BGP speaker."""
# Ideally BGP Speaker must be added by now, If not then let's
# re-sync.
if not self.cache.is_bgp_speaker_added(bgp_speaker_id):
self.schedule_resync(speaker_id=bgp_speaker_id,
reason="BGP Speaker Out-of-sync")
return
bgp_speaker_as = self.cache.get_bgp_speaker_local_as(bgp_speaker_id)
for route in routes:
self.withdraw_route_via_bgp_speaker(bgp_speaker_id,
bgp_speaker_as,
route)
if self.is_resync_scheduled(bgp_speaker_id):
break
def safe_get_bgp_speaker_info(self, bgp_speaker_id):
try:
bgp_speaker = self.plugin_rpc.get_bgp_speaker_info(self.context,
bgp_speaker_id)
if not bgp_speaker:
LOG.warning(_LW('BGP Speaker %s has been deleted.'),
bgp_speaker_id)
return bgp_speaker
except Exception as e:
self.schedule_resync(speaker_id=bgp_speaker_id,
reason=e)
LOG.error(_LE('BGP Speaker %(bgp_speaker)s info call '
'failed with reason=%(e)s.'),
{'bgp_speaker': bgp_speaker_id, 'e': e})
def safe_get_bgp_peer_info(self, bgp_speaker_id, bgp_peer_id):
try:
bgp_peer = self.plugin_rpc.get_bgp_peer_info(self.context,
bgp_peer_id)
if not bgp_peer:
LOG.warning(_LW('BGP Peer %s has been deleted.'), bgp_peer)
return bgp_peer
except Exception as e:
self.schedule_resync(speaker_id=bgp_speaker_id,
reason=e)
LOG.error(_LE('BGP peer %(bgp_peer)s info call '
'failed with reason=%(e)s.'),
{'bgp_peer': bgp_peer_id, 'e': e})
@utils.exception_logger()
def safe_configure_dragent_for_bgp_speaker(self, bgp_speaker):
try:
self.add_bgp_speaker_on_dragent(bgp_speaker)
except (bgp_ext.BgpSpeakerNotFound, RuntimeError):
LOG.warning(_LW('BGP speaker %s may have been deleted and its '
'resources may have already been disposed.'),
bgp_speaker['id'])
def add_bgp_speaker_on_dragent(self, bgp_speaker):
# Caching BGP speaker details in BGPSpeakerCache. Will be used
# during smooth.
self.cache.put_bgp_speaker(bgp_speaker)
LOG.debug('Calling driver for adding BGP speaker %(speaker_id)s,'
' speaking for local_as %(local_as)s',
{'speaker_id': bgp_speaker['id'],
'local_as': bgp_speaker['local_as']})
try:
self.dr_driver_cls.add_bgp_speaker(bgp_speaker['local_as'])
except driver_exc.BgpSpeakerAlreadyScheduled:
return
except Exception as e:
self._handle_driver_failure(bgp_speaker['id'],
'add_bgp_speaker', e)
# Add peer and route information to the driver.
self.add_bgp_peers_to_bgp_speaker(bgp_speaker)
self.advertise_routes_via_bgp_speaker(bgp_speaker)
self.schedule_resync(speaker_id=bgp_speaker['id'],
reason="Periodic route cache refresh")
def remove_bgp_speaker_from_dragent(self, bgp_speaker_id):
if self.cache.is_bgp_speaker_added(bgp_speaker_id):
bgp_speaker_as = self.cache.get_bgp_speaker_local_as(
bgp_speaker_id)
self.cache.remove_bgp_speaker_by_id(bgp_speaker_id)
LOG.debug('Calling driver for removing BGP speaker %(speaker_as)s',
{'speaker_as': bgp_speaker_as})
try:
self.dr_driver_cls.delete_bgp_speaker(bgp_speaker_as)
except Exception as e:
self._handle_driver_failure(bgp_speaker_id,
'remove_bgp_speaker', e)
return
# Ideally, only the added speakers can be removed by the neutron
# server. Looks like there might be some synchronization
# issue between the server and the agent. Let's initiate a re-sync
# to resolve the issue.
self.schedule_resync(speaker_id=bgp_speaker_id,
reason="BGP Speaker Out-of-sync")
def add_bgp_peers_to_bgp_speaker(self, bgp_speaker):
for bgp_peer in bgp_speaker['peers']:
self.add_bgp_peer_to_bgp_speaker(bgp_speaker['id'],
bgp_speaker['local_as'],
bgp_peer)
if self.is_resync_scheduled(bgp_speaker['id']):
break
def add_bgp_peer_to_bgp_speaker(self, bgp_speaker_id,
bgp_speaker_as, bgp_peer):
if self.cache.get_bgp_peer_by_ip(bgp_speaker_id, bgp_peer['peer_ip']):
return
self.cache.put_bgp_peer(bgp_speaker_id, bgp_peer)
LOG.debug('Calling driver interface for adding BGP peer %(peer_ip)s '
'remote_as=%(remote_as)s to BGP Speaker running for '
'local_as=%(local_as)d',
{'peer_ip': bgp_peer['peer_ip'],
'remote_as': bgp_peer['remote_as'],
'local_as': bgp_speaker_as})
try:
self.dr_driver_cls.add_bgp_peer(bgp_speaker_as,
bgp_peer['peer_ip'],
bgp_peer['remote_as'],
bgp_peer['auth_type'],
bgp_peer['password'])
except Exception as e:
self._handle_driver_failure(bgp_speaker_id,
'add_bgp_peer', e)
def remove_bgp_peer_from_bgp_speaker(self, bgp_speaker_id, bgp_peer_ip):
# Ideally BGP Speaker must be added by now, If not then let's
# re-sync.
if not self.cache.is_bgp_speaker_added(bgp_speaker_id):
self.schedule_resync(speaker_id=bgp_speaker_id,
reason="BGP Speaker Out-of-sync")
return
if self.cache.is_bgp_peer_added(bgp_speaker_id, bgp_peer_ip):
self.cache.remove_bgp_peer_by_ip(bgp_speaker_id, bgp_peer_ip)
bgp_speaker_as = self.cache.get_bgp_speaker_local_as(
bgp_speaker_id)
LOG.debug('Calling driver interface to remove BGP peer '
'%(peer_ip)s from BGP Speaker running for '
'local_as=%(local_as)d',
{'peer_ip': bgp_peer_ip, 'local_as': bgp_speaker_as})
try:
self.dr_driver_cls.delete_bgp_peer(bgp_speaker_as,
bgp_peer_ip)
except Exception as e:
self._handle_driver_failure(bgp_speaker_id,
'remove_bgp_peer', e)
return
# Ideally, only the added peers can be removed by the neutron
# server. Looks like there might be some synchronization
# issue between the server and the agent. Let's initiate a re-sync
# to resolve the issue.
self.schedule_resync(speaker_id=bgp_speaker_id,
reason="BGP Peer Out-of-sync")
def advertise_routes_via_bgp_speaker(self, bgp_speaker):
for route in bgp_speaker['advertised_routes']:
self.advertise_route_via_bgp_speaker(bgp_speaker['id'],
bgp_speaker['local_as'],
route)
if self.is_resync_scheduled(bgp_speaker['id']):
break
def advertise_route_via_bgp_speaker(self, bgp_speaker_id,
bgp_speaker_as, route):
if self.cache.is_route_advertised(bgp_speaker_id, route):
# Requested route already advertised. Hence, Nothing to be done.
return
self.cache.put_adv_route(bgp_speaker_id, route)
LOG.debug('Calling driver for advertising prefix: %(cidr)s, '
'next_hop: %(nexthop)s',
{'cidr': route['destination'],
'nexthop': route['next_hop']})
try:
self.dr_driver_cls.advertise_route(bgp_speaker_as,
route['destination'],
route['next_hop'])
except Exception as e:
self._handle_driver_failure(bgp_speaker_id,
'advertise_route', e)
def withdraw_route_via_bgp_speaker(self, bgp_speaker_id,
bgp_speaker_as, route):
if self.cache.is_route_advertised(bgp_speaker_id, route):
self.cache.remove_adv_route(bgp_speaker_id, route)
LOG.debug('Calling driver for withdrawing prefix: %(cidr)s, '
'next_hop: %(nexthop)s',
{'cidr': route['destination'],
'nexthop': route['next_hop']})
try:
self.dr_driver_cls.withdraw_route(bgp_speaker_as,
route['destination'],
route['next_hop'])
except Exception as e:
self._handle_driver_failure(bgp_speaker_id,
'withdraw_route', e)
return
# Ideally, only the advertised routes can be withdrawn by the
# neutron server. Looks like there might be some synchronization
# issue between the server and the agent. Let's initiate a re-sync
# to resolve the issue.
self.schedule_resync(speaker_id=bgp_speaker_id,
reason="Advertised routes Out-of-sync")
def schedule_full_resync(self, reason):
LOG.debug('Recording full resync request for all BGP Speakers '
'with reason=%s', reason)
self.needs_full_sync_reason = reason
def schedule_resync(self, reason, speaker_id):
"""Schedule a full resync for a given BGP Speaker.
If no BGP Speaker is specified, resync all BGP Speakers.
"""
LOG.debug('Recording resync request for BGP Speaker %s '
'with reason=%s', speaker_id, reason)
self.needs_resync_reasons[speaker_id].append(reason)
def is_resync_scheduled(self, bgp_speaker_id):
if bgp_speaker_id not in self.needs_resync_reasons:
return False
reason = self.needs_resync_reasons[bgp_speaker_id]
# Re-sync scheduled for the queried BGP speaker. No point
# continuing further. Let's stop processing and wait for
# re-sync to happen.
LOG.debug('Re-sync already scheduled for BGP Speaker %s '
'with reason=%s', bgp_speaker_id, reason)
return True
class BgpDrPluginApi(object):
"""Agent side of BgpDrAgent RPC API.
This class implements the client side of an rpc interface.
The server side of this interface can be found in
neutron.api.rpc.handlers.bgp_speaker_rpc.BgpSpeakerRpcCallback.
For more information about changing rpc interfaces, see
doc/source/devref/rpc_api.rst.
API version history:
1.0 - Initial version.
"""
def __init__(self, topic, context, host):
self.context = context
self.host = host
target = oslo_messaging.Target(topic=topic, version='1.0')
self.client = n_rpc.get_client(target)
def get_bgp_speakers(self, context):
"""Make a remote process call to retrieve all BGP speakers info."""
cctxt = self.client.prepare()
return cctxt.call(context, 'get_bgp_speakers', host=self.host)
def get_bgp_speaker_info(self, context, bgp_speaker_id):
"""Make a remote process call to retrieve a BGP speaker info."""
cctxt = self.client.prepare()
return cctxt.call(context, 'get_bgp_speaker_info',
bgp_speaker_id=bgp_speaker_id)
def get_bgp_peer_info(self, context, bgp_peer_id):
"""Make a remote process call to retrieve a BGP peer info."""
cctxt = self.client.prepare()
return cctxt.call(context, 'get_bgp_peer_info',
bgp_peer_id=bgp_peer_id)
class BgpSpeakerCache(object):
"""Agent cache of the current BGP speaker state.
This class is designed to support the advertisement for
multiple BGP speaker via a single driver interface.
Version history:
1.0 - Initial version for caching the state of BGP speaker.
"""
def __init__(self):
self.cache = {}
def get_bgp_speaker_ids(self):
return self.cache.keys()
def put_bgp_speaker(self, bgp_speaker):
if bgp_speaker['id'] in self.cache:
self.remove_bgp_speaker_by_id(self.cache[bgp_speaker['id']])
self.cache[bgp_speaker['id']] = {'bgp_speaker': bgp_speaker,
'peers': {},
'advertised_routes': []}
def get_bgp_speaker_by_id(self, bgp_speaker_id):
if bgp_speaker_id in self.cache:
return self.cache[bgp_speaker_id]['bgp_speaker']
def get_bgp_speaker_local_as(self, bgp_speaker_id):
bgp_speaker = self.get_bgp_speaker_by_id(bgp_speaker_id)
if bgp_speaker:
return bgp_speaker['local_as']
def is_bgp_speaker_added(self, bgp_speaker_id):
return self.get_bgp_speaker_by_id(bgp_speaker_id)
def remove_bgp_speaker_by_id(self, bgp_speaker_id):
if bgp_speaker_id in self.cache:
del self.cache[bgp_speaker_id]
def put_bgp_peer(self, bgp_speaker_id, bgp_peer):
if bgp_peer['peer_ip'] in self.get_bgp_peer_ips(bgp_speaker_id):
del self.cache[bgp_speaker_id]['peers'][bgp_peer['peer_ip']]
self.cache[bgp_speaker_id]['peers'][bgp_peer['peer_ip']] = bgp_peer
def is_bgp_peer_added(self, bgp_speaker_id, bgp_peer_ip):
return self.get_bgp_peer_by_ip(bgp_speaker_id, bgp_peer_ip)
def get_bgp_peer_ips(self, bgp_speaker_id):
bgp_speaker = self.get_bgp_speaker_by_id(bgp_speaker_id)
if bgp_speaker:
return self.cache[bgp_speaker_id]['peers'].keys()
def get_bgp_peer_by_ip(self, bgp_speaker_id, bgp_peer_ip):
bgp_speaker = self.get_bgp_speaker_by_id(bgp_speaker_id)
if bgp_speaker:
return self.cache[bgp_speaker_id]['peers'].get(bgp_peer_ip)
def remove_bgp_peer_by_ip(self, bgp_speaker_id, bgp_peer_ip):
if bgp_peer_ip in self.get_bgp_peer_ips(bgp_speaker_id):
del self.cache[bgp_speaker_id]['peers'][bgp_peer_ip]
def put_adv_route(self, bgp_speaker_id, route):
self.cache[bgp_speaker_id]['advertised_routes'].append(route)
def is_route_advertised(self, bgp_speaker_id, route):
routes = self.cache[bgp_speaker_id]['advertised_routes']
for r in routes:
if r['destination'] == route['destination'] and (
r['next_hop'] == route['next_hop']):
return True
return False
def remove_adv_route(self, bgp_speaker_id, route):
routes = self.cache[bgp_speaker_id]['advertised_routes']
updated_routes = [r for r in routes if (
r['destination'] != route['destination'])]
self.cache[bgp_speaker_id]['advertised_routes'] = updated_routes
def get_adv_routes(self, bgp_speaker_id):
return self.cache[bgp_speaker_id]['advertised_routes']
def get_state(self):
bgp_speaker_ids = self.get_bgp_speaker_ids()
num_bgp_speakers = len(bgp_speaker_ids)
num_bgp_peers = 0
num_advertised_routes = 0
for bgp_speaker_id in bgp_speaker_ids:
bgp_speaker = self.get_bgp_speaker_by_id(bgp_speaker_id)
num_bgp_peers += len(bgp_speaker['peers'])
num_advertised_routes += len(bgp_speaker['advertised_routes'])
return {'bgp_speakers': num_bgp_speakers,
'bgp_peers': num_bgp_peers,
'advertise_routes': num_advertised_routes}
class BgpDrAgentWithStateReport(BgpDrAgent):
def __init__(self, host, conf=None):
super(BgpDrAgentWithStateReport,
self).__init__(host, conf)
self.state_rpc = agent_rpc.PluginReportStateAPI(topics.REPORTS)
self.agent_state = {
'agent_type': bgp_consts.AGENT_TYPE_BGP_ROUTING,
'binary': 'neutron-bgp-dragent',
'configurations': {},
'host': host,
'topic': bgp_consts.BGP_DRAGENT,
'start_flag': True}
report_interval = cfg.CONF.AGENT.report_interval
if report_interval:
self.heartbeat = loopingcall.FixedIntervalLoopingCall(
self._report_state)
self.heartbeat.start(interval=report_interval)
def _report_state(self):
LOG.debug("Report state task started")
try:
self.agent_state.get('configurations').update(
self.cache.get_state())
ctx = context.get_admin_context_without_session()
agent_status = self.state_rpc.report_state(ctx, self.agent_state,
True)
if agent_status == n_const.AGENT_REVIVED:
LOG.info(_LI("Agent has just been revived. "
"Scheduling full sync"))
self.schedule_full_resync(
reason=_("Agent has just been revived"))
except AttributeError:
# This means the server does not support report_state
LOG.warning(_LW("Neutron server does not support state report. "
"State report for this agent will be disabled."))
self.heartbeat.stop()
self.run()
return
except Exception:
LOG.exception(_LE("Failed reporting state!"))
return
if self.agent_state.pop('start_flag', None):
self.run()
def agent_updated(self, context, payload):
"""Handle the agent_updated notification event."""
self.schedule_full_resync(
reason=_("BgpDrAgent updated: %s") % payload)
LOG.info(_LI("agent_updated by server side %s!"), payload)
def after_start(self):
LOG.info(_LI("BGP dynamic routing agent started"))